性能测试:Locust使用介绍(二)

发布于:2024-09-18 ⋅ 阅读:(126) ⋅ 点赞:(0)

Tasks

当负载测试开始时,将为每个模拟用户创建一个User类的实例,他们将开始在自己的greenlet中运行。当这些用户运行时,他们会选择要执行的任务,休眠一段时间,然后选择一个新任务等等。

@task装饰器

为用户添加任务的最简单方法是使用task装饰器。

from locust import User, task, constant

class MyUser(User):
    wait_time = constant(1)

    @task
    def my_task(self):
        print("User instance (%r) executing my_task" % self)

@task接受一个可选的权重参数,可用于指定任务的执行比率。在以下示例中,任务2被选中的可能性是任务1的两倍:

from locust import User, task, between

class MyUser(User):
    wait_time = between(5, 15)

    @task(3)
    def task1(self):
        pass

    @task(6)
    def task2(self):
        pass

tasks属性

定义用户任务的另一种方法是设置任务属性。

tasks属性是tasks列表或Task:int字典,其中Task是python可调用对象或TaskSet类。如果任务是一个普通的python函数,它们会收到一个参数,即执行任务的User实例。

以下是一个声明为普通python函数的User任务的示例:

from locust import User, constant

def my_task(user):
    pass

class MyUser(User):
    tasks = [my_task]
    wait_time = constant(1)

如果任务属性指定为列表,则每次执行任务时,都会从任务属性中随机选择。然而,如果任务是一个字典——以可调用对象作为键,以整数作为值——则将随机选择要执行的任务,但以整数作为比率。因此,对于一个看起来像这样的任务:

{my_task: 3, another_task: 1}

my_task被执行的可能性是另一个task的3倍。

在内部,上面的dict实际上会扩展成一个列表(任务属性也会更新),看起来像这样:

[my_task, my_task, my_task, another_task]

然后使用Python的random.choice()从列表中选择任务。

@tag装饰器

通过使用@tag装饰器标记任务,您可以使用–tags和–exclude tags参数来挑剔测试期间执行的任务。考虑以下示例:

from locust import User, constant, task, tag

class MyUser(User):
    wait_time = constant(1)

    @tag('tag1')
    @task
    def task1(self):
        pass

    @tag('tag1', 'tag2')
    @task
    def task2(self):
        pass

    @tag('tag3')
    @task
    def task3(self):
        pass

    @task
    def task4(self):
        pass

如果您使用–tags tag1开始此测试,则在测试期间只会执行task1和task2。如果从–tags tag2 tag3开始,则只会执行task2和task3。

–exclude-tags的行为将完全相反。因此,如果您以–exclude-tags tag3开始测试,则只会执行任务1、任务2和任务4。排除总是胜过包含,所以如果一个任务有一个你包含的标签和一个你排除的标签,它就不会被执行。

Events

如果你想在测试中运行一些设置代码,通常将其放在locustfile的模块级别就足够了,但有时你需要在运行的特定时间做一些事情。为了满足这一需求,Locust提供了事件钩子。

test_start和test_stop

如果你需要在负载测试的开始或停止时运行一些代码,你应该使用test_start和test_stop事件。您可以在locustfile的模块级别为这些事件设置监听器:

from locust import events

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("A new test is starting")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("A new test is ending")

init

init事件在每个Locust进程开始时触发。这在分布式模式下特别有用,在这种模式下,每个工作进程(而不是每个用户)都需要一个机会来进行一些初始化。例如,假设你有一个全局状态,所有从这个过程中产生的用户都需要它:

from locust import events
from locust.runners import MasterRunner

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    if isinstance(environment.runner, MasterRunner):
        print("I'm on master node")
    else:
        print("I'm on a worker or standalone node")

HttpUser class

HttpUser是最常用的User。它添加了一个用于发出HTTP请求的客户端属性。

from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(5, 15)

    @task(4)
    def index(self):
        self.client.get("/")

    @task(1)
    def about(self):
        self.client.get("/about/")

client属性/HttpSession

client是HttpSession的一个实例。HttpSession是requests.Session的子类,因此其功能有很好的文档记录,许多人应该熟悉。HttpSession添加的主要是将请求结果报告到Locust中(成功/失败、响应时间、响应长度、名称)。

它包含所有HTTP方法的方法:get、post、put…

就像requests.Session一样,它在请求之间保留Cookie,因此可以轻松用于登录网站。

发出POST请求,查看响应,并隐式重用我们为第二个请求获得的任何会话cookie

response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")

HttpSession捕获任何请求。Session抛出的RequestException(由连接错误、超时或类似原因引起),而是返回一个虚拟的Response对象,其status_code设置为0,内容设置为None。

验证响应

如果HTTP响应代码正常(<400),则认为请求成功,但对响应进行一些额外的验证通常是有用的。

您可以使用catch_response参数、with语句和response.failure()调用将请求标记为失败

with self.client.get("/", catch_response=True) as response:
    if response.text != "Success":
        response.failure("Got wrong response")
    elif response.elapsed.total_seconds() > 0.5:
        response.failure("Request took too long")

您还可以将请求标记为成功,即使响应代码不正确:

with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        response.success()

您甚至可以通过抛出异常并在with块外捕获它来避免记录请求。或者你可以抛出一个locust exception,就像下面的例子一样,让locust捕捉它。

from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        raise RescheduleTask()

REST/JSON APIs

FastHttpUser提供了一个现成的rest方法,但你也可以自己做:

from json import JSONDecodeError
...
with self.client.post("/", json={"foo": 42, "bar": None}, catch_response=True) as response:
    try:
        if response.json()["greeting"] != "hello":
            response.failure("Did not get expected value in greeting")
    except JSONDecodeError:
        response.failure("Response could not be decoded as JSON")
    except KeyError:
        response.failure("Response did not contain expected key 'greeting'")

分组请求

网站的URL包含某种动态参数的页面很常见。通常,在用户统计数据中将这些URL分组在一起是有意义的。这可以通过将name参数传递给HttpSession的不同请求方法来实现。

例子:

# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):
    self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")

可能存在无法将参数传递到请求函数的情况,例如在与包装Requests会话的库/SDK交互时。通过设置client.request_name属性,提供了一种对请求进行分组的替代方法。

# Statistics for these requests will be grouped under: /blog/?id=[id]
self.client.request_name="/blog?id=[id]"
for i in range(10):
    self.client.get("/blog?id=%i" % i)
self.client.request_name=None

如果你想用最少的样板链接多个分组,你可以使用client.rename_request()上下文管理器。

@task
def multiple_groupings_example(self):
    # Statistics for these requests will be grouped under: /blog/?id=[id]
    with self.client.rename_request("/blog?id=[id]"):
        for i in range(10):
            self.client.get("/blog?id=%i" % i)

    # Statistics for these requests will be grouped under: /article/?id=[id]
    with self.client.rename_request("/article?id=[id]"):
        for i in range(10):
            self.client.get("/article?id=%i" % i)

使用catch_response并直接访问request_meta,您甚至可以根据响应中的内容重命名请求。

with self.client.get("/", catch_response=True) as resp:
    resp.request_meta["name"] = resp.json()["name"]

HTTP代理设置

为了提高性能,我们通过设置请求来配置请求,使其不在环境中查找HTTP代理设置。会话的trust_env属性为False。如## Tasks

当负载测试开始时,将为每个模拟用户创建一个User类的实例,他们将开始在自己的greenlet中运行。当这些用户运行时,他们会选择要执行的任务,休眠一段时间,然后选择一个新任务等等。

@task装饰器

为用户添加任务的最简单方法是使用task装饰器。

from locust import User, task, constant

class MyUser(User):
    wait_time = constant(1)

    @task
    def my_task(self):
        print("User instance (%r) executing my_task" % self)

@task接受一个可选的权重参数,可用于指定任务的执行比率。在以下示例中,任务2被选中的可能性是任务1的两倍:

from locust import User, task, between

class MyUser(User):
    wait_time = between(5, 15)

    @task(3)
    def task1(self):
        pass

    @task(6)
    def task2(self):
        pass

tasks属性

定义用户任务的另一种方法是设置任务属性。

tasks属性是tasks列表或Task:int字典,其中Task是python可调用对象或TaskSet类。如果任务是一个普通的python函数,它们会收到一个参数,即执行任务的User实例。

以下是一个声明为普通python函数的User任务的示例:

from locust import User, constant

def my_task(user):
    pass

class MyUser(User):
    tasks = [my_task]
    wait_time = constant(1)

如果任务属性指定为列表,则每次执行任务时,都会从任务属性中随机选择。然而,如果任务是一个字典——以可调用对象作为键,以整数作为值——则将随机选择要执行的任务,但以整数作为比率。因此,对于一个看起来像这样的任务:

{my_task: 3, another_task: 1}

my_task被执行的可能性是另一个task的3倍。

在内部,上面的dict实际上会扩展成一个列表(任务属性也会更新),看起来像这样:

[my_task, my_task, my_task, another_task]

然后使用Python的random.choice()从列表中选择任务。

@tag装饰器

通过使用@tag装饰器标记任务,您可以使用–tags和–exclude tags参数来挑剔测试期间执行的任务。考虑以下示例:

from locust import User, constant, task, tag

class MyUser(User):
    wait_time = constant(1)

    @tag('tag1')
    @task
    def task1(self):
        pass

    @tag('tag1', 'tag2')
    @task
    def task2(self):
        pass

    @tag('tag3')
    @task
    def task3(self):
        pass

    @task
    def task4(self):
        pass

如果您使用–tags tag1开始此测试,则在测试期间只会执行task1和task2。如果从–tags tag2 tag3开始,则只会执行task2和task3。

–exclude-tags的行为将完全相反。因此,如果您以–exclude-tags tag3开始测试,则只会执行任务1、任务2和任务4。排除总是胜过包含,所以如果一个任务有一个你包含的标签和一个你排除的标签,它就不会被执行。

Events

如果你想在测试中运行一些设置代码,通常将其放在locustfile的模块级别就足够了,但有时你需要在运行的特定时间做一些事情。为了满足这一需求,Locust提供了事件钩子。

test_start和test_stop

如果你需要在负载测试的开始或停止时运行一些代码,你应该使用test_start和test_stop事件。您可以在locustfile的模块级别为这些事件设置监听器:

from locust import events

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("A new test is starting")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("A new test is ending")

init

init事件在每个Locust进程开始时触发。这在分布式模式下特别有用,在这种模式下,每个工作进程(而不是每个用户)都需要一个机会来进行一些初始化。例如,假设你有一个全局状态,所有从这个过程中产生的用户都需要它:

from locust import events
from locust.runners import MasterRunner

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    if isinstance(environment.runner, MasterRunner):
        print("I'm on master node")
    else:
        print("I'm on a worker or standalone node")

HttpUser class

HttpUser是最常用的User。它添加了一个用于发出HTTP请求的客户端属性。

from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(5, 15)

    @task(4)
    def index(self):
        self.client.get("/")

    @task(1)
    def about(self):
        self.client.get("/about/")

client属性/HttpSession

client是HttpSession的一个实例。HttpSession是requests.Session的子类,因此其功能有很好的文档记录,许多人应该熟悉。HttpSession添加的主要是将请求结果报告到Locust中(成功/失败、响应时间、响应长度、名称)。

它包含所有HTTP方法的方法:get、post、put…

就像requests.Session一样,它在请求之间保留Cookie,因此可以轻松用于登录网站。

发出POST请求,查看响应,并隐式重用我们为第二个请求获得的任何会话cookie

response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")

HttpSession捕获任何请求。Session抛出的RequestException(由连接错误、超时或类似原因引起),而是返回一个虚拟的Response对象,其status_code设置为0,内容设置为None。

验证响应

如果HTTP响应代码正常(<400),则认为请求成功,但对响应进行一些额外的验证通常是有用的。

您可以使用catch_response参数、with语句和response.failure()调用将请求标记为失败

with self.client.get("/", catch_response=True) as response:
    if response.text != "Success":
        response.failure("Got wrong response")
    elif response.elapsed.total_seconds() > 0.5:
        response.failure("Request took too long")

您还可以将请求标记为成功,即使响应代码不正确:

with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        response.success()

您甚至可以通过抛出异常并在with块外捕获它来避免记录请求。或者你可以抛出一个locust exception,就像下面的例子一样,让locust捕捉它。

from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        raise RescheduleTask()

REST/JSON APIs

FastHttpUser提供了一个现成的rest方法,但你也可以自己做:

from json import JSONDecodeError
...
with self.client.post("/", json={"foo": 42, "bar": None}, catch_response=True) as response:
    try:
        if response.json()["greeting"] != "hello":
            response.failure("Did not get expected value in greeting")
    except JSONDecodeError:
        response.failure("Response could not be decoded as JSON")
    except KeyError:
        response.failure("Response did not contain expected key 'greeting'")

分组请求

网站的URL包含某种动态参数的页面很常见。通常,在用户统计数据中将这些URL分组在一起是有意义的。这可以通过将name参数传递给HttpSession的不同请求方法来实现。

例子:

# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):
    self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")

可能存在无法将参数传递到请求函数的情况,例如在与包装Requests会话的库/SDK交互时。通过设置client.request_name属性,提供了一种对请求进行分组的替代方法。

# Statistics for these requests will be grouped under: /blog/?id=[id]
self.client.request_name="/blog?id=[id]"
for i in range(10):
    self.client.get("/blog?id=%i" % i)
self.client.request_name=None

如果你想用最少的样板链接多个分组,你可以使用client.rename_request()上下文管理器。

@task
def multiple_groupings_example(self):
    # Statistics for these requests will be grouped under: /blog/?id=[id]
    with self.client.rename_request("/blog?id=[id]"):
        for i in range(10):
            self.client.get("/blog?id=%i" % i)

    # Statistics for these requests will be grouped under: /article/?id=[id]
    with self.client.rename_request("/article?id=[id]"):
        for i in range(10):
            self.client.get("/article?id=%i" % i)

使用catch_response并直接访问request_meta,您甚至可以根据响应中的内容重命名请求。

with self.client.get("/", catch_response=True) as resp:
    resp.request_meta["name"] = resp.json()["name"]

HTTP代理设置

为了提高性能,我们通过设置请求来配置请求,使其不在环境中查找HTTP代理设置。会话的trust_env属性为False。如果你不想这样,你可以手动将locost_instance.client.trust_env设置为True。

连接重用

默认情况下,连接由HttpUser重用,即使在任务运行之间也是如此。为了避免连接重用,您可以执行以下操作:

self.client.get("/", headers={"Connection": "close"})
self.client.get("/new_connection_here")

或者,您可以关闭整个请求。会话对象(这也会删除Cookie、关闭SSL会话等)。这会有一些CPU开销(由于SSL重新协商等原因,下一个请求的响应时间会更高),所以除非你真的需要,否则不要使用它。

self.client.get("/")
self.client.close()
self.client.get("/new_connection_here")

连接池

当每个HttpUser创建新的HttpSession时,每个用户实例都有自己的连接池。这类似于真实用户(浏览器)与web服务器的交互方式。

如果您想共享连接,可以使用单个池管理器。为此,请将pool_manager类属性设置为urllib3.PoolManager的实例。

from locust import HttpUser
from urllib3 import PoolManager

class MyUser(HttpUser):
    # All instances of this class will be limited to 10 concurrent connections at most.
    pool_manager = PoolManager(maxsize=10, block=True)

果你不想这样,你可以手动将locost_instance.client.trust_env设置为True。

连接重用

默认情况下,连接由HttpUser重用,即使在任务运行之间也是如此。为了避免连接重用,您可以执行以下操作:

self.client.get("/", headers={"Connection": "close"})
self.client.get("/new_connection_here")

或者,您可以关闭整个请求。会话对象(这也会删除Cookie、关闭SSL会话等)。这会有一些CPU开销(由于SSL重新协商等原因,下一个请求的响应时间会更高),所以除非你真的需要,否则不要使用它。

self.client.get("/")
self.client.close()
self.client.get("/new_connection_here")

连接池

当每个HttpUser创建新的HttpSession时,每个用户实例都有自己的连接池。这类似于真实用户(浏览器)与web服务器的交互方式。

如果您想共享连接,可以使用单个池管理器。为此,请将pool_manager类属性设置为urllib3.PoolManager的实例。

from locust import HttpUser
from urllib3 import PoolManager

class MyUser(HttpUser):
    # All instances of this class will be limited to 10 concurrent connections at most.
    pool_manager = PoolManager(maxsize=10, block=True)

网站公告

今日签到

点亮在社区的每一天
去签到