Redis的发布订阅(Pub/Sub)是一种基于消息多播的通信机制,它允许消息的**发布者(Publisher)向特定频道发送消息,而订阅者(Subscriber)**通过订阅频道或模式来接收消息。
其核心特点如下:
轻量级:无需额外组件,直接通过Redis服务实现
实时性:消息即时推送,无轮询延迟
广播模式:一个消息可被多个订阅者同时接收
无状态性:不存储历史消息,订阅者只能接收订阅后的消息
发布订阅命令的使用
有关发布订阅的命令可以通过help @pubsub
命令来查看。有关命令的使用可以通过help 命令
来查看,例如help publish
。
基础命令速查表
命令 | 作用 | 示例 |
---|---|---|
SUBSCRIBE |
订阅一个或多个频道 | SUBSCRIBE news sports |
PSUBSCRIBE |
使用模式匹配订阅频道 | PSUBSCRIBE sensor.* |
PUBLISH |
向指定频道发送消息 | PUBLISH news "Hello" |
UNSUBSCRIBE |
退订指定频道 | UNSUBSCRIBE news |
PUNSUBSCRIBE |
退订模式订阅 | PUNSUBSCRIBE sensor.* |
PUBSUB CHANNELS |
查看活跃频道列表 | PUBSUB CHANNELS "sensor.*" |
操作示例
# 订阅者A(终端1)
127.0.0.1:6379> subscribe notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1
# 订阅者B(终端2)
127.0.0.1:6379> psubscribe system.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "system.*"
3) (integer) 1
# 发布消息(终端3)
127.0.0.1:6379> publish notifications "Service will be upgraded soon"
(integer) 1
127.0.0.1:6379> publish system.alert "CPU usage exceeds 90%"
(integer) 1
# 订阅者A收到:
1) "message"
2) "notifications"
3) "Service will be upgraded soon"
# 订阅者B收到:
1) "pmessage"
2) "system.*"
3) "system.alert"
4) "CPU usage exceeds 90%"
发布订阅的使用场景与优缺点
适用场景
实时通知系统:用户在线状态更新,即时聊天消息推送
事件驱动架构:缓存失效广播,分布式配置更新
轻量级监控:服务器状态报警,业务指标异常通知
优点
极低延迟(平均<1ms)
支持百万级TPS消息吞吐
模式匹配订阅实现灵活路由
零外部依赖(仅需Redis服务)
缺点
消息不可靠性:不保证送达,离线订阅者会丢失消息
无持久化机制:重启后所有订阅关系丢失
客户端阻塞:订阅操作会占用连接线程(需异步处理)
替代方案建议:需要可靠消息时,使用Redis Streams(支持消息持久化、消费者组)或RabbitMQ/Kafka
在Java中使用RedisTemplate实现
配置RedisTemplate
package com.morris.redis.demo.pubsub;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 对redis的键值进行序列化
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(
RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 使用 String 序列化 key
template.setKeySerializer(new StringRedisSerializer());
// 使用 JSON 序列化 value(需要额外依赖 jackson)
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 对于 Hash 结构同理
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}
实现消息发布者
package com.morris.redis.demo.pubsub;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 消息发布者
*/
@Service
public class MessagePublisher {
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void sendNotification(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
}
实现消息订阅者
package com.morris.redis.demo.pubsub;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 消息订阅者
*/
@Component
public class MessageSubscriber implements MessageListener {
@Resource
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
String body = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
System.out.printf("收到频道[%s]的消息: %s\n", channel, body);
}
}
配置订阅监听
package com.morris.redis.demo.pubsub;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* 配置redis消息订阅监听器
*/
@Configuration
@Slf4j
public class RedisPubSubConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageSubscriber messageSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 订阅具体频道
container.addMessageListener(messageSubscriber, new ChannelTopic("notifications"));
// 订阅模式匹配
container.addMessageListener(messageSubscriber, new PatternTopic("system.*"));
// 异常处理
container.setErrorHandler((e) -> {
log.error("[listen message] error ", e);
});
return container;
}
}
使用示例
package com.morris.redis.demo.pubsub;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 使用接口发布消息
*/
@RestController
@RequestMapping("/pubsub")
public class PubSubDemoController {
@Resource
private MessagePublisher publisher;
// 发布告警
@GetMapping("/alert")
public String sendAlert(@RequestParam String message) {
publisher.sendNotification("system.alert", message);
return "警报已发送";
}
// 发布通知
@GetMapping("/notify")
public String sendNotify(@RequestParam String message) {
publisher.sendNotification("notifications", message);
return "通知已发送";
}
}