镜像
docker pull rabbitmq:management
docker pull rabbitmq:4.0.7-management
docker-compose.yml
# Single RabbitMQ broker container using the image variant tagged "-management".
services:
  rabbitmq:
    # NOTE(review): the `docker pull` commands earlier in these notes fetch the
    # `management` and `4.0.7-management` tags, not this one - confirm which
    # version is actually intended before relying on the pre-pulled image.
    image: rabbitmq:3.9.5-management
    container_name: rabbitmq
    restart: always
    # Host networking: no `ports:` mapping is declared, the container uses the
    # host's network stack directly.
    network_mode: "host"
    volumes:
      # Reuse the host's local time definition inside the container.
      - /etc/localtime:/etc/localtime
      # Keep broker state in a directory next to this compose file.
      - ./rabbitmq/data:/var/lib/rabbitmq
    environment:
      # User, password and virtual host handed to the image. The Spring Boot
      # client settings in these notes use the same three values.
      # NOTE(review): plain-text demo credentials - do not reuse outside a test setup.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: Test123
      RABBITMQ_DEFAULT_VHOST: /test
docker-compose up -d rabbitmq
访问控制台
Spring Boot
<!--
  Spring Boot AMQP starter used by the RabbitMQ examples in these notes.
  No <version> element is given; presumably the version is supplied by the
  project's Spring Boot dependency management - confirm in the enclosing pom.
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# Spring Boot client settings for the RabbitMQ broker described in these notes.
spring:
  rabbitmq:
    # Broker address and AMQP port.
    host: 192.168.0.140
    port: 5672
    # Same user, password and virtual host as configured for the broker container.
    # NOTE(review): plain-text demo credentials - externalize them for real deployments.
    username: admin
    password: Test123
    virtual-host: /test
    listener:
      simple:
        # Acknowledgement is left to the listener container rather than being
        # issued manually in the consumer code.
        acknowledge-mode: auto
        retry:
          # Re-invoke a failing listener up to 5 times in total.
          enabled: true
          max-attempts: 5
          # Delay before the first retry; a bare number is presumably read as
          # milliseconds (5 seconds here) - confirm against the Spring Boot
          # property reference.
          initial-interval: 5000
批量声明队列
/**
 * Names of the RabbitMQ queues used by the application.
 *
 * <p>{@code RabbitMQConfig} reads the static final fields of this class through
 * reflection and declares one queue per value, so only queue-name constants
 * should be added here.
 */
public final class MQConstants {

    /** Name of the first test queue. */
    public static final String TEST_QUEUE_1 = "test-queue-1";

    /** Name of the second test queue. */
    public static final String TEST_QUEUE_2 = "test-queue-2";

    /** Constants holder; not meant to be instantiated. */
    private MQConstants() {
    }
}
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring configuration that declares, at bean-creation time, one durable queue
 * for every queue-name constant found in {@link MQConstants}.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates the {@link RabbitAdmin} used to declare queues.
     *
     * @param connectionFactory connection factory the admin is built on
     * @return a new admin bound to {@code connectionFactory}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Builds a durable {@link Queue} for each constant in {@link MQConstants}
     * and declares it through the given admin.
     *
     * @param rabbitAdmin admin used to declare each queue
     * @return the declared queues, in the order the constants were discovered
     */
    @Bean
    public List<Queue> createQueues(RabbitAdmin rabbitAdmin) {
        List<String> queueNameList = getConstantsValueList(MQConstants.class);
        List<Queue> queueList = new ArrayList<>(queueNameList.size());
        for (String queueName : queueNameList) {
            // Second constructor argument marks the queue as durable.
            queueList.add(new Queue(queueName, true));
        }
        for (Queue queue : queueList) {
            rabbitAdmin.declareQueue(queue);
        }
        return queueList;
    }

    /**
     * Collects the values of all {@code public static final String} fields
     * declared directly on {@code clazz}.
     *
     * <p>Fields of other types, non-public fields, compiler-generated fields
     * and constants whose value is {@code null} are ignored, so that only real
     * queue names are returned.
     *
     * @param clazz constants holder to inspect
     * @return the constant values; empty when the class declares none
     * @throws IllegalStateException if a matching field cannot be read
     */
    private static List<String> getConstantsValueList(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        List<String> valueList = new ArrayList<>(fields.length);
        for (Field field : fields) {
            if (!isStringConstant(field)) {
                continue;
            }
            try {
                // Static field: no instance is needed to read it.
                Object value = field.get(null);
                if (value != null) {
                    valueList.add((String) value);
                }
            } catch (IllegalAccessException e) {
                // A constant that cannot be read would otherwise leave its queue
                // silently undeclared, so surface the problem with its cause.
                throw new IllegalStateException(
                        "Cannot read queue name constant " + clazz.getName() + "." + field.getName(), e);
            }
        }
        return valueList;
    }

    /** Tells whether {@code field} is a hand-written {@code public static final String}. */
    private static boolean isStringConstant(Field field) {
        int modifiers = field.getModifiers();
        return Modifier.isPublic(modifiers)
                && Modifier.isStatic(modifiers)
                && Modifier.isFinal(modifiers)
                && !field.isSynthetic()
                && field.getType() == String.class;
    }
}
public Test {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.convertAndSend(MQConstants.TEST_QUEUE_1, "test");
}
}
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
 * Sample consumer for {@link MQConstants#TEST_QUEUE_1}.
 *
 * <p>Prints every received message body and fails on purpose when the body is
 * exactly {@code "error"}, which makes it usable for trying out the listener
 * retry settings.
 */
@Component
public class TestConsumer implements ChannelAwareMessageListener {

    /**
     * Handles one message from the queue.
     *
     * @param message received message; its body is decoded as UTF-8 text
     * @param channel channel the message arrived on (not used here)
     * @throws Exception when the body equals {@code "error"}
     */
    @RabbitListener(queues = MQConstants.TEST_QUEUE_1, concurrency = "10")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode with an explicit charset instead of the platform default, so the
        // text is read the same way on every host.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [x] Received: " + body);
        try {
            if ("error".equals(body)) {
                throw new Exception("处理失败");
            }
            System.out.println(" [✔] Done");
        } catch (Exception e) {
            System.out.println(" [✖] Error: " + e.getMessage());
            // Rethrow so the caller sees the failure instead of a normal return.
            throw e;
        }
    }
}