阿里云和腾讯云RocketMQ 发消息和消费消息客户端JAVA接口

发布于:2025-07-10 ⋅ 阅读:(17) ⋅ 点赞:(0)

一、RocketMQ 概述

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、日志收集等场景。

核心特点

  1. 分布式架构:支持集群部署,可水平扩展

  2. 高吞吐量:单机可支持10万级TPS

  3. 低延迟:毫秒级消息投递

  4. 高可用性:支持主从复制,自动故障转移

  5. 消息可靠性:支持消息持久化,确保不丢失

  6. 丰富的消息模式:支持普通消息、顺序消息、事务消息、定时消息等

二、核心概念

1. 基本组件

组件 说明
NameServer 轻量级注册中心,负责Broker的注册与发现
Broker 消息存储与转发服务器,负责消息存储、投递和查询
Producer 消息生产者,负责发送消息
Consumer 消息消费者,负责消费消息
Topic 消息主题,用于消息分类
Message Queue 消息队列,Topic的分区单位
Tag 消息标签,用于消息二级分类
Group 生产者组/消费者组,用于集群管理

一、阿里云rocketMQ

使用阿里云 ONS SDK
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>2.0.5.Final</version> <!-- 推荐最新版本 -->
</dependency>

获取阿里云 RocketMQ 配置

  • Endpointhttp://{YourInstanceId}.mq-internet.aliyuncs.com:80

  • AccessKey:阿里云账号的 AccessKey ID 和 AccessKey Secret

  • Topic:消息主题(需在阿里云控制台创建)

  • Group ID:消费者组(需在控制台创建)

1、发消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class AliyunMQProducer {
    public static void main(String[] args) {
        // 1. 配置 Producer
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");
        properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");
        properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");
        properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Producer Group ID

        // 2. 创建 Producer
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        // 3. 创建消息
        Message msg = new Message(
            "YourTopic",  // Topic
            "YourTag",    // Tag
            "Hello Aliyun RocketMQ!".getBytes()  // Body
        );

        // 4. 发送消息
        producer.send(msg);
        System.out.println("消息发送成功!");

        // 5. 关闭 Producer
        producer.shutdown();
    }
}

2、消费MQ

import com.aliyun.openservices.ons.api.*;
import java.util.Properties;

public class AliyunMQConsumer {
    public static void main(String[] args) {
        // 1. 配置 Consumer
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");
        properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");
        properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");
        properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Consumer Group ID

        // 2. 创建 Consumer
        Consumer consumer = ONSFactory.createConsumer(properties);
        
        // 3. 订阅 Topic 和 Tag(* 表示所有 Tag)
        consumer.subscribe("YourTopic", "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("收到消息: " + new String(message.getBody()));
                return Action.CommitMessage; // 消费成功
            }
        });

        // 4. 启动 Consumer
        consumer.start();
        System.out.println("消费者已启动,等待消息...");
    }
}

 

  1. 阿里云 ONS SDK 更稳定,推荐使用(比 Apache RocketMQ 客户端更适配阿里云环境)。

  2. Topic 和 Group ID 需先在阿里云控制台创建,否则会报错。

  3. 生产环境建议配置重试机制和日志监控,避免消息丢失。

  4. 消费模式

    • 集群消费(CLUSTERING):同 Group ID 的多个 Consumer 分摊消息(默认)。

    • 广播消费(BROADCASTING):同 Group ID 的每个 Consumer 都收到所有消息。

二、腾讯云RocketMQ

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import lombok.extern.slf4j.Slf4j;

/**
 * 腾讯云rocketMQ服务类
  
 */
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class RocketTXMqService {

	@Value("${rocketmq.namespace:-1}")
	private String namespace;
	@Value("${rocketmq.producer.group:-1}")
	private String groupName;
	@Value("${rocketmq.producer.access-key:-1}")
	private String accessKey;
	@Value("${rocketmq.producer.secret-key:-1}")
	private String secretKey;
	@Value("${rocketmq.name-server:-1}")
	private String nameserver;

	// MQ生产者
	private DefaultMQProducer producer;

	// MQ实例化消费者push
	private DefaultMQPushConsumer pushConsumer;

	// MQ实例化消费者pull
	private DefaultLitePullConsumer pullConsumer;

	/**
	 * 创建生产者
	 * 
	 * @return
	 */
	public DefaultMQProducer getProducer() {

		if (null == producer) {
			// 实例化消息生产者Producer
			producer = new DefaultMQProducer(namespace, groupName,
					new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
			);
			// 设置NameServer的地址
			producer.setNamesrvAddr(nameserver);
			try {
				// 启动Producer实例
				producer.start();
			} catch (MQClientException e) {
				e.printStackTrace();
			}
		}
		return producer;
	}

	/**
	 * 同步发送 发送消息
	 */
	public void syncSend(String topic, String tag, String data) {
		producer = getProducer();
		// 发送消息
		SendResult sendResult = null;
		try {
			// 创建消息实例,设置topic和消息内容
			Message msg = new Message(topic, tag, data.getBytes(RemotingHelper.DEFAULT_CHARSET));
			sendResult = producer.send(msg);
			log.info("埋点信息发送腾讯云MQ:" + data);
			log.info("发送腾讯云MQ接口返回状态sendResult:" + sendResult);
		} catch (UnsupportedEncodingException e) {
			log.error("UnsupportedEncodingException:" + e.getMessage());
		} catch (MQClientException e) {
			log.error("MQClientException:" + e.getMessage());
		} catch (RemotingException e) {
			log.error("RemotingException:" + e.getMessage());
		} catch (MQBrokerException e) {
			log.error("MQBrokerException:" + e.getMessage());
		} catch (InterruptedException e) {
			log.error("InterruptedException:" + e.getMessage());
		}
	}

	/**
	 * 创建push消费者
	 * 
	 * @return
	 */
	public DefaultMQPushConsumer getPushConsumer() {

		if (null == pushConsumer) {// 实例化消费者
			pushConsumer = new DefaultMQPushConsumer(namespace, groupName,
					new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL权限
			// 设置NameServer的地址
			pushConsumer.setNamesrvAddr(nameserver);
		}
		return pushConsumer;
	}

	/**
	 * 创建pull 消费者
	 * 
	 * @return
	 */
	public DefaultLitePullConsumer getPullConsumer() {

		if (null == pullConsumer) {// 实例化消费者
			// 实例化消费者
			pullConsumer = new DefaultLitePullConsumer(namespace, groupName,
					new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
			// 设置NameServer的地址
			pullConsumer.setNamesrvAddr(nameserver);
			// 设置从第一个偏移量开始消费
			pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		}
		return pullConsumer;
	}

	/**
	 * push方式订阅消费
	 * 
	 * @param topicName
	 */
	public void pushConsumer(String topicName) {

		pushConsumer = this.getPushConsumer();
		if (null != pushConsumer) {

			try {
				pushConsumer.subscribe(topicName, "*");
				// 注册回调实现类来处理从broker拉取回来的消息
				pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
					// 消息处理逻辑
					log.info("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
					// 标记该消息已经被成功消费, 根据消费情况,返回处理状态
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				});
				// 启动消费者实例
				pushConsumer.start();
			} catch (MQClientException e) {
				log.error("push MQClientException:" + e.getMessage());
			}
		}
	}

	/**
	 * pull方式订阅消费
	 * 
	 * @param topicName
	 */
	public void pullConsumer(String topicName) {
		pullConsumer = this.getPullConsumer();

		if (null != pullConsumer) {
			try {
				// 订阅topic
				pullConsumer.subscribe(topicName, "*");
				// 启动消费者实例
				pullConsumer.start();
			} catch (MQClientException e) {
				log.error(" pull MQClientException:" + e.getMessage());
			}
			try {
				log.info("Consumer Started.%n");
				while (true) {
					// 拉取消息
					List<MessageExt> messageExts = pullConsumer.poll();
					log.info("%s%n", messageExts);
				}
			} finally {
				pullConsumer.shutdown();
			}
		}
	}

}


网站公告

今日签到

点亮在社区的每一天
去签到