Locust直译是“蝗虫”的意思,意在压测时产生的压力就像是漫天蝗虫一样,铺天盖地。
Locust是用Python实现的开源性能测试框架,不同于其他压测工具基于进程/线程产生压力,Locust是完全基于事件,支持分布式,一个Locust节点可以在一个进程中轻松支持上千并发用户。
locust特点- 基于协程 ,低成本实现更多并发;
- 脚本增强(“测试即代码”),不支持脚本录制,通过编写代码,提供更大的灵活性;
- HttpUser类使用了requests发送http请求,FastHttpUser类基于geventhttpclient实现,性能更高;
- 分布式压测更加简单,可以在同一服务器上使用分布式,也可以在多台服务器上进行分布式压测;
- 使用Flask 提供WebUI,同时提供无界面压测;
- 有第三方插件、 易于扩展,只要有相关协议的包,可以随机扩压测压。
pip install locust # 查看安装版本 locust -v
locust使用帮助:locust --help
测试脚本执行事件顺序
由于许多设置和清除操作是相互依赖的,因此以下是它们的执行顺序:
- Locust setup (一次)
- TaskSet setup (一次)
- TaskSet on_start (每个locust一次)
- TaskSet tasks…
- TaskSet on_stop (每个locust一次)
- TaskSet teardown (一次)
- Locust teardown (一次) 通常,setup和teardown方法应该是互补的。
- 创建测试用户类,用于继承HttpUser或者FastHttpUser;
- 创建测试类,继承TaskSet类;
- __init__()方法进行数据初始化;
- on_start方法进行测试用例执行前准备,如执行登录;
- on_end方法测试结束后执行;
- 测试用例编写,待测用例方法前加@locust.task(weight: int=1)装饰器
- 使用self.client进行get/post/put/delete等请求
import hashlib
import json
import logging
from urllib import parse
from locust import TaskSet, task, between
from locust.contrib.fasthttp import FastHttpUser
logging.basicConfig(level=logging.INFO, # filename="info.log", filemode="w ",
format="%(asctime)s %(filename)s %(funcName)s %(levelname)s Line:%(lineno)s :%(message)s")
logger = logging.getLogger(__name__)
HOST = "10.209.1.12"
PORT = "8081"
def get_hash_md5(text):
# 返回MD5值
text = text.encode('utf-8') if isinstance(text, str) else text # 转换bytes
try:
# m = hashlib.md5(b'credit')
m = hashlib.md5()
m.update(text)
return m.hexdigest()
except Exception as e:
return False
def dict_to_params_str(di):
s = ""
for k, v in di.items():
if isinstance(v, dict):
v = str(v)
s = "{}={}&".format(k, v)
return s[:-1]
def get_data_result(data):
data = data.decode('utf-8') if isinstance(data, bytes) else data
data = json.loads(data) if isinstance(data, str) else data
if int(data.get("errno")) == 0:
return True
return False
class UserTask(TaskSet):
def __init__(self, parent):
super().__init__(parent)
self._cookies = ""
self._headers = headers
def on_start(self):
logger.info("测试开始...")
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
"Connection": "keep-alive",
"Cookie": "sidebarStatus=1",
"Host": "{}:{}".format(HOST, PORT),
"Origin": "http://{}:{}".format(HOST, PORT),
"Referer": "http://{}:{}/dist/".format(HOST, PORT),
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
}
data = {
"username": "admin",
"password": "62638c576e044bffcbe34ff77554d43e"}
resp = self.client.post("/login", data=dict_to_params_str(data), name='登录',
headers=headers, catch_response=True)
if resp.status_code == 200:
res = get_data_result(resp.content.decode('utf-8'))
if res:
self._cookies = resp.headers['Set-Cookie']
logger.info(self._cookies)
self._headers["Cookie"] = self._cookies
logger.info("登录成功!")
else:
logger.warning("登录失败:{}".format(resp.content.decode('utf-8')))
return
def on_stop(self):
logger.info("测试结束!")
@task
def test_host_getlist_terminal(self):
params = {
"limit": "10",
"page": "1",
"prodcombo": "360exthost/pcinfo",
"filter": '{"id": "-1"}'
}
with self.client.get("/host/getlist", name='终端概况', data=dict_to_params_str(params),
headers=self._headers, catch_response=True) as resp:
if resp.status_code == 200:
if get_data_result(resp.content.decode('utf-8')):
resp.success()
else:
resp.failure("resp data error:{}".format(resp.content.decode('utf-8')))
else:
resp.failure('request status error:{}'.format(resp.status_code))
class WebsiteUser(FastHttpUser): # HttpUser
host = 'http://10.209.1.12:8081'
wait_time = between(0, 0.01) # 每个请求间隔/秒
tasks = [UserTask]
if __name__ == '__main__':
# os.system("locust -f web_api.py --host=http://10.173.220.46:8081 --csv=req --logfile=info.log")
os.system("locust -f web_api.py --loglevel=INFO")
class WebSocketClient(InstanceClass):
def __init__(self):
headers = ["Accept-Encoding:gzip, deflate, br",
"Accept-Language:zh-CN,zh;q=0.9",
"Cache-Control:no-cache",
"Connection:Upgrade",
"Host:{}:36060".format(HOST),
"Origin:file://",
"Pragma:no-cache",
"Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits",
"Sec-WebSocket-Key:{}".format(get_websocket_key()),
"Sec-WebSocket-Version:13",
"Upgrade:websocket",
"User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.366",
]
self.ws = create_connection(WS_URL, header=headers, sslopt={"cert_reqs": ssl.CERT_NONE})
def on_message(self, message):
message = message.decode('utf-8') if isinstance(message, bytes) else message
logger.info("接收数据: {}".format(message))
def on_error(self, error):
logger.info("错误信息: {}".format(error))
def on_close(self):
logger.info("### closed ###")
def on_open(self):
logger.info("####on openen####")
class WebSocketLocust(User):
abstract = True
def __init__(self, parent):
super(WebSocketLocust, self).__init__(parent)
self.s_client = WebSocketClient()
class UserBehavior(TaskSet):
def __init__(self, parent):
super().__init__(parent)
self.wss = self.parent.s_client.ws
def get_m2_info(self):
try:
param = self.parent.q.get()
# self.parent.q.put_nowait(param) # 获取完数据重新加入队列
return param
except queue.Empty:
logger.error("queue is Empty!!!")
exit(0)
def on_start(self):
logger.info(f"发送AUTH数据...")
self.client_auth()
logger.info(f"资产上报...")
self.test_asset_push()
# self.wss.ping()
self.wss.pong()
threading.Thread(target=self.recv).start()
# self.recv()
def on_stop(self):
logger.info('---------- task stop ------------')
self.wss.close()
def recv(self):
logger.info("进入接收数据方法...")
while True:
res = self.wss.recv()
try:
data = json.loads(res)
logger.info("接收数据receive:{}".format(data))
except websocket.WebSocketTimeoutException as e:
logger.exception("接收数据异常:{}".format(str(e)))
except websocket.WebSocketConnectionClosedException as e:
logger.warning("websocket close!!!")
def assert_result(self, name, result, start_time, msg=""):
if result:
events.request_success.fire(
request_type=name,
name=name "_succ",
response_time=int(time.time()) - start_time,
response_length=0)
else:
events.request_failure.fire(request_type=name, name=name "_fail",
response_time=int(time.time()) - start_time,
exception=msg, response_length=1)
def sends(self, msg, name):
# logger.info("发送类型:{}, 数据类型:{} 数据: {}!".format(name, type(msg), msg))
start_time = int(time.time())
try:
self.wss.send(json.dumps(msg))
self.assert_result(name, True, start_time)
except Exception as e:
logger.error(f"发送错误: {str(e)}")
self.assert_result(name, False, start_time, msg=str(e))
def client_auth(self):
# 客户端认证
req = {"msgType": 0, "epGuid": self.uuid, "authData": get_sha256(self.uuid ".eppmc")}
logger.info("客户端认证,发送AUTH数据: {}".format(req))
self.sends(req, "auth_conn")
@task
def policy_type_task(self):
# 3. 获取策略 POLICY_REQUEST: 101
_policy_type = random.choice(policy_type)
name = sys._getframe().f_code.co_name
req = {"epGuid": self.uuid, "_os": 1, "msgType": 104,
"_guid": self.uuid, "policyHash": get_str_md5(self.uuid),
"policyType": "SetSecurityReinforce"}
self.sends(req, name=name)
import queue
import gnsq
from locust import between, task, User, TaskSet, events, tag
class InstanceClass(object):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(InstanceClass, cls).__new__(cls, *args, **kwargs)
return cls._instance
class WebNsqclient(InstanceClass):
"""nsq连接"""
def __init__(self):
super(WebNsqclient, self).__init__()
"""建立nsq连接"""
self.producer_list = []
self.topic = "CloudSafeLine.EEPP"
# self.cwpp_topic = "CloudSafeLine.CCPP"
# _nsq_addr = f"{random.choice(nsq_host_list)}:{nsq_port}"
self.producer = gnsq.Producer(nsq_addr)
self.producer.start()
@property
def producers(self):
return random.choice(self.producer_list)
def on_message(self, data):
"""nsq消息发送,发送成功返回OK"""
zip_data = data_compress(data)
res = self.producer.publish(self.topic, zip_data) # 报文发送
return res
def on_end(self):
"""nsq连接关闭"""
self.producer.close()
class WebNsqLocust(User):
"""自定义locust User类"""
abstract = True
def __init__(self, parent):
super(WebNsqLocust, self).__init__(parent)
self.client = WebNsqclient()
class UserTask(TaskSet):
def __init__(self, parent):
super().__init__(parent)
monitor_name = "monitor"
def get_m2_info(self):
try:
param = self.parent.q.get() # 参数化
# self.parent.q.put_nowait(param) # 获取完数据重新加入队列
return param
except queue.Empty:
logger.error("queue is Empty!!!")
exit(0)
def on_start(self):
logger.debug("开始测试...")
# threading.Thread(target=monitor_redis).start()
global START_FLAG
if not START_FLAG:
START_FLAG = True
threading.Thread(target=monitor_redis).start() # redis监控
monitor_name = "monitor"
monitor_time = 5 # 分钟 minutes
easynmon_monitor_start(easynmon_ip_port, name=monitor_name, monitor_time=monitor_time) # easynmon监控
def on_end(self):
logger.debug("测试结束!!!")
self.client.on_end()
easynmon_monitor_stop(easynmon_ip_port)
def assert_result(self, name, result, start_time):
if result.decode() == "OK":
events.request_success.fire(
request_type=name,
name=name "_succ",
response_time=int((time.time() - start_time) * 1000),
response_length=0)
else:
events.request_failure.fire(request_type=name, name=name "_fail",
response_time=int((time.time() - start_time) * 1000),
exception=result, response_length=1)
@task
@tag("pluginmgr", "test")
def pluginmgr_task(self):
name = sys._getframe().f_code.co_name
param = self.get_m2_info()
data = pluginmgr_info(param)
start_time = time.time()
res = self.client.on_message(data)
self.assert_result(name, res, start_time)
1、参数化
- 引入队列的概念 queue ,实现方式是将参数推入队列,测试时依次取出,全部取完后 locust 会自动停止。若是使用参数循环压测,需要将取出的参数再推入队尾。
2、检查点
- 使用self.client提供的catch_response=True`参数, 添加locust提供的ResponseContextManager类的上下文方法手动设置检查点。
- ResponseContextManager里面的有两个方法来声明成功和失败,分别是success和failure。其中failure方法需要我们传入一个参数,内容就是失败的原因。
3、思考时间
- wait_time = between(0, 0.01)
- wait_time = constant(1)
- wait_time = constant_pacing/1)
4、权重
- 类中测试方法执行比例通过@task(int)进行控制,默认1;
- 一个文件多个类中可以通过weight=1进行控制;
5、集合点:模拟一定数量的用户,同时并发请求。
from locust import HttpUser, TaskSet, task,between,events
from gevent._semaphore import Semaphore
all_locusts_spawned = Semaphore() # 创建集合点
all_locusts_spawned.acquire() # 阻塞线程
# 注册事件
@events.spawning_complete.add_listener
def on_hatch_complete(**kwargs):
# Select_task类的钩子方法
# 挂在到locust钩子函数(所有的Locust实例产生完成时触发
all_locusts_spawned.release()
num = 0
class UserTask(TaskSet):
def login(self):
global num
num = 1
print("%s个虚拟用户开始启动,并登录"%n)
self.client.post("/login", data={"username":"wantest001gwu", "password":"test00123"})
def logout(self):
print("退出登录")
def on_start(self):
self.login()
all_locusts_spawned.wait()
@task(4)
def test1(self):
param = {"limit":1, "offset":0}
with self.client.get("/list", params=param, headers={}, catch_response = True) as response:
print("用户浏览首页商品列表")
6、分布式
locust -f locust.py --master --web-host=0.0.0.0 --loglevel=ERROR
locust -f locust.py --worker --master-host=10.20.14.24 --loglevel=ERROR
7、无界面测试
总结
locust不支持录制,使用python进行脚本开发,虽然需要一定编码基础,但是灵活性更大;可以通过代码组合方式实现更多场景!
通过代码增强,测试脚本中日常用到的检查点、参数化、集合、思考时间等都可以实现,而且分布式压测特别方便!
缺点是压测过程中,对服务器的检测需要借助其他工具进行!
,