Kafka | RabbitMQ | RocketMQ | ActiveMQ 的区别和入门案例

发布于:2024-12-18 ⋅ 阅读:(75) ⋅ 点赞:(0)

springboot,vue,springcloudalibaba课程视频,有需要可以看看

<!-- springboot,springboot整合redis,整合rocketmq视频: -->
https://www.bilibili.com/video/BV1nkmRYSErk/?vd_source=14d27ec13a4737c281b7c79463687112

<!-- springcloudalibaba,openfeign,nacos,gateway,sso视频:-->
https://www.bilibili.com/video/BV1cFDEYWEkY/?vd_source=14d27ec13a4737c281b7c79463687112

<!-- vue+springboot前后端分离视频:-->
https://www.bilibili.com/video/BV1JLSEYJETc/?vd_source=14d27ec13a4737c281b7c79463687112

<!-- shiro视频:-->
https://www.bilibili.com/video/BV1YVUmYJEPi/?vd_source=14d27ec13a4737c281b7c79463687112



以下是常用MQ消息中间件的区别,以表格形式展示:

特性/MQ Kafka RabbitMQ RocketMQ ActiveMQ
开发语言 Scala Erlang Java Java
支持的协议 自定义(基于TCP) AMQP 自定义 OpenWire、STOMP、REST、XMPP、AMQP
消息存储 磁盘;支持大量堆积 内存、磁盘;支持少量堆积 磁盘;支持大量堆积 内存、磁盘、数据库;支持少量堆积
消息事务 支持 支持 不支持 支持
负载均衡 支持 支持的不好 支持 可基于zookeeper实现负载均衡
集群方式 天然的‘Leader-Slave’无状态集群 支持简单集群,对高级集群模式支持不好 常用多对’Master-Slave’模式 支持简单集群模式,对高级集群模式支持不好
管理界面 一般 有管理后台 一般
可用性 非常高(分布式) 高(主从) 非常高(分布式) 高(主从)
消息重复 支持at least once、at most once 支持at least once、at most once 支持at least once 支持at least once
吞吐量TPS 极大 比较大 比较大
订阅形式和消息分发 发布订阅模式 direct、topic、Headers和fanout 发布订阅模式 点对点(p2p)、广播(发布-订阅)

接下来是每个MQ的入门案例代码:

Kafka入门案例

生产者代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Hello Kafka " + i));
            }
        }
    }
}

消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}

RabbitMQ入门案例

生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQProducer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
           String message = "Hello World!";
           channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码

import com.rabbitmq.client.*;

public class RabbitMQConsumer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

RocketMQ入门案例

生产者代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello RocketMQ".getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}

消费者代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQConsumerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

ActiveMQ入门案例

当然可以,以下是ActiveMQ的生产者和消费者的基本示例代码。

ActiveMQ 生产者代码示例

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducer {
    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 建立连接
        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();
            // 创建会话,使用非事务性会话和自动确认消息
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Destination destination = session.createQueue("TestQueue");
            // 创建消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 创建文本消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            // 发送消息
            producer.send(message);
            System.out.println("Sent: " + message.getText());
            // 清理资源
            producer.close();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

ActiveMQ 消费者代码示例

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {
    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 建立连接
        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();
            // 创建会话,使用非事务性会话和自动确认消息
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Destination destination = session.createQueue("TestQueue");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received: " + textMessage.getText());
            }
            // 清理资源
            consumer.close();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

在这两个示例中,生产者连接到ActiveMQ服务器,创建一个名为"TestQueue"的队列,并向其中发送一条消息。消费者也连接到同一个队列,并等待接收消息。当生产者发送消息后,消费者会接收并打印这条消息。

确保在运行这些代码之前,你的ActiveMQ服务器已经启动,并且监听在默认的61616端口上。如果ActiveMQ服务器配置有所不同,请相应地修改连接字符串。

完结!


网站公告

今日签到

点亮在社区的每一天
去签到