Python Clients for Redis - Redis Stream
redis-py - the Python Redis client
https://redis.io/docs/latest/develop/clients/redis-py/
https://redis-py.pythonlang.cn/en/stable/index.html
redis-py (Redis Python client)
https://github.com/redis/redis-py
The Python interface to the Redis key-value store.
redis-py is the Python client for Redis. redis-py requires a running Redis server.
The sections below explain how to install redis-py and connect your Python application to a Redis database.
Redis Commands
https://redis.readthedocs.io/en/stable/commands.html
Redis is an open-source, in-memory data structure store that can be used as a database, a cache, and a message broker.
1. Redis Stream
https://redis.com.cn/redis-stream.html
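Redis Stream is mainly used for message queues (MQ). Redis already offers publish/subscribe (pub/sub) for message-queue-style delivery, but pub/sub does not persist messages: if the network drops or the Redis server goes down, the messages are discarded.
Pub/sub can distribute messages, but it keeps no history of them.
Redis Stream adds message persistence and primary-replica replication. Any client can read the data from any point in time, Redis remembers each consumer's read position, and a message is not lost just because a consumer was disconnected when it was produced.
A Redis Stream is a data structure for handling large volumes of time-ordered data. Every entry has a unique ID and a set of associated field/value pairs, which makes streams a good fit for logs, event collection, and real-time data feeds.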
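To make the idea of an append-only log with unique, increasing IDs concrete, here is a toy in-memory model. It is only a sketch for illustration: `MiniStream` and its methods are hypothetical names, not part of Redis or redis-py, and the real implementation inside Redis is very different. The `<milliseconds>-<sequence>` ID format mirrors what Redis generates when the ID is `*`.

```python
import time


class MiniStream:
    """Toy append-only log that mimics Redis Stream IDs (illustration only)."""

    def __init__(self):
        self.entries = []      # list of ((ms, seq), fields), kept in ID order
        self._last = (0, 0)    # last assigned (ms, seq) pair

    def add(self, fields, now_ms=None):
        """Append fields and return an ID of the form '<ms>-<seq>'."""
        ms = int(time.time() * 1000) if now_ms is None else now_ms
        last_ms, last_seq = self._last
        if ms <= last_ms:
            # Same (or earlier) millisecond: keep the time, bump the sequence
            ms, seq = last_ms, last_seq + 1
        else:
            seq = 0
        self._last = (ms, seq)
        self.entries.append(((ms, seq), dict(fields)))
        return f"{ms}-{seq}"

    def read_after(self, last_id="0-0"):
        """Return all entries whose ID is strictly greater than last_id."""
        ms, seq = (int(part) for part in last_id.split("-"))
        return [(f"{m}-{s}", f) for (m, s), f in self.entries if (m, s) > (ms, seq)]
```

Because entries stay in the log, a consumer that remembers the last ID it processed can call `read_after(last_id)` later and pick up where it left off, which is the property pub/sub lacks.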
2. Redis Commands
2.1. CoreCommands.xadd()
(producer side)
https://redis.readthedocs.io/en/stable/commands.html
https://redis.io/docs/latest/commands/xadd/
xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None, ref_policy=None)
Add to a stream.
name: name of the stream. If the stream does not exist, it is created.
fields: dict of field/value pairs to insert into the stream.
id: location to insert this record. By default it is appended. Use * to let Redis generate the ID. A custom ID is allowed, but the caller must make sure that IDs keep increasing.
maxlen: truncate old stream members beyond this size. Can't be specified with minid. Optional; sets the maximum length of the stream.
approximate: when true, the actual stream length may be slightly more than maxlen.
nomkstream: when set to true, do not create the stream if it does not exist.
minid: trim entries whose ID is lower than minid. Can't be specified with maxlen. Optional. The redis-py docstring describes it as "the minimum id in the stream to query", but in XADD it is a trimming threshold; it does not filter which entries are added.
limit: the maximum number of entries to evict when trimming. The redis-py docstring describes it as "the maximum number of entries to retrieve", but XADD does not retrieve entries.
ref_policy: optional reference policy for consumer groups when trimming:
KEEPREF (default): when trimming, preserves references in consumer groups' PEL.
DELREF: when trimming, removes all references from consumer groups' PEL.
ACKED: when trimming, only removes entries acknowledged by all consumer groups.
Parameters
name (Union[bytes, str, memoryview])
fields (Dict[Union[bytes, bytearray, memoryview, str, int, float], Union[bytes, bytearray, memoryview, str, int, float]])
id (Union[int, bytes, str, memoryview])
maxlen (Optional[int])
approximate (bool)
nomkstream (bool)
minid (Optional[Union[int, bytes, str, memoryview]])
limit (Optional[int])
ref_policy (Optional[Literal['KEEPREF', 'DELREF', 'ACKED']])
Return type
Union[Awaitable[Any], Any]
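As a small illustration of the trimming parameters, the sketch below appends an entry while asking Redis to keep the stream near a fixed size. The helper name `add_capped`, the default of 1000 entries, and the `demo` function are assumptions made for this example; only `xadd` and its `id`, `maxlen`, and `approximate` arguments come from redis-py.

```python
def add_capped(client, stream, fields, max_entries=1000):
    """Append one entry and ask Redis to keep roughly max_entries entries.

    client is expected to be a redis.Redis instance (or any object with a
    compatible xadd method).
    """
    return client.xadd(
        stream,
        fields,
        id="*",               # let Redis generate the entry ID
        maxlen=max_entries,   # trim older entries beyond this length
        approximate=True,     # the stream may end up slightly longer
    )


def demo():
    """Run against a local server; assumes Redis listens on localhost:6379."""
    import redis  # imported here so add_capped itself has no hard dependency

    client = redis.Redis(host="localhost", port=6379, db=0)
    entry_id = add_capped(client, "forever_stream", {"event": "signup", "user_id": 123})
    print(entry_id)
```

Calling `demo()` with a running server prints the generated ID. Approximate trimming is usually the cheaper choice, at the cost of the stream length being a little above `maxlen` at times.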
2.2. CoreCommands.xlen()
(producer side)
https://redis.io/docs/latest/commands/xlen/
xlen(name)
Returns the number of elements in a given stream.
Parameters
name
(Union[bytes, str, memoryview])
Return type
Union[Awaitable[Any], Any]
2.3. CoreCommands.xdel()
(producer side)
https://redis.io/docs/latest/commands/xdel/
xdel(name, *ids)
Deletes one or more messages from a stream.
Parameters
name (Union[bytes, str, memoryview]) - name of the stream.
*ids (Union[int, bytes, str, memoryview]) - message ids to delete.
Return type
Union[Awaitable[Any], Any]
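A short sketch of how xlen() and xdel() can be combined: it deletes some entries and reports the stream length before and after. The helper name `delete_entries` and the shape of its return value are assumptions for this example. XDEL returns the number of entries actually deleted, so IDs that are not in the stream do not count.

```python
def delete_entries(client, stream, ids):
    """Delete the given entry IDs and report how the stream length changed.

    client is expected to be a redis.Redis instance (or a compatible object).
    """
    before = client.xlen(stream)
    # XDEL needs at least one ID, so skip the call for an empty list
    deleted = client.xdel(stream, *ids) if ids else 0
    after = client.xlen(stream)
    return {"before": before, "deleted": deleted, "after": after}
```

With a real client this could be called as `delete_entries(r, "forever_stream", ["1756565556646-0"])`, using an ID taken from an earlier xadd() or xrange() call.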
2.4. CoreCommands.xrange()
(producer side)
https://redis.io/docs/latest/commands/xrange/
xrange(name, min='-', max='+', count=None)
Read stream values within an interval.
name: name of the stream.
min: first stream ID. Defaults to '-', meaning the earliest available. (The docstring calls this parameter start.)
max: last stream ID. Defaults to '+', meaning the latest available. (The docstring calls this parameter finish.)
count: if set, only return this many items, beginning with the earliest available.
Parameters
name (Union[bytes, str, memoryview])
min (Union[int, bytes, str, memoryview])
max (Union[int, bytes, str, memoryview])
count (Optional[int])
Return type
Union[Awaitable[Any], Any]
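For a long stream, reading everything in one xrange() call may return a very large list. The sketch below pages through the stream with the count parameter instead. The generator name `iter_stream` and the batch size are assumptions for this example, and the exclusive lower bound written as `(` followed by an ID assumes Redis 6.2 or later.

```python
def iter_stream(client, stream, batch_size=100):
    """Yield (entry_id, fields) pairs from oldest to newest, batch by batch.

    client is expected to be a redis.Redis instance (or a compatible object).
    """
    start = "-"  # "-" means the earliest available ID
    while True:
        batch = client.xrange(stream, min=start, max="+", count=batch_size)
        if not batch:
            return
        yield from batch
        last_id = batch[-1][0]
        if isinstance(last_id, bytes):
            last_id = last_id.decode()
        # "(" makes the lower bound exclusive, so the last entry is not repeated
        start = "(" + last_id
```

A caller can then write `for entry_id, fields in iter_stream(r, "forever_stream", batch_size=500): ...` and never hold more than one batch in memory.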
(base) yongqiang@yongqiang:~$ sudo service redis-server start
[sudo] password for yongqiang:
Starting redis-server: redis-server.
(base) yongqiang@yongqiang:~$
(base) yongqiang@yongqiang:~$ sudo service redis-server status
* redis-server is running
(base) yongqiang@yongqiang:~$
#!/usr/bin/env python
# coding=utf-8
import redis
# Connect to the Redis server
r = redis.Redis(host='localhost', port=6379, db=0)
# Name of the stream
stream_key = "forever_stream"
# Use xadd to append events to the stream
r.xadd(stream_key, {"event": "signup", "user_id": 123})
r.xadd(stream_key, {"event": "login", "user_id": 456})
r.xadd(stream_key, {"event": "logout", "user_id": 789})
# Print every entry currently in the stream
entries = r.xrange(stream_key)
for entry in entries:
    print(entry)
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py
(b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756565556646-1', {b'event': b'login', b'user_id': b'456'})
(b'1756565556646-2', {b'event': b'logout', b'user_id': b'789'})
Process finished with exit code 0
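The output above shows that redis-py returns IDs, field names, and values as bytes by default. The sketch below converts one such entry to strings and splits the ID into its timestamp and sequence parts. The helper names `decode_entry` and `parse_stream_id` are assumptions for this example, and the code assumes UTF-8 data and an auto-generated ID of the form `<milliseconds>-<sequence>`.

```python
from datetime import datetime, timezone


def decode_entry(entry):
    """Convert (b'id', {b'field': b'value'}) into (str, dict of str)."""
    entry_id, fields = entry
    return entry_id.decode(), {k.decode(): v.decode() for k, v in fields.items()}


def parse_stream_id(entry_id):
    """Split '<milliseconds>-<sequence>' into a UTC datetime and an int."""
    ms, seq = entry_id.split("-")
    return datetime.fromtimestamp(int(ms) / 1000, tz=timezone.utc), int(seq)


# One entry copied from the output above
raw = (b"1756565556646-0", {b"event": b"signup", b"user_id": b"123"})
entry_id, fields = decode_entry(raw)
created_at, seq = parse_stream_id(entry_id)
print(entry_id, fields, created_at.isoformat(), seq)
```

An alternative is to create the client with `redis.Redis(..., decode_responses=True)`, in which case redis-py returns str values and `decode_entry` is not needed.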
2.5. RedisClusterCommands.delete()
delete(*keys)
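Deletes the given keys in the cluster. The keys are first split up into slots and then a DEL command is sent for every slot.
Non-existent keys are ignored. Returns the number of keys that were deleted.
The example below connects with redis.Redis, a standalone (non-cluster) client, where delete() sends a single DEL command for the given keys.
Parameters
keys (Union[bytes, str, memoryview])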
Return type
Union[Awaitable[Any], Any]
#!/usr/bin/env python
# coding=utf-8
import redis
# Connect to the Redis server
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Name of the stream
stream_key = "forever_stream"
# Use xadd to append events to the stream
redis_client.xadd(stream_key, {"event": "signup", "user_id": 123})
redis_client.xadd(stream_key, {"event": "login", "user_id": 456})
redis_client.xadd(stream_key, {"event": "logout", "user_id": 789})
# Print every entry currently in the stream
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
for entry in entries:
    print(entry)
# Delete the whole stream key, then confirm that the stream is empty
redis_client.delete(stream_key)
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py
length: 3
(b'1756567113334-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756567113334-1', {b'event': b'login', b'user_id': b'456'})
(b'1756567113334-2', {b'event': b'logout', b'user_id': b'789'})
length: 0
Process finished with exit code 0
DEL
https://redis.io/docs/latest/commands/del/
Syntax
DEL key [key ...]
Removes the specified keys. A key is ignored if it does not exist.
Running DEL on the stream's key deletes the entire Redis Stream.
(base) yongqiang@yongqiang:~$ redis-cli
127.0.0.1:6379> del "forever_stream"
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> del "forever_stream"
(integer) 0
127.0.0.1:6379> exit
(base) yongqiang@yongqiang:~$
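The redis-cli session above returns 1 the first time and 0 the second time, because DEL reports how many keys it actually removed. The same check can be sketched in Python. The helper name `drop_stream` is an assumption for this example; `delete` is the redis-py method used earlier in this article.

```python
def drop_stream(client, stream):
    """Delete the stream key; return True if a key was actually removed.

    client is expected to be a redis.Redis instance (or a compatible object).
    """
    removed = client.delete(stream)  # DEL returns the number of keys removed
    return removed == 1
```

Calling `drop_stream(redis_client, "forever_stream")` twice in a row would be expected to return True and then False, matching the two DEL results shown above.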
3. Redis Stream Examples
https://redis.readthedocs.io/en/stable/examples/redis-stream-example.html
https://redis-py.pythonlang.cn/en/stable/examples/redis-stream-example.html
References
[1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/
[2] What is Redis? https://www.ibm.com/think/topics/redis
[3] Redis Tutorial, https://redis.com.cn/