Python Client for Redis - Redis Stream

Published: 2025-09-01

redis-py - the Python Redis client
https://redis.io/docs/latest/develop/clients/redis-py/
https://redis-py.pythonlang.cn/en/stable/index.html

redis-py (Redis Python client)
https://github.com/redis/redis-py

The Python interface to the Redis key-value store.

redis-py is the Python client for Redis. redis-py requires a running Redis server.

The sections below explain how to install redis-py and connect your Python application to a Redis database.

Redis Commands
https://redis.readthedocs.io/en/stable/commands.html

Redis is an open-source, in-memory data structure store that can be used as a database, a cache, and a message broker.

1. Redis Stream

https://redis.com.cn/redis-stream.html

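Redis Stream is mainly used as a message queue (MQ). Redis already offers publish/subscribe (pub/sub), which can also act as a message queue, but pub/sub does not persist messages: if the network connection drops or Redis goes down, the messages in flight are lost.

Pub/sub can distribute messages, but it keeps no message history.

Redis Stream adds message persistence and primary/replica replication. Any client can read the data from any point in the stream, consumer groups record how far each consumer has read, and messages stay in the stream until they are trimmed or deleted, so a consumer that disconnects does not lose them.

Redis Stream is an append-only data structure for time-ordered data. Every entry has a unique id and a set of associated field/value pairs, which makes streams a good fit for logs, event collection, and real-time data feeds.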
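
The entry ids mentioned above have a regular shape when Redis generates them: a millisecond Unix timestamp, a hyphen, and a sequence number (for example 1756565556646-0). The following is a minimal sketch of splitting such an id, assuming ids in that default form; the helper names parse_stream_id and id_to_datetime are hypothetical and not part of redis-py.

```python
from datetime import datetime, timezone
from typing import Tuple, Union


def parse_stream_id(entry_id: Union[bytes, str]) -> Tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into two integers (hypothetical helper)."""
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode("ascii")
    ms_part, seq_part = entry_id.split("-", 1)
    return int(ms_part), int(seq_part)


def id_to_datetime(entry_id: Union[bytes, str]) -> datetime:
    """Convert the millisecond part of an id to a timezone-aware UTC datetime."""
    ms, _ = parse_stream_id(entry_id)
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


ms, seq = parse_stream_id(b"1756565556646-0")
print(ms, seq)
print(id_to_datetime(b"1756565556646-0").isoformat())
```

Because ids compare as (milliseconds, sequence) pairs, the tuples returned by parse_stream_id sort in the same order as the entries in the stream.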

2. Redis Commands

2.1. CoreCommands.xadd() (producer side)

https://redis.readthedocs.io/en/stable/commands.html
https://redis.io/docs/latest/commands/xadd/

xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None, ref_policy=None)

Add to a stream.

name: name of the stream. If the stream does not exist, it is created (unless nomkstream is set).
fields: dict of field/value pairs to insert into the stream.
id: location to insert this record. By default it is appended. Use * to let Redis generate the id automatically. A custom id is allowed, but the caller must make sure that ids keep increasing.
maxlen: truncate old stream members beyond this size. Can't be specified with minid. Optional; sets the maximum length of the stream.
approximate: actual stream length may be slightly more than maxlen
nomkstream: when set to True, do not create the stream if it does not already exist
minid: trimming threshold; entries with ids lower than minid are evicted from the stream. Can't be specified with maxlen. Optional.
limit: the maximum number of entries to evict in one trimming pass (applies to approximate trimming)
ref_policy: optional reference policy for consumer groups when trimming:

  • KEEPREF (default): When trimming, preserves references in consumer groups’ PEL
  • DELREF: When trimming, removes all references from consumer groups’ PEL
  • ACKED: When trimming, only removes entries acknowledged by all consumer groups

Parameters

  • name (Union[bytes, str, memoryview])
  • fields (Dict[Union[bytes, bytearray, memoryview, str, int, float], Union[bytes, bytearray, memoryview, str, int, float]])
  • id (Union[int, bytes, str, memoryview])
  • maxlen (Optional[int])
  • approximate (bool)
  • nomkstream (bool)
  • minid (Optional[Union[int, bytes, str, memoryview]])
  • limit (Optional[int])
  • ref_policy (Optional[Literal['KEEPREF', 'DELREF', 'ACKED']])

Return type

  • Union[Awaitable[Any], Any]
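
The maxlen and minid parameters describe two different trimming rules, and a toy model may make the difference easier to see. The sketch below is plain Python working on an in-memory list, not redis-py code; the function trim and the tuple-based ids are assumptions made for illustration, and it models exact trimming only (with approximate=True the real stream may keep slightly more entries).

```python
from typing import Dict, List, Optional, Tuple

StreamId = Tuple[int, int]  # (milliseconds, sequence)
Entry = Tuple[StreamId, Dict[str, str]]


def trim(entries: List[Entry],
         maxlen: Optional[int] = None,
         minid: Optional[StreamId] = None) -> List[Entry]:
    """Toy model of exact trimming on a list that is already ordered by id."""
    if maxlen is not None and minid is not None:
        raise ValueError("maxlen and minid cannot be combined")
    if maxlen is not None:
        # MAXLEN keeps only the newest `maxlen` entries.
        return entries[-maxlen:] if maxlen > 0 else []
    if minid is not None:
        # MINID evicts every entry whose id is lower than `minid`.
        return [entry for entry in entries if entry[0] >= minid]
    return list(entries)


entries = [
    ((1000, 0), {"event": "signup"}),
    ((1000, 1), {"event": "login"}),
    ((2000, 0), {"event": "logout"}),
]
by_len = trim(entries, maxlen=2)
by_id = trim(entries, minid=(2000, 0))
print([entry[0] for entry in by_len])
print([entry[0] for entry in by_id])
```

Against a live server, the corresponding call would be along the lines of r.xadd(stream_key, fields, maxlen=1000, approximate=True), where r and stream_key are whatever client and stream name the application uses.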

2.2. CoreCommands.xlen() (producer side)

https://redis.io/docs/latest/commands/xlen/

xlen(name)

Returns the number of elements in a given stream.

Parameters

  • name (Union[bytes, str, memoryview])

Return type

  • Union[Awaitable[Any], Any]

2.3. CoreCommands.xdel() (producer side)

https://redis.io/docs/latest/commands/xdel/

xdel(name, *ids)

Deletes one or more messages from a stream.

Parameters

  • name (Union[bytes, str, memoryview]) - name of the stream.
  • *ids (Union[int, bytes, str, memoryview]) - message ids to delete.

Return type
Union[Awaitable[Any], Any]

2.4. CoreCommands.xrange() (producer side)

https://redis.io/docs/latest/commands/xrange/

xrange(name, min='-', max='+', count=None)

Read stream values within an interval.

name: name of the stream.
min: first stream ID. Defaults to '-', meaning the earliest available.
max: last stream ID. Defaults to '+', meaning the latest available.
count: if set, only return this many items, beginning with the earliest available.

Parameters

  • name (Union[bytes, str, memoryview])
  • min (Union[int, bytes, str, memoryview])
  • max (Union[int, bytes, str, memoryview])
  • count (Optional[int])

Return type
Union[Awaitable[Any], Any]
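
When a stream is large, count can be combined with min to read it page by page. The sketch below shows only the bookkeeping between pages, assuming Redis 6.2 or later, where an id prefixed with "(" makes the bound exclusive; next_page_start is a hypothetical helper, not a redis-py function.

```python
from typing import Any, List, Optional, Tuple


def next_page_start(batch: List[Tuple[Any, dict]]) -> Optional[str]:
    """Return the `min` argument for the next xrange call, or None when done."""
    if not batch:
        return None
    last_id = batch[-1][0]
    if isinstance(last_id, bytes):
        last_id = last_id.decode("ascii")
    # "(" excludes last_id itself, so the entry is not returned twice.
    return "(" + last_id


page = [
    (b"1756565556646-0", {b"event": b"signup"}),
    (b"1756565556646-1", {b"event": b"login"}),
]
print(next_page_start(page))
print(next_page_start([]))
```

A reading loop would start with min='-', call xrange(name, min=start, count=page_size), process the batch, and replace start with the value returned by next_page_start until it returns None.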

(base) yongqiang@yongqiang:~$ sudo service redis-server start
[sudo] password for yongqiang:
Starting redis-server: redis-server.
(base) yongqiang@yongqiang:~$
(base) yongqiang@yongqiang:~$ sudo service redis-server status
 * redis-server is running
(base) yongqiang@yongqiang:~$
#!/usr/bin/env python
# coding=utf-8

import redis

# Connect to the Redis server
r = redis.Redis(host='localhost', port=6379, db=0)

# Name of the stream
stream_key = "forever_stream"

# Append events to the stream with xadd
r.xadd(stream_key, {"event": "signup", "user_id": 123})
r.xadd(stream_key, {"event": "login", "user_id": 456})
r.xadd(stream_key, {"event": "logout", "user_id": 789})

# Print every entry currently in the stream
entries = r.xrange(stream_key)
for entry in entries:
    print(entry)

/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
(b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756565556646-1', {b'event': b'login', b'user_id': b'456'})
(b'1756565556646-2', {b'event': b'logout', b'user_id': b'789'})

Process finished with exit code 0
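
The output above shows that ids, field names, and values all come back as bytes. A minimal sketch of converting one such entry to str follows, assuming UTF-8 content; decode_entry is a hypothetical helper, and the sample tuple is copied from the output above.

```python
from typing import Dict, Tuple


def decode_entry(entry: Tuple[bytes, Dict[bytes, bytes]]) -> Tuple[str, Dict[str, str]]:
    """Decode the id, field names, and values of one xrange entry to str."""
    entry_id, fields = entry
    decoded_fields = {
        key.decode("utf-8"): value.decode("utf-8")
        for key, value in fields.items()
    }
    return entry_id.decode("utf-8"), decoded_fields


raw = (b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
decoded = decode_entry(raw)
print(decoded)
```

Alternatively, creating the client with redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) makes redis-py return str values directly, so no manual decoding is needed. Note that user_id comes back as the string '123' either way, since Redis stores field values as strings.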

2.5. RedisClusterCommands.delete()

  • delete(*keys)

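Deletes the given keys in the cluster. The keys are first split up into slots and then a DEL command is sent for every slot. The example below uses a standalone redis.Redis client, whose delete() sends a single DEL command for the given keys.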

Non-existent keys are ignored. Returns the number of keys that were deleted.

Parameters
keys (Union[bytes, str, memoryview])

Return type
Union[Awaitable[Any], Any]

#!/usr/bin/env python
# coding=utf-8

import redis

# Connect to the Redis server
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Name of the stream
stream_key = "forever_stream"

# Append events to the stream with xadd
redis_client.xadd(stream_key, {"event": "signup", "user_id": 123})
redis_client.xadd(stream_key, {"event": "login", "user_id": 456})
redis_client.xadd(stream_key, {"event": "logout", "user_id": 789})

# Print every entry currently in the stream
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
for entry in entries:
    print(entry)

redis_client.delete(stream_key)

entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")

/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
length: 3
(b'1756567113334-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756567113334-1', {b'event': b'login', b'user_id': b'456'})
(b'1756567113334-2', {b'event': b'logout', b'user_id': b'789'})
length: 0

Process finished with exit code 0
  • DEL

https://redis.io/docs/latest/commands/del/

Syntax

DEL key [key ...]

Removes the specified keys. A key is ignored if it does not exist.
Running DEL on the stream's key deletes the entire Redis Stream.

(base) yongqiang@yongqiang:~$ redis-cli
127.0.0.1:6379> del "forever_stream"
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> del "forever_stream"
(integer) 0
127.0.0.1:6379> exit
(base) yongqiang@yongqiang:~$

3. Redis Stream Examples

https://redis.readthedocs.io/en/stable/examples/redis-stream-example.html
https://redis-py.pythonlang.cn/en/stable/examples/redis-stream-example.html

References

[1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/
[2] What is Redis? https://www.ibm.com/think/topics/redis
[3] Redis Tutorial, https://redis.com.cn/

