go 使用rabbitMQ

发布于:2025-08-30 ⋅ 阅读:(17) ⋅ 点赞:(0)

为了简单,我们使用docker 容器开启rabbitmq作为服务

1 安装centos docker

1. 卸载旧版本(如有)

sudo yum remove -y docker \
                  docker-client \
                  docker-client-latest \
                  docker-common \
                  docker-latest \
                  docker-latest-logrotate \
                  docker-logrotate \
                  docker-engine

2. 安装依赖包

sudo yum install -y yum-utils \
  device-mapper-persistent-data \
  lvm2

3. 添加 Docker 官方 YUM 仓库

sudo yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo

如果你访问国外镜像很慢,可以使用国内镜像,例如阿里

  1. 修改 Docker 仓库地址为腾讯镜像
sudo sed -i 's+https://download.docker.com+https://mirrors.tencent.com/docker-ce+' /etc/yum.repos.d/docker-ce.repo


  1. 验证是否替换成功
cat /etc/yum.repos.d/docker-ce.repo```
  1. 清理缓存并重新安装
# 清理旧缓存
sudo yum clean all

# 生成新的元数据缓存(缓存所有的yum元数据,时间很长,很慢)
sudo yum makecache

# 缓存指定的yum源,推荐
# --disablerepo=*:禁用所有仓库
# --enablerepo=docker-ce-stable:只启用 docker-ce-stable
# --skip-unavailable:跳过无法访问的仓库(避免报错)
sudo  yum makecache --enablerepo=docker-ce-stable --disablerepo=* --skip-unavailable

# 安装 Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io

4. 安装 Docker Engine

sudo yum install -y docker-ce docker-ce-cli containerd.io

5. 启动并启用 Docker 服务

# 启动 Docker
sudo systemctl start docker

# 设置开机自启
sudo systemctl enable docker

6. 验证安装

sudo docker --version

运行docker rabbitmq

设置国内镜像

需要国内镜像
修改 Docker 配置文件(推荐)
很多镜像无法使用,的用心找下,不然也无法下载镜像

# 1. 创建或编辑配置文件
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": [
    "https://docker-0.unsee.tech"
  ]
}
EOF
# 重新加载配置
sudo systemctl daemon-reload

# 重启 Docker
sudo systemctl restart docker

# 查看是否生效
docker info | grep -A 2 "Registry Mirrors"

运行rabbitMQ

如果执行不成功,无法下载,则可能你没有公共的DNS,添加
两种方式
方式1,vi打开 /etc/resolv.conf

search lan
nameserver 192.168.111.1
nameserver 8.8.8.8 
nameserver 114.114.114.114

方式2

# 编辑 resolv.conf,添加公共 DNS
sudo tee /etc/resolv.conf <<-'EOF'
nameserver 8.8.8.8
nameserver 114.114.114.114
options timeout:1
EOF

docker run -d \
  --name rabbitmq \
  --hostname rabbitmq \
  -p 5672:5672 \
  -p 5671:5671 \
  -p 15672:15672 \
  -p 15671:15671 \
  -p 15692:15692 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=SecurePass123! \
  -e RABBITMQ_DEFAULT_VHOST=my_vhost \
  --mount source=rabbitmq_data,target=/var/lib/rabbitmq \
  --mount source=rabbitmq_log,target=/var/log/rabbitmq \
  --restart=unless-stopped \
  rabbitmq:3.13-management

启动成功查看
在这里插入图片描述
访问

http://192.168.126.3:15672/#/
admin
SecurePass123!

go代码实现

五种消息模型

简单队列:点对点

‌特点‌:一个生产者发送消息到队列,仅有一个消费者接收并处理消息。 ‌
‌适用场景‌:一对一的简单通信场景。

下面的代码只有一个生产者,一个消费者

package main

import (
	"log"
	"strconv"
	"sync"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	// 1. 连接 RabbitMQ
	conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 2. 创建 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 3. 声明队列(确保队列存在)
	queue, err := ch.QueueDeclare(
		"Q1",  // 队列名
		true,  // 持久化:重启后队列仍存在
		false, // 自动删除:当最后一个消费者断开时,不自动删除
		false, // 排他性:非排他,其他连接也可使用
		false, // 阻塞:不阻塞
		nil,   // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 4. 启动多个消费者(使用 WaitGroup 等待)
	var wg sync.WaitGroup
	wg.Add(1) // 启动 1 个消费者

	go func() {
		defer wg.Done()
		consume("consumer-1", conn) // 每个消费者使用自己的 channel
	}()


	// 5. 生产者:发送消息
	go func() {
		i := 0
		for {
			str := "Hello World" + strconv.Itoa(i)
			err := ch.Publish(
				"",         // 交换机:默认交换机
				queue.Name, // 路由键:队列名
				false,      // mandatory:如果没有匹配的队列,不返回消息
				false,      // immediate:不立即投递
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(str),
				},
			)
			if err != nil {
				log.Printf("Failed to publish a message: %v", err)
				return
			}
			i++
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
	wg.Wait() // 这里不会退出,除非消费者退出
}

// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {
	// 每个消费者使用独立的 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Printf("[%s] Failed to open channel: %v", consumerName, err)
		return
	}
	defer ch.Close()

	// 消费消息
	msgs, err := ch.Consume(
		"Q1",         // 队列名
		consumerName, // 消费者标签(唯一标识)
		true,         // autoAck:自动确认(消息处理完后自动从队列删除)
		false,        // exclusive:非排他
		false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
		false,        // noWait:不等待服务器确认
		nil,          // args
	)
	if err != nil {
		log.Printf("[%s] Failed to consume: %v", consumerName, err)
		return
	}

	// 持续接收消息
	for msg := range msgs {
		// 使用锁或 log 打印,避免并发输出乱序
		log.Printf("[%s] Received: %s", consumerName, msg.Body)
		// 模拟处理时间
		time.Sleep(100 * time.Millisecond)
	}

	log.Printf("[%s] Consumer stopped.", consumerName)
}

工作队列模式

‌特点‌:多个消费者共享同一队列,消息按轮询(默认)或公平分配(能者多劳)机制处理。 ‌
‌适用场景‌:任务分发与负载均衡,如分布式任务处理

仅仅是在简单队列基础上,增加了一个消费队列,两个队列轮流消费

	go func() {
		defer wg.Done()
		consume("consumer-1", conn) // 每个消费者使用自己的 channel
	}()
	go func() {
		defer wg.Done()
		consume("consumer-2", conn)
	}()

完整版

package main

import (
	"log"
	"strconv"
	"sync"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	// 1. 连接 RabbitMQ
	conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 2. 创建 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 3. 声明队列(确保队列存在)
	queue, err := ch.QueueDeclare(
		"Q1",  // 队列名
		true,  // 持久化:重启后队列仍存在
		false, // 自动删除:当最后一个消费者断开时,不自动删除
		false, // 排他性:非排他,其他连接也可使用
		false, // 阻塞:不阻塞
		nil,   // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 4. 启动多个消费者(使用 WaitGroup 等待)
	var wg sync.WaitGroup
	wg.Add(2) // 启动 2 个消费者

	go func() {
		defer wg.Done()
		consume("consumer-1", conn) // 每个消费者使用自己的 channel
	}()
	go func() {
		defer wg.Done()
		consume("consumer-2", conn)
	}()

	// 5. 生产者:发送消息
	go func() {
		i := 0
		for {
			str := "Hello World" + strconv.Itoa(i)
			err := ch.Publish(
				"",         // 交换机:默认交换机
				queue.Name, // 路由键:队列名
				false,      // mandatory:如果没有匹配的队列,不返回消息
				false,      // immediate:不立即投递
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(str),
				},
			)
			if err != nil {
				log.Printf("Failed to publish a message: %v", err)
				return
			}
			i++
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
	wg.Wait() // 这里不会退出,除非消费者退出
}

// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {
	// 每个消费者使用独立的 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Printf("[%s] Failed to open channel: %v", consumerName, err)
		return
	}
	defer ch.Close()

	// 消费消息
	msgs, err := ch.Consume(
		"Q1",         // 队列名
		consumerName, // 消费者标签(唯一标识)
		true,         // autoAck:自动确认(消息处理完后自动从队列删除)
		false,        // exclusive:非排他
		false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
		false,        // noWait:不等待服务器确认
		nil,          // args
	)
	if err != nil {
		log.Printf("[%s] Failed to consume: %v", consumerName, err)
		return
	}

	// 持续接收消息
	for msg := range msgs {
		// 使用锁或 log 打印,避免并发输出乱序
		log.Printf("[%s] Received: %s", consumerName, msg.Body)
		// 模拟处理时间
		time.Sleep(100 * time.Millisecond)
	}

	log.Printf("[%s] Consumer stopped.", consumerName)
}

发布/订阅模式

‌特点‌:生产者通过交换机将消息广播至所有绑定该交换机的队列,每个队列对应一个消费者。 ‌
‌适用场景‌:消息多副本分发,如日志同步。

fanout 模式

package main

import (
	"log"
	"strconv"
	"sync"
	"time"

	"github.com/streadway/amqp"
)

const exchangeName = "go-exchange"

func main() {
	// 1. 连接 RabbitMQ
	conn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 2. 创建 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName, // 交换机名:默认交换机
		"fanout",     // 类型:直连 direct,  topic , headers, fanout
		true,         // 持久化:重启后交换机仍存在
		false,        // 自动删除:当最后一个绑定断开时,不自动删除
		false,        // 内部:不用于客户端应用
		false,        // 无等待:不等待服务器确认
		nil,          // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 4.
	var wg sync.WaitGroup
	wg.Add(2) // 启动 2 个消费者

	go func() {
		defer wg.Done()
		subscribe(conn, exchangeName) // 每个消费者使用自己的 channel
	}()
	go func() {
		defer wg.Done()
		subscribe(conn, exchangeName) // 每个消费者使用自己的 channel
	}()
	// 5. 生产者:发送消息
	go func() {
		i := 0
		for {
			str := "Hello World" + strconv.Itoa(i)
			err := ch.Publish(
				exchangeName, // 交换机:默认交换机
				"",           //  routing key:路由键,fanout 类型不需要
				false,        // mandatory:如果没有匹配的队列,不返回消息
				false,        // immediate:不立即投递
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(str),
				},
			)
			if err != nil {
				log.Printf("Failed to publish a message: %v", err)
				return
			}
			i++
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)
	wg.Wait() // 这里不会退出,除非消费者退出
}

func subscribe(conn *amqp.Connection, exchangeName string) {
	// 每个消费者使用独立的 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Printf("Failed to open channel: %v", err)
		return
	}
	defer ch.Close()
	// 声明队列(确保队列存在)
	queue, err := ch.QueueDeclare(
		"",    //  队列名
		false, // 持久化:重启后队列仍存在
		true,  // 自动删除:当最后一个消费者断开时,不自动删除
		true,  // 排他性:非排他,其他连接也可使用
		false, // 阻塞:不阻塞
		nil,   // 额外参数

	)
	if err != nil {
		log.Printf("Failed to declare a queue: %v", err)
		return
	}
	// 绑定队列到交换机
	err = ch.QueueBind(
		queue.Name,   // 队列名
		"",           // 路由键:fanout 类型不需要
		exchangeName, // 交换机名
		false,        // 不等待服务器确认
		nil,          // 额外参数
	)
	defer ch.QueueDelete(queue.Name, false, false, false)
	//调用消费者
	consume("consumer-haha", conn, queue.Name)

}

// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection, queueName string) {
	// 每个消费者使用独立的 channel
	ch, err := conn.Channel()
	if err != nil {
		log.Printf("[%s] Failed to open channel: %v", consumerName, err)
		return
	}
	defer ch.Close()

	// 消费消息
	msgs, err := ch.Consume(
		queueName,    // 队列名
		consumerName, // 消费者标签(唯一标识)
		true,         // autoAck:自动确认(消息处理完后自动从队列删除)
		false,        // exclusive:非排他
		false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)
		false,        // noWait:不等待服务器确认
		nil,          // args
	)
	if err != nil {
		log.Printf("[%s] Failed to consume: %v", consumerName, err)
		return
	}

	// 持续接收消息
	for msg := range msgs {
		// 使用锁或 log 打印,避免并发输出乱序
		log.Printf("[%s] Received: %s", consumerName, msg.Body)
		// 模拟处理时间
		time.Sleep(100 * time.Millisecond)
	}

	log.Printf("[%s] Consumer stopped.", consumerName)
}

每次获取两条消息,这是因为有两个subscribe()携程在收消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述