目录
一. 定义实体类
1.1 交换机
交换机类中的核心属性
- 交换机名:交换机的唯一身份标识
- 交换机类型:默认为直连交换机
- 交换机是否持久化:默认不持久化(存储硬盘)
- 其他功能:通过选项增加其他功能
public class Exchange {
//唯一身份标识
private String name;
//交换机类型
private ExchangeType type = ExchangeType.DIRECT;
//是否持久化
private boolean durable = false;
//提供额外的选项
private Map<String, Object> arguments = new HashMap<>();
}
交换机类型
使用枚举设置交换机的三种类型
- 0:直接交换机
- 1:扇形交换机
- 2:主题交换机
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
ExchangeType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
1.2 队列
队列类中的核心属性
- 队列名称:唯一标识符
- 持久性:默认非持久化
- 自动删除:当无绑定交换机时自动删除
- 扩展属性:支持附加功能扩展
- 消费者统计:实时统计当前订阅消费者数量
- 消费进度:追踪当前消费者处理位置
public class MSGQueue {
//唯一标识
private String name;
//是否持久化保存
private boolean durable = false;
//是否自动删除,如果没有交换机使用,会自动删除这个队列
private boolean autoDelete = false;
//扩展属性(序列化为json字符串,然后写入数据库)
private Map<String, Object> arguments = new HashMap<>();
//使用list记录包含的消费者
private List<ConsumerEnv> consumerEnvs = new ArrayList<>();
//记录到哪个消费者了(线程安全)
private AtomicInteger consumerSeq = new AtomicInteger();
}
1.3 绑定
绑定类中的重要属性:
- 交换机名
- 队列名
- 绑定的关系
不存在持久化,如果交换机或者队列其中一个消失了,那么持久化也没有意义
public class Binding {
private String exchangeName;
private String queueName;
private String bindingKey;
}
1.4 消息
消息类中的重要属性
- 版本号:用于验证消息是否经过修改
- 信息头:记录消息ID及其关联关系
- 内容:保存消息的二进制数据(不是json,需要进行序列化和反序列化)
- 辅助属性:标记消息在文件中的具体位置
- 逻辑删除:这个消息是否有效
public class Message implements Serializable {
// 版本
private static final long serialVersionUID = 1L;
// 信息头
//创建实例,避免范围null指针
private BasicProperties basicProperties = new BasicProperties();
// 内容部分
private byte[] body;
// 辅助属性
/*
* 起始位置和结束位置:【offsetBeg,offsetEnd)
* */
private long offsetBeg = 0;
private long offsetEnd = 0;
//0x1表示有效,0x0表示删除a
private byte isValid = 0x1;
请求头类
public class BasicProperties implements Serializable {
// 唯一身份标识
private String messageId;
/* 对应关系:和binding匹配
交换机类型是DIRECT,routingKey为目标队列的名字
交换机类型是FANOUT,routingKey为无(不使用)
交换机类型是TOPIC,routingKey和bindingKey绑定
*/
private String routingKey;
// 是否持久化。1-不持久化,2-持久化
private int deliverMode = 1;
}
使用工厂方式创建消息(封装创建细节)
public static Message createMessage(String routingKey,byte[] body ,BasicProperties basicProperties){
Message message = new Message();
if(basicProperties!=null){
message.setBasicProperties(basicProperties);
}
message.setRoutingKey(routingKey);
message.setMessageId("M-"+ UUID.randomUUID());
message.body = body;
return message;
}
这里即使BasicProperties 中存在值,也会覆盖掉其中的routingKey 和MessageId,保证Id的唯一性和明确参数的优先级,(如果传入的routingKey和basicProperties中的routingKey值不一致,以传入的值为准)便于管理
二. 格式转换
2.1 Json字符串
使用HashMap存储额外信息,但是这些信息都是需要存储在数据库中,数据库中无法存储对象
所以需要将HashMap对象转为Json字符串格式存储
private Map<String, Object> arguments = new HashMap<>();
对象---->字符串
public String getArguments(){
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "{}";
}
在存储的时候,调用这个方法就可以将对象转变为字符串类型,然后进行存储
字符串---->Json对象
// 输入为字符串但是设置为json格式
public void setArguments(String argumentsJson) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String,Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
从数据库中取出的数据(字符串),将字符串类型的数据进行格式转换,才能进行使用
2.2 序列化和反序列化
使用Json本质上也是文本格式(文本类型数据),序列化后的二进制数据体积更小,传输速率更快,在处理大量数据的时候更有优势,Json需要对不同类型的数据定义不同的转换逻辑,反序列化可以直接还原成原来的对象类型,更高效
只要继承Serializable类,就表示这个类就可以进行序列化和反序列化writeObject方法
序列化
public static byte[] toBytes(Object object) throws IOException {
try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
try(ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)){
// writeObject方法将数据序列化,写入outputStream中,因为outputStream与byteArrayOutputStream关联
// 最终写入byteArrayOutputStream
outputStream.writeObject(object);
}
return byteArrayOutputStream.toByteArray();
}
}
- 通过writeObject方法将数据序列化
- 这个字节流首先被写入ObjectOutputStream 的缓冲区,由于关联ByteArrayOutputStream
- 最终会被写入到内存中的 ByteArrayOutputStream 缓冲区里
反序列化
public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
Object object = null;
try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)){
try(ObjectInputStream objectInputStream =new ObjectInputStream(byteArrayInputStream)){
object = objectInputStream.readObject();
}
}
return object;
}
- 通过readObject方法将数据序列化
- 这个字节流首先被写入ObjectInputStream 的缓冲区,由于关联ByteArrayInputStream
- 最终会被写入到内存中的 ByteArrayOutputStream 缓冲区里
三. 异常处理
MqException类通过继承Exception类
public class MqException extends Exception {
public MqException (String reason){
super(reason);
}
}
这样的异常处理粒度较粗,好处就是用户可以自定义异常的原因