为了简单,我们使用docker 容器开启rabbitmq作为服务
1 安装centos docker
1. 卸载旧版本(如有)
sudo yum remove -y docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-engine
2. 安装依赖包
sudo yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
3. 添加 Docker 官方 YUM 仓库
sudo yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo
如果你访问国外镜像很慢,可以使用国内镜像,例如阿里
- 修改 Docker 仓库地址为腾讯镜像
sudo sed -i 's+https://download.docker.com+https://mirrors.tencent.com/docker-ce+' /etc/yum.repos.d/docker-ce.repo
- 验证是否替换成功
cat /etc/yum.repos.d/docker-ce.repo```
- 清理缓存并重新安装
# 清理旧缓存
sudo yum clean all
# 生成新的元数据缓存(缓存所有的yum元数据,时间很长,很慢)
sudo yum makecache
# 缓存指定的yum源,推荐
# --disablerepo=*:禁用所有仓库
# --enablerepo=docker-ce-stable:只启用 docker-ce-stable
# --skip-unavailable:跳过无法访问的仓库(避免报错)
sudo yum makecache --enablerepo=docker-ce-stable --disablerepo=* --skip-unavailable
# 安装 Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io
4. 安装 Docker Engine
sudo yum install -y docker-ce docker-ce-cli containerd.io
5. 启动并启用 Docker 服务
# 启动 Docker
sudo systemctl start docker
# 设置开机自启
sudo systemctl enable docker
6. 验证安装
sudo docker --version
运行docker rabbitmq
设置国内镜像
需要国内镜像
修改 Docker 配置文件(推荐)
很多镜像无法使用,的用心找下,不然也无法下载镜像
# 1. 创建或编辑配置文件
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": [
"https://docker-0.unsee.tech"
]
}
EOF
# 重新加载配置
sudo systemctl daemon-reload
# 重启 Docker
sudo systemctl restart docker
# 查看是否生效
docker info | grep -A 2 "Registry Mirrors"
运行rabbitMQ
如果执行不成功,无法下载,则可能你没有公共的DNS,添加
两种方式
方式1,vi打开 /etc/resolv.conf
search lan
nameserver 192.168.111.1
nameserver 8.8.8.8
nameserver 114.114.114.114
方式2
# 编辑 resolv.conf,添加公共 DNS
sudo tee /etc/resolv.conf <<-'EOF'
nameserver 8.8.8.8
nameserver 114.114.114.114
options timeout:1
EOF
docker run -d \
--name rabbitmq \
--hostname rabbitmq \
-p 5672:5672 \
-p 5671:5671 \
-p 15672:15672 \
-p 15671:15671 \
-p 15692:15692 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=SecurePass123! \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
--mount source=rabbitmq_data,target=/var/lib/rabbitmq \
--mount source=rabbitmq_log,target=/var/log/rabbitmq \
--restart=unless-stopped \
rabbitmq:3.13-management
启动成功查看
访问
http://192.168.126.3:15672/#/
admin
SecurePass123!
go代码实现
五种消息模型
简单队列:点对点
特点:一个生产者发送消息到队列,仅有一个消费者接收并处理消息。
适用场景:一对一的简单通信场景。
下面的代码只有一个生产者,一个消费者
package main
import (
"log"
"strconv"
"sync"
"time"
"github.com/streadway/amqp"
)
func main() {
// 1. 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 2. 创建 channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 3. 声明队列(确保队列存在)
queue, err := ch.QueueDeclare(
"Q1", // 队列名
true, // 持久化:重启后队列仍存在
false, // 自动删除:当最后一个消费者断开时,不自动删除
false, // 排他性:非排他,其他连接也可使用
false, // 阻塞:不阻塞
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 4. 启动多个消费者(使用 WaitGroup 等待)
var wg sync.WaitGroup
wg.Add(1) // 启动 1 个消费者
go func() {
defer wg.Done()
consume("consumer-1", conn) // 每个消费者使用自己的 channel
}()
// 5. 生产者:发送消息
go func() {
i := 0
for {
str := "Hello World" + strconv.Itoa(i)
err := ch.Publish(
"", // 交换机:默认交换机
queue.Name, // 路由键:队列名
false, // mandatory:如果没有匹配的队列,不返回消息
false, // immediate:不立即投递
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(str),
},
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
return
}
i++
time.Sleep(200 * time.Millisecond)
}
}()
// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
wg.Wait() // 这里不会退出,除非消费者退出
}
// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {
// 每个消费者使用独立的 channel
ch, err := conn.Channel()
if err != nil {
log.Printf("[%s] Failed to open channel: %v", consumerName, err)
return
}
defer ch.Close()
// 消费消息
msgs, err := ch.Consume(
"Q1", // 队列名
consumerName, // 消费者标签(唯一标识)
true, // autoAck:自动确认(消息处理完后自动从队列删除)
false, // exclusive:非排他
false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
false, // noWait:不等待服务器确认
nil, // args
)
if err != nil {
log.Printf("[%s] Failed to consume: %v", consumerName, err)
return
}
// 持续接收消息
for msg := range msgs {
// 使用锁或 log 打印,避免并发输出乱序
log.Printf("[%s] Received: %s", consumerName, msg.Body)
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
}
log.Printf("[%s] Consumer stopped.", consumerName)
}
工作队列模式
特点:多个消费者共享同一队列,消息按轮询(默认)或公平分配(能者多劳)机制处理。
适用场景:任务分发与负载均衡,如分布式任务处理
仅仅是在简单队列基础上,增加了一个消费队列,两个队列轮流消费
go func() {
defer wg.Done()
consume("consumer-1", conn) // 每个消费者使用自己的 channel
}()
go func() {
defer wg.Done()
consume("consumer-2", conn)
}()
完整版
package main
import (
"log"
"strconv"
"sync"
"time"
"github.com/streadway/amqp"
)
func main() {
// 1. 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 2. 创建 channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 3. 声明队列(确保队列存在)
queue, err := ch.QueueDeclare(
"Q1", // 队列名
true, // 持久化:重启后队列仍存在
false, // 自动删除:当最后一个消费者断开时,不自动删除
false, // 排他性:非排他,其他连接也可使用
false, // 阻塞:不阻塞
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 4. 启动多个消费者(使用 WaitGroup 等待)
var wg sync.WaitGroup
wg.Add(2) // 启动 2 个消费者
go func() {
defer wg.Done()
consume("consumer-1", conn) // 每个消费者使用自己的 channel
}()
go func() {
defer wg.Done()
consume("consumer-2", conn)
}()
// 5. 生产者:发送消息
go func() {
i := 0
for {
str := "Hello World" + strconv.Itoa(i)
err := ch.Publish(
"", // 交换机:默认交换机
queue.Name, // 路由键:队列名
false, // mandatory:如果没有匹配的队列,不返回消息
false, // immediate:不立即投递
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(str),
},
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
return
}
i++
time.Sleep(200 * time.Millisecond)
}
}()
// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
wg.Wait() // 这里不会退出,除非消费者退出
}
// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {
// 每个消费者使用独立的 channel
ch, err := conn.Channel()
if err != nil {
log.Printf("[%s] Failed to open channel: %v", consumerName, err)
return
}
defer ch.Close()
// 消费消息
msgs, err := ch.Consume(
"Q1", // 队列名
consumerName, // 消费者标签(唯一标识)
true, // autoAck:自动确认(消息处理完后自动从队列删除)
false, // exclusive:非排他
false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
false, // noWait:不等待服务器确认
nil, // args
)
if err != nil {
log.Printf("[%s] Failed to consume: %v", consumerName, err)
return
}
// 持续接收消息
for msg := range msgs {
// 使用锁或 log 打印,避免并发输出乱序
log.Printf("[%s] Received: %s", consumerName, msg.Body)
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
}
log.Printf("[%s] Consumer stopped.", consumerName)
}
发布/订阅模式
特点:生产者通过交换机将消息广播至所有绑定该交换机的队列,每个队列对应一个消费者。
适用场景:消息多副本分发,如日志同步。
fanout 模式
package main
import (
"log"
"strconv"
"sync"
"time"
"github.com/streadway/amqp"
)
const exchangeName = "go-exchange"
func main() {
// 1. 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 2. 创建 channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
exchangeName, // 交换机名:默认交换机
"fanout", // 类型:直连 direct, topic , headers, fanout
true, // 持久化:重启后交换机仍存在
false, // 自动删除:当最后一个绑定断开时,不自动删除
false, // 内部:不用于客户端应用
false, // 无等待:不等待服务器确认
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 4.
var wg sync.WaitGroup
wg.Add(2) // 启动 2 个消费者
go func() {
defer wg.Done()
subscribe(conn, exchangeName) // 每个消费者使用自己的 channel
}()
go func() {
defer wg.Done()
subscribe(conn, exchangeName) // 每个消费者使用自己的 channel
}()
// 5. 生产者:发送消息
go func() {
i := 0
for {
str := "Hello World" + strconv.Itoa(i)
err := ch.Publish(
exchangeName, // 交换机:默认交换机
"", // routing key:路由键,fanout 类型不需要
false, // mandatory:如果没有匹配的队列,不返回消息
false, // immediate:不立即投递
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(str),
},
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
return
}
i++
time.Sleep(200 * time.Millisecond)
}
}()
// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
wg.Wait() // 这里不会退出,除非消费者退出
}
func subscribe(conn *amqp.Connection, exchangeName string) {
// 每个消费者使用独立的 channel
ch, err := conn.Channel()
if err != nil {
log.Printf("Failed to open channel: %v", err)
return
}
defer ch.Close()
// 声明队列(确保队列存在)
queue, err := ch.QueueDeclare(
"", // 队列名
false, // 持久化:重启后队列仍存在
true, // 自动删除:当最后一个消费者断开时,不自动删除
true, // 排他性:非排他,其他连接也可使用
false, // 阻塞:不阻塞
nil, // 额外参数
)
if err != nil {
log.Printf("Failed to declare a queue: %v", err)
return
}
// 绑定队列到交换机
err = ch.QueueBind(
queue.Name, // 队列名
"", // 路由键:fanout 类型不需要
exchangeName, // 交换机名
false, // 不等待服务器确认
nil, // 额外参数
)
defer ch.QueueDelete(queue.Name, false, false, false)
//调用消费者
consume("consumer-haha", conn, queue.Name)
}
// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection, queueName string) {
// 每个消费者使用独立的 channel
ch, err := conn.Channel()
if err != nil {
log.Printf("[%s] Failed to open channel: %v", consumerName, err)
return
}
defer ch.Close()
// 消费消息
msgs, err := ch.Consume(
queueName, // 队列名
consumerName, // 消费者标签(唯一标识)
true, // autoAck:自动确认(消息处理完后自动从队列删除)
false, // exclusive:非排他
false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
false, // noWait:不等待服务器确认
nil, // args
)
if err != nil {
log.Printf("[%s] Failed to consume: %v", consumerName, err)
return
}
// 持续接收消息
for msg := range msgs {
// 使用锁或 log 打印,避免并发输出乱序
log.Printf("[%s] Received: %s", consumerName, msg.Body)
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
}
log.Printf("[%s] Consumer stopped.", consumerName)
}
每次获取两条消息,这是因为有两个subscribe()携程在收消息