FastStream异步消息队列框架 使用案例python

发布于:2024-07-10 ⋅ 阅读:(109) ⋅ 点赞:(0)

1、安装 pip 

pip install faststream
 

2.需要使用一个中间人,我这里用的是redis作为中间人

3.使用案例,新建一个send文件

from redis import Redis, ConnectionPool,

import json
pool = ConnectionPool.from_url(
    url='redis://账号:密码@ip:端口号/0',
    decode_responses=True
)
redis_client = Redis(connection_pool=pool)



class Queue:
    WebSocket = 'test'  # 任务队列名称


def send(code):
    message = {"data": code}
    redis_client.lpush(Queue.WebSocket, json.dumps(message))


send("test")
这里主要是用Redis 向队列test发送了一个数据

2.新建一个server.py,用来处理队列的信息

from faststream import FastStream
from loguru import logger as uru_logger
from faststream.redis import RedisBroker
from models.send import Queue
import asyncio

# 创建 FastStream redis 中间人
redis_broker = RedisBroker('redis://账号:密码@ip:端口号/0')

# 创建 FastStream 实例
fast_stream = FastStream(redis_broker)

app = FastStream(redis_broker, logger=uru_logger)


@redis_broker.subscriber(list=Queue.WebSocket) # 这是订阅这个队列
async def handle_websocket_message(message):
    print(message)
    print(type(message))
    _str = message.decode('utf-8')
    print(_str)  # test


if __name__ == '__main__':
    asyncio.run(app.run())  # 因为这个是异步的,所以就只能asyncio这样去运行,以及函数定义带有这个async 

3.这样你就可以发现redis里面的队列一直被实时消费了