【RabbitWQ】基于 Java 实现轻量级消息队列(二)

发布于:2025-08-30 ⋅ 阅读:(25) ⋅ 点赞:(0)

目录

一. 定义实体类

1.1 交换机

1.2 队列

1.3 绑定

1.4 消息

二. 格式转换

2.1 Json字符串

2.2 序列化和反序列化

三. 异常处理


一. 定义实体类

1.1 交换机

交换机类中的核心属性

  • 交换机名:交换机的唯一身份标识
  • 交换机类型:默认为直连交换机
  • 交换机是否持久化:默认不持久化(存储硬盘)
  • 其他功能:通过选项增加其他功能
public class Exchange {
    //唯一身份标识
    private String name;
    //交换机类型
    private ExchangeType type = ExchangeType.DIRECT;
    //是否持久化
    private boolean durable = false;
    //提供额外的选项
    private Map<String, Object> arguments = new HashMap<>();
}

交换机类型

使用枚举设置交换机的三种类型

  • 0:直接交换机
  • 1:扇形交换机
  • 2:主题交换机
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);

    private final int type;

    ExchangeType(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }
}

1.2 队列

队列类中的核心属性

  • 队列名称:唯一标识符
  • 持久性:默认非持久化
  • 自动删除:当无绑定交换机时自动删除
  • 扩展属性:支持附加功能扩展
  • 消费者统计:实时统计当前订阅消费者数量
  • 消费进度:追踪当前消费者处理位置
public class MSGQueue {
    //唯一标识
    private String name;
    //是否持久化保存
    private boolean durable = false;
    //是否自动删除,如果没有交换机使用,会自动删除这个队列
    private boolean autoDelete = false;
    //扩展属性(序列化为json字符串,然后写入数据库)
    private Map<String, Object> arguments = new HashMap<>();
    //使用list记录包含的消费者
    private List<ConsumerEnv> consumerEnvs = new ArrayList<>();
    //记录到哪个消费者了(线程安全)
    private AtomicInteger consumerSeq = new AtomicInteger();
}

1.3 绑定

绑定类中的重要属性:

  • 交换机名
  • 队列名
  • 绑定的关系

不存在持久化,如果交换机或者队列其中一个消失了,那么持久化也没有意义

public class Binding {
    private String exchangeName;
    private String queueName;
    private String bindingKey;
}

1.4 消息

消息类中的重要属性

  • 版本号:用于验证消息是否经过修改
  • 信息头:记录消息ID及其关联关系
  • 内容:保存消息的二进制数据(不是json,需要进行序列化和反序列化)
  • 辅助属性:标记消息在文件中的具体位置
  • 逻辑删除:这个消息是否有效

public class Message implements Serializable {
//    版本
    private static final long serialVersionUID = 1L;
//    信息头
    //创建实例,避免范围null指针
    private BasicProperties basicProperties = new BasicProperties();

//    内容部分
    private byte[] body;

//    辅助属性
    /*
    * 起始位置和结束位置:【offsetBeg,offsetEnd)
    * */
    private long offsetBeg = 0;
    private long offsetEnd = 0;
    //0x1表示有效,0x0表示删除a
    private byte isValid = 0x1;

请求头类

public class BasicProperties implements Serializable {
//    唯一身份标识
    private String messageId;
/*    对应关系:和binding匹配
    交换机类型是DIRECT,routingKey为目标队列的名字
    交换机类型是FANOUT,routingKey为无(不使用)
    交换机类型是TOPIC,routingKey和bindingKey绑定
    */

    private String routingKey;
//    是否持久化。1-不持久化,2-持久化
    private int deliverMode = 1;
}

使用工厂方式创建消息(封装创建细节)


    public static Message createMessage(String routingKey,byte[] body ,BasicProperties basicProperties){
        Message message = new Message();
        if(basicProperties!=null){
            message.setBasicProperties(basicProperties);
        }
        message.setRoutingKey(routingKey);
        message.setMessageId("M-"+ UUID.randomUUID());
        message.body = body;
        return message;

    }

这里即使BasicProperties 中存在值,也会覆盖掉其中的routingKey 和MessageId,保证Id的唯一性和明确参数的优先级,(如果传入的routingKey和basicProperties中的routingKey值不一致,以传入的值为准)便于管理

二. 格式转换

2.1 Json字符串

使用HashMap存储额外信息,但是这些信息都是需要存储在数据库中,数据库中无法存储对象

所以需要将HashMap对象转为Json字符串格式存储

    private Map<String, Object> arguments = new HashMap<>();

对象---->字符串

    public String getArguments(){
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "{}";
    }

在存储的时候,调用这个方法就可以将对象转变为字符串类型,然后进行存储


字符串---->Json对象

//    输入为字符串但是设置为json格式
    public void setArguments(String argumentsJson) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String,Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

从数据库中取出的数据(字符串),将字符串类型的数据进行格式转换,才能进行使用

2.2 序列化和反序列化

使用Json本质上也是文本格式(文本类型数据),序列化后的二进制数据体积更小,传输速率更快,在处理大量数据的时候更有优势,Json需要对不同类型的数据定义不同的转换逻辑,反序列化可以直接还原成原来的对象类型,更高效

只要继承Serializable类,就表示这个类就可以进行序列化和反序列化writeObject方法

序列化

    public static byte[] toBytes(Object object) throws IOException {
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            try(ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)){
//                writeObject方法将数据序列化,写入outputStream中,因为outputStream与byteArrayOutputStream关联
//                最终写入byteArrayOutputStream
                outputStream.writeObject(object);
            }
            return byteArrayOutputStream.toByteArray();
        }
    }
  1. 通过writeObject方法将数据序列化
  2. 这个字节流首先被写入ObjectOutputStream 的缓冲区,由于关联ByteArrayOutputStream
  3. 最终会被写入到内存中的 ByteArrayOutputStream 缓冲区里

反序列化

    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        Object object = null;
        try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)){
            try(ObjectInputStream objectInputStream =new ObjectInputStream(byteArrayInputStream)){
               object = objectInputStream.readObject();
            }
        }
        return object;
    }
  1. 通过readObject方法将数据序列化
  2. 这个字节流首先被写入ObjectInputStream 的缓冲区,由于关联ByteArrayInputStream
  3. 最终会被写入到内存中的 ByteArrayOutputStream 缓冲区里

三. 异常处理

MqException类通过继承Exception类

public class MqException extends Exception {
    public MqException (String reason){
        super(reason);
    }
}

这样的异常处理粒度较粗,好处就是用户可以自定义异常的原因