1、安装 pip
pip install faststream
2.需要使用一个中间人,我这里用的是redis作为中间人
3.使用案例,新建一个send文件
from redis import Redis, ConnectionPool,
import json
pool = ConnectionPool.from_url(
url='redis://账号:密码@ip:端口号/0',
decode_responses=True
)
redis_client = Redis(connection_pool=pool)
class Queue:
WebSocket = 'test' # 任务队列名称
def send(code):
message = {"data": code}
redis_client.lpush(Queue.WebSocket, json.dumps(message))
send("test")
这里主要是用Redis 向队列test发送了一个数据
2.新建一个server.py,用来处理队列的信息
from faststream import FastStream
from loguru import logger as uru_logger
from faststream.redis import RedisBroker
from models.send import Queue
import asyncio
# 创建 FastStream redis 中间人
redis_broker = RedisBroker('redis://账号:密码@ip:端口号/0')
# 创建 FastStream 实例
fast_stream = FastStream(redis_broker)
app = FastStream(redis_broker, logger=uru_logger)
@redis_broker.subscriber(list=Queue.WebSocket) # 这是订阅这个队列
async def handle_websocket_message(message):
print(message)
print(type(message))
_str = message.decode('utf-8')
print(_str) # test
if __name__ == '__main__':
asyncio.run(app.run()) # 因为这个是异步的,所以就只能asyncio这样去运行,以及函数定义带有这个async
3.这样你就可以发现redis里面的队列一直被实时消费了