Tasks
当负载测试开始时,将为每个模拟用户创建一个User类的实例,他们将开始在自己的greenlet中运行。当这些用户运行时,他们会选择要执行的任务,休眠一段时间,然后选择一个新任务等等。
@task装饰器
为用户添加任务的最简单方法是使用task装饰器。
from locust import User, task, constant
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
print("User instance (%r) executing my_task" % self)
@task接受一个可选的权重参数,可用于指定任务的执行比率。在以下示例中,任务2被选中的可能性是任务1的两倍:
from locust import User, task, between
class MyUser(User):
wait_time = between(5, 15)
@task(3)
def task1(self):
pass
@task(6)
def task2(self):
pass
tasks属性
定义用户任务的另一种方法是设置任务属性。
tasks属性是tasks列表或Task:int字典,其中Task是python可调用对象或TaskSet类。如果任务是一个普通的python函数,它们会收到一个参数,即执行任务的User实例。
以下是一个声明为普通python函数的User任务的示例:
from locust import User, constant
def my_task(user):
pass
class MyUser(User):
tasks = [my_task]
wait_time = constant(1)
如果任务属性指定为列表,则每次执行任务时,都会从任务属性中随机选择。然而,如果任务是一个字典——以可调用对象作为键,以整数作为值——则将随机选择要执行的任务,但以整数作为比率。因此,对于一个看起来像这样的任务:
{my_task: 3, another_task: 1}
my_task被执行的可能性是另一个task的3倍。
在内部,上面的dict实际上会扩展成一个列表(任务属性也会更新),看起来像这样:
[my_task, my_task, my_task, another_task]
然后使用Python的random.choice()从列表中选择任务。
@tag装饰器
通过使用@tag装饰器标记任务,您可以使用–tags和–exclude tags参数来挑剔测试期间执行的任务。考虑以下示例:
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
如果您使用–tags tag1开始此测试,则在测试期间只会执行task1和task2。如果从–tags tag2 tag3开始,则只会执行task2和task3。
–exclude-tags的行为将完全相反。因此,如果您以–exclude-tags tag3开始测试,则只会执行任务1、任务2和任务4。排除总是胜过包含,所以如果一个任务有一个你包含的标签和一个你排除的标签,它就不会被执行。
Events
如果你想在测试中运行一些设置代码,通常将其放在locustfile的模块级别就足够了,但有时你需要在运行的特定时间做一些事情。为了满足这一需求,Locust提供了事件钩子。
test_start和test_stop
如果你需要在负载测试的开始或停止时运行一些代码,你应该使用test_start和test_stop事件。您可以在locustfile的模块级别为这些事件设置监听器:
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("A new test is starting")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("A new test is ending")
init
init事件在每个Locust进程开始时触发。这在分布式模式下特别有用,在这种模式下,每个工作进程(而不是每个用户)都需要一个机会来进行一些初始化。例如,假设你有一个全局状态,所有从这个过程中产生的用户都需要它:
from locust import events
from locust.runners import MasterRunner
@events.init.add_listener
def on_locust_init(environment, **kwargs):
if isinstance(environment.runner, MasterRunner):
print("I'm on master node")
else:
print("I'm on a worker or standalone node")
HttpUser class
HttpUser是最常用的User。它添加了一个用于发出HTTP请求的客户端属性。
from locust import HttpUser, task, between
class MyUser(HttpUser):
wait_time = between(5, 15)
@task(4)
def index(self):
self.client.get("/")
@task(1)
def about(self):
self.client.get("/about/")
client属性/HttpSession
client是HttpSession的一个实例。HttpSession是requests.Session的子类,因此其功能有很好的文档记录,许多人应该熟悉。HttpSession添加的主要是将请求结果报告到Locust中(成功/失败、响应时间、响应长度、名称)。
它包含所有HTTP方法的方法:get、post、put…
就像requests.Session一样,它在请求之间保留Cookie,因此可以轻松用于登录网站。
发出POST请求,查看响应,并隐式重用我们为第二个请求获得的任何会话cookie
response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")
HttpSession捕获任何请求。Session抛出的RequestException(由连接错误、超时或类似原因引起),而是返回一个虚拟的Response对象,其status_code设置为0,内容设置为None。
验证响应
如果HTTP响应代码正常(<400),则认为请求成功,但对响应进行一些额外的验证通常是有用的。
您可以使用catch_response参数、with语句和response.failure()调用将请求标记为失败
with self.client.get("/", catch_response=True) as response:
if response.text != "Success":
response.failure("Got wrong response")
elif response.elapsed.total_seconds() > 0.5:
response.failure("Request took too long")
您还可以将请求标记为成功,即使响应代码不正确:
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
response.success()
您甚至可以通过抛出异常并在with块外捕获它来避免记录请求。或者你可以抛出一个locust exception,就像下面的例子一样,让locust捕捉它。
from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
raise RescheduleTask()
REST/JSON APIs
FastHttpUser提供了一个现成的rest方法,但你也可以自己做:
from json import JSONDecodeError
...
with self.client.post("/", json={"foo": 42, "bar": None}, catch_response=True) as response:
try:
if response.json()["greeting"] != "hello":
response.failure("Did not get expected value in greeting")
except JSONDecodeError:
response.failure("Response could not be decoded as JSON")
except KeyError:
response.failure("Response did not contain expected key 'greeting'")
分组请求
网站的URL包含某种动态参数的页面很常见。通常,在用户统计数据中将这些URL分组在一起是有意义的。这可以通过将name参数传递给HttpSession的不同请求方法来实现。
例子:
# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):
self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")
可能存在无法将参数传递到请求函数的情况,例如在与包装Requests会话的库/SDK交互时。通过设置client.request_name属性,提供了一种对请求进行分组的替代方法。
# Statistics for these requests will be grouped under: /blog/?id=[id]
self.client.request_name="/blog?id=[id]"
for i in range(10):
self.client.get("/blog?id=%i" % i)
self.client.request_name=None
如果你想用最少的样板链接多个分组,你可以使用client.rename_request()上下文管理器。
@task
def multiple_groupings_example(self):
# Statistics for these requests will be grouped under: /blog/?id=[id]
with self.client.rename_request("/blog?id=[id]"):
for i in range(10):
self.client.get("/blog?id=%i" % i)
# Statistics for these requests will be grouped under: /article/?id=[id]
with self.client.rename_request("/article?id=[id]"):
for i in range(10):
self.client.get("/article?id=%i" % i)
使用catch_response并直接访问request_meta,您甚至可以根据响应中的内容重命名请求。
with self.client.get("/", catch_response=True) as resp:
resp.request_meta["name"] = resp.json()["name"]
HTTP代理设置
为了提高性能,我们通过设置请求来配置请求,使其不在环境中查找HTTP代理设置。会话的trust_env属性为False。如## Tasks
当负载测试开始时,将为每个模拟用户创建一个User类的实例,他们将开始在自己的greenlet中运行。当这些用户运行时,他们会选择要执行的任务,休眠一段时间,然后选择一个新任务等等。
@task装饰器
为用户添加任务的最简单方法是使用task装饰器。
from locust import User, task, constant
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
print("User instance (%r) executing my_task" % self)
@task接受一个可选的权重参数,可用于指定任务的执行比率。在以下示例中,任务2被选中的可能性是任务1的两倍:
from locust import User, task, between
class MyUser(User):
wait_time = between(5, 15)
@task(3)
def task1(self):
pass
@task(6)
def task2(self):
pass
tasks属性
定义用户任务的另一种方法是设置任务属性。
tasks属性是tasks列表或Task:int字典,其中Task是python可调用对象或TaskSet类。如果任务是一个普通的python函数,它们会收到一个参数,即执行任务的User实例。
以下是一个声明为普通python函数的User任务的示例:
from locust import User, constant
def my_task(user):
pass
class MyUser(User):
tasks = [my_task]
wait_time = constant(1)
如果任务属性指定为列表,则每次执行任务时,都会从任务属性中随机选择。然而,如果任务是一个字典——以可调用对象作为键,以整数作为值——则将随机选择要执行的任务,但以整数作为比率。因此,对于一个看起来像这样的任务:
{my_task: 3, another_task: 1}
my_task被执行的可能性是另一个task的3倍。
在内部,上面的dict实际上会扩展成一个列表(任务属性也会更新),看起来像这样:
[my_task, my_task, my_task, another_task]
然后使用Python的random.choice()从列表中选择任务。
@tag装饰器
通过使用@tag装饰器标记任务,您可以使用–tags和–exclude tags参数来挑剔测试期间执行的任务。考虑以下示例:
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
如果您使用–tags tag1开始此测试,则在测试期间只会执行task1和task2。如果从–tags tag2 tag3开始,则只会执行task2和task3。
–exclude-tags的行为将完全相反。因此,如果您以–exclude-tags tag3开始测试,则只会执行任务1、任务2和任务4。排除总是胜过包含,所以如果一个任务有一个你包含的标签和一个你排除的标签,它就不会被执行。
Events
如果你想在测试中运行一些设置代码,通常将其放在locustfile的模块级别就足够了,但有时你需要在运行的特定时间做一些事情。为了满足这一需求,Locust提供了事件钩子。
test_start和test_stop
如果你需要在负载测试的开始或停止时运行一些代码,你应该使用test_start和test_stop事件。您可以在locustfile的模块级别为这些事件设置监听器:
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("A new test is starting")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("A new test is ending")
init
init事件在每个Locust进程开始时触发。这在分布式模式下特别有用,在这种模式下,每个工作进程(而不是每个用户)都需要一个机会来进行一些初始化。例如,假设你有一个全局状态,所有从这个过程中产生的用户都需要它:
from locust import events
from locust.runners import MasterRunner
@events.init.add_listener
def on_locust_init(environment, **kwargs):
if isinstance(environment.runner, MasterRunner):
print("I'm on master node")
else:
print("I'm on a worker or standalone node")
HttpUser class
HttpUser是最常用的User。它添加了一个用于发出HTTP请求的客户端属性。
from locust import HttpUser, task, between
class MyUser(HttpUser):
wait_time = between(5, 15)
@task(4)
def index(self):
self.client.get("/")
@task(1)
def about(self):
self.client.get("/about/")
client属性/HttpSession
client是HttpSession的一个实例。HttpSession是requests.Session的子类,因此其功能有很好的文档记录,许多人应该熟悉。HttpSession添加的主要是将请求结果报告到Locust中(成功/失败、响应时间、响应长度、名称)。
它包含所有HTTP方法的方法:get、post、put…
就像requests.Session一样,它在请求之间保留Cookie,因此可以轻松用于登录网站。
发出POST请求,查看响应,并隐式重用我们为第二个请求获得的任何会话cookie
response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")
HttpSession捕获任何请求。Session抛出的RequestException(由连接错误、超时或类似原因引起),而是返回一个虚拟的Response对象,其status_code设置为0,内容设置为None。
验证响应
如果HTTP响应代码正常(<400),则认为请求成功,但对响应进行一些额外的验证通常是有用的。
您可以使用catch_response参数、with语句和response.failure()调用将请求标记为失败
with self.client.get("/", catch_response=True) as response:
if response.text != "Success":
response.failure("Got wrong response")
elif response.elapsed.total_seconds() > 0.5:
response.failure("Request took too long")
您还可以将请求标记为成功,即使响应代码不正确:
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
response.success()
您甚至可以通过抛出异常并在with块外捕获它来避免记录请求。或者你可以抛出一个locust exception,就像下面的例子一样,让locust捕捉它。
from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
raise RescheduleTask()
REST/JSON APIs
FastHttpUser提供了一个现成的rest方法,但你也可以自己做:
from json import JSONDecodeError
...
with self.client.post("/", json={"foo": 42, "bar": None}, catch_response=True) as response:
try:
if response.json()["greeting"] != "hello":
response.failure("Did not get expected value in greeting")
except JSONDecodeError:
response.failure("Response could not be decoded as JSON")
except KeyError:
response.failure("Response did not contain expected key 'greeting'")
分组请求
网站的URL包含某种动态参数的页面很常见。通常,在用户统计数据中将这些URL分组在一起是有意义的。这可以通过将name参数传递给HttpSession的不同请求方法来实现。
例子:
# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):
self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")
可能存在无法将参数传递到请求函数的情况,例如在与包装Requests会话的库/SDK交互时。通过设置client.request_name属性,提供了一种对请求进行分组的替代方法。
# Statistics for these requests will be grouped under: /blog/?id=[id]
self.client.request_name="/blog?id=[id]"
for i in range(10):
self.client.get("/blog?id=%i" % i)
self.client.request_name=None
如果你想用最少的样板链接多个分组,你可以使用client.rename_request()上下文管理器。
@task
def multiple_groupings_example(self):
# Statistics for these requests will be grouped under: /blog/?id=[id]
with self.client.rename_request("/blog?id=[id]"):
for i in range(10):
self.client.get("/blog?id=%i" % i)
# Statistics for these requests will be grouped under: /article/?id=[id]
with self.client.rename_request("/article?id=[id]"):
for i in range(10):
self.client.get("/article?id=%i" % i)
使用catch_response并直接访问request_meta,您甚至可以根据响应中的内容重命名请求。
with self.client.get("/", catch_response=True) as resp:
resp.request_meta["name"] = resp.json()["name"]
HTTP代理设置
为了提高性能,我们通过设置请求来配置请求,使其不在环境中查找HTTP代理设置。会话的trust_env属性为False。如果你不想这样,你可以手动将locost_instance.client.trust_env设置为True。
连接重用
默认情况下,连接由HttpUser重用,即使在任务运行之间也是如此。为了避免连接重用,您可以执行以下操作:
self.client.get("/", headers={"Connection": "close"})
self.client.get("/new_connection_here")
或者,您可以关闭整个请求。会话对象(这也会删除Cookie、关闭SSL会话等)。这会有一些CPU开销(由于SSL重新协商等原因,下一个请求的响应时间会更高),所以除非你真的需要,否则不要使用它。
self.client.get("/")
self.client.close()
self.client.get("/new_connection_here")
连接池
当每个HttpUser创建新的HttpSession时,每个用户实例都有自己的连接池。这类似于真实用户(浏览器)与web服务器的交互方式。
如果您想共享连接,可以使用单个池管理器。为此,请将pool_manager类属性设置为urllib3.PoolManager的实例。
from locust import HttpUser
from urllib3 import PoolManager
class MyUser(HttpUser):
# All instances of this class will be limited to 10 concurrent connections at most.
pool_manager = PoolManager(maxsize=10, block=True)
果你不想这样,你可以手动将locost_instance.client.trust_env设置为True。
连接重用
默认情况下,连接由HttpUser重用,即使在任务运行之间也是如此。为了避免连接重用,您可以执行以下操作:
self.client.get("/", headers={"Connection": "close"})
self.client.get("/new_connection_here")
或者,您可以关闭整个请求。会话对象(这也会删除Cookie、关闭SSL会话等)。这会有一些CPU开销(由于SSL重新协商等原因,下一个请求的响应时间会更高),所以除非你真的需要,否则不要使用它。
self.client.get("/")
self.client.close()
self.client.get("/new_connection_here")
连接池
当每个HttpUser创建新的HttpSession时,每个用户实例都有自己的连接池。这类似于真实用户(浏览器)与web服务器的交互方式。
如果您想共享连接,可以使用单个池管理器。为此,请将pool_manager类属性设置为urllib3.PoolManager的实例。
from locust import HttpUser
from urllib3 import PoolManager
class MyUser(HttpUser):
# All instances of this class will be limited to 10 concurrent connections at most.
pool_manager = PoolManager(maxsize=10, block=True)