加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
https://1024bat.cn/
// 项目结构说明(Spring Boot + Maven 简化版)
/**
* 目录结构:
* ├── Application.java
* ├── controller/OrderController.java
* ├── service/OrderService.java
* ├── service/RetryService.java
* ├── bloom/BloomFilterManager.java
* ├── kafka/OrderProducer.java
* ├── kafka/OrderConsumer.java
* ├── model/OrderMessage.java
* ├── config/KafkaConfig.java
* ├── config/MetricsConfig.java
* ├── retry/RetryTaskScheduler.java
*/
// Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
// controller/OrderController.java
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired private OrderService orderService;
@PostMapping("/create")
public ResponseEntity<String> createOrder(@RequestParam Long skuId) {
orderService.createOrder(skuId);
return ResponseEntity.ok("Order request received");
}
}
// service/OrderService.java
@Service
public class OrderService {
@Autowired private BloomFilterManager bloomFilter;
@Autowired private RedisTemplate<String, Object> redis;
@Autowired private RedissonClient redisson;
@Autowired private OrderProducer producer;
@Autowired private MeterRegistry meterRegistry;
public void createOrder(Long skuId) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
if (!bloomFilter.contains(skuId)) throw new RuntimeException("SKU not exists");
String key = "sku:" + skuId;
Object cached = redis.opsForValue().get(key);
if (cached != null) return;
RLock lock = redisson.getLock("lock:sku:" + skuId);
lock.lock();
try {
// 模拟 DB 查询
Object dbData = new Object();
redis.opsForValue().set(key, dbData, 30, TimeUnit.MINUTES);
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
redis.delete(key);
} catch (InterruptedException ignored) {}
});
} finally {
lock.unlock();
}
// 发送 Kafka 消息
producer.sendOrder(new OrderMessage(skuId, UUID.randomUUID().toString()));
} finally {
sample.stop(meterRegistry.timer("order.create.timer"));
}
}
}
// bloom/BloomFilterManager.java
@Component
public class BloomFilterManager {
private BloomFilter<Long> filter = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.01);
@PostConstruct
public void init() {
// 模拟加载数据
for (long i = 1; i <= 1000; i++) filter.put(i);
}
public boolean contains(Long skuId) {
return filter.mightContain(skuId);
}
}
// kafka/OrderProducer.java
@Component
public class OrderProducer {
@Autowired private KafkaTemplate<String, String> kafka;
public void sendOrder(OrderMessage msg) {
kafka.send("order_topic", JSON.toJSONString(msg));
}
}
// kafka/OrderConsumer.java
@Component
public class OrderConsumer {
@KafkaListener(topics = "order_topic", groupId = "order_group")
public void onMessage(String message, Acknowledgment ack) {
try {
OrderMessage order = JSON.parseObject(message, OrderMessage.class);
// 处理订单逻辑
ack.acknowledge();
} catch (Exception e) {
// 可转发至 retry_topic,由 RetryService 定期处理
}
}
}
// model/OrderMessage.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
private Long skuId;
private String orderId;
}
// config/KafkaConfig.java
@Configuration
public class KafkaConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
}
// config/MetricsConfig.java
@Configuration
public class MetricsConfig {
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags("app", "order-service");
}
}
// retry/RetryTaskScheduler.java
@Component
public class RetryTaskScheduler {
@Scheduled(fixedDelay = 5000)
public void retry() {
// 查询 DB 或 Redis 获取重试任务,重新发送 Kafka
}
}
加 Thread.currentThread().interrupt(); // 恢复中断标志
这是一个 Java 多线程编程中非常经典的问题,关键点在于 中断标志(interrupt flag)机制是协作式的,需要手动维护或检查。
✅ 一句话解释:
Thread.currentThread().interrupt();
的作用是:在捕获 InterruptedException
异常后, “重新设置”线程的中断标志位,以便后续代码知道线程已经被中断。
🔍 背景知识:线程中断机制
在 Java 中,线程的中断(interrupt)是一种“温和的请求”,不会强制终止线程,而是通过设置一个中断标志位,让线程自己检查并响应。
调用
thread.interrupt()
:设置中断标志为 true。某些阻塞方法(如
Thread.sleep()
、wait()
、join()
)会抛出InterruptedException
,并自动清除中断标志(设为 false)。
🚨 为什么要恢复中断标志?
举个典型场景:
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 中断标志会被清除(设为 false)
// 所以需要重新设置为 true,以便上层逻辑能感知
Thread.currentThread().interrupt(); // 恢复中断状态
}
如果你不加 Thread.currentThread().interrupt();
,那么线程会“忘记”自己被中断过,上层逻辑或框架将无法感知到这次中断事件。
🧠 举个实际例子(不加就会出问题)
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
// do something
Thread.sleep(1000); // 可被中断
}
} catch (InterruptedException e) {
// 中断了,但 sleep 会清除中断标志
// 如果不恢复,这里会继续循环
// 正确做法是:
Thread.currentThread().interrupt();
}
}
✅ 总结
目的 |
说明 |
---|---|
恢复中断状态 |
InterruptedException 会清除中断标志,你要手动恢复 |
便于上层感知中断 |
保证中断信号能沿上传递,线程能优雅退出 |
遵循中断协议 |
线程设计中推荐通过检查中断状态实现协作式退出 |
✅ 上层如何感知线程被中断?
通过 Thread.currentThread().isInterrupted()
或响应 InterruptedException
:
public void outerLogic() {
try {
workerLogic(); // 内层调用
} catch (InterruptedException e) {
// ✅ 这里感知到了中断,可以做善后逻辑(释放资源、标记失败、退出等)
System.out.println("Outer: Thread was interrupted, cleaning up...");
// 也可以继续向上传递
Thread.currentThread().interrupt(); // 再次设置中断标志,方便更上层知道
}
}
✅ 内层 try-catch 如何恢复中断状态?
public void workerLogic() throws InterruptedException {
try {
// 可能阻塞的方法
Thread.sleep(1000);
} catch (InterruptedException e) {
// ⚠️ 中断标志被清除了,这里必须手动恢复
Thread.currentThread().interrupt(); // 恢复中断标志
throw e; // ❗向上抛异常,让上层处理
}
}
❗如果你不恢复中断状态,会怎么样?
中断被吃掉了,线程不会再退出:
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 没有恢复中断,线程状态被重置为未中断
// 线程后续逻辑继续执行,不会退出
}
这是最容易被忽略的 Bug:线程实际被中断了,但程序表现上“什么都没发生”。
✅ 上层如何感知线程被中断?
通过 Thread.currentThread().isInterrupted()
或响应 InterruptedException
:
public void outerLogic() {
try {
workerLogic(); // 内层调用
} catch (InterruptedException e) {
// ✅ 这里感知到了中断,可以做善后逻辑(释放资源、标记失败、退出等)
System.out.println("Outer: Thread was interrupted, cleaning up...");
// 也可以继续向上传递
Thread.currentThread().interrupt(); // 再次设置中断标志,方便更上层知道
}
}
优化点说明
类别 |
优化 |
说明 |
---|---|---|
注解写法 |
使用 |
避免不必要的包作用域暴露,提高封装性 |
构造器 |
删除无参构造器 |
Spring 默认注入不需要写明空构造器 |
命名 |
管道节点命名更清晰 |
可方便后期动态 debug |
对象重用 |
解码器每次 new 是合理的 |
编解码器 stateless 可共用,业务处理器建议 Spring 单例注入 |
内存优化 |
设置合理帧长度限制 |
40960 限制消息长度,防御恶意请求,防止 OOM |
JVM / Netty 层级调优建议
TCP 参数调优(在 Netty 启动类中):
.childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true)
对象池复用:比如 ByteBuf 和业务对象可使用
FastThreadLocal
或Recycler
。线程模型优化:合理配置 Netty Boss/Worker 线程数量,比如:
new NioEventLoopGroup(Math.max(2, Runtime.getRuntime().availableProcessors()))
关闭 Netty 的资源泄露检测(若已稳定) :
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
避免过多日志与堆栈打印,尤其是 Idle 触发过多或帧异常抛出。
Netty 的“责任链管道”,可理解为消息处理的流水线
空闲检测:xx秒无任何读写操作,则触发 xxx 事件(可用于心跳机制)
帧解码器:以 0x7e 为分隔符拆包,避免 TCP 粘包/拆包问题
最大帧长度限制为 xxx 字节,防止恶意数据造成 OOM
传入多个分隔符实现灵活识别(如 0x7e、0x7e 0x7e)
解码器:将原始二进制数据解析成 JT808 消息对象(下层逻辑使用 POJO)
编码器:将响应对象转为二进制 JT808 协议格式并发送
核心业务处理器:处理业务逻辑(登录、位置信息、心跳等)
防止 TCP 粘包,按 0x7e 拆包
粘包(Sticky Packet)和拆包(Half Packet)是 TCP 网络编程中的常见问题,特别是你在做像 JT808 协议这种基于 TCP 长连接的数据传输时,必须考虑并解决这两个问题。
🚩 一句话理解
粘包: 多个消息黏在一起,一次接收收到了多个。
拆包: 一个消息被拆成多段发出,需要多次接收才能完整拼回。
🎯 为什么会出现粘包/拆包?
因为 TCP 是流式协议,不像 UDP 一包一发,它不关心消息边界,只负责“字节流”的可靠传输。比如:
应用层连续发送两条消息 → TCP 可能合并发送(粘包) 。
消息太大,超过了一次 TCP 报文长度 → TCP 可能分多次发送(拆包) 。
📦 粘包/拆包 Java 示例
我们用 Java 的 ServerSocket + Socket 模拟:
🧪 示例一:粘包现象
👨💻 客户端连续发送两条消息:
Socket socket = new Socket("localhost", 8080); OutputStream out = socket.getOutputStream(); out.write("Hello".getBytes()); out.write("World".getBytes()); out.flush();
👨💻 服务端一次性读取收到:
ServerSocket serverSocket = new ServerSocket(8080); Socket client = serverSocket.accept(); InputStream in = client.getInputStream(); byte[] buffer = new byte[1024]; int len = in.read(buffer); System.out.println(new String(buffer, 0, len)); // 输出可能是 "HelloWorld"(粘包),看不到分隔
🧪 示例二:拆包现象
👨💻 客户端发送一条大消息:
byte[] largeMsg = new byte[10000]; // 比 MTU 或缓冲区大 Arrays.fill(largeMsg, (byte)'A'); out.write(largeMsg); out.flush();
👨💻 服务端多次才能读取完:
byte[] buffer = new byte[1024]; int total = 0; while ((len = in.read(buffer)) != -1) { total += len; // 模拟接收多次才拼完整 } System.out.println("Total bytes received: " + total);
🧩 怎么解决粘包 / 拆包?
✅ 方式一:使用分隔符(如 JT808 用 0x7e)
pipeline.addLast(new DelimiterBasedFrameDecoder(40960, Unpooled.wrappedBuffer(new byte[]{0x7e})));
报文结构:
[0x7e][消息体][0x7e]
解码器自动识别每一条完整的帧,避免粘包/拆包
✅ 方式二:使用固定长度帧
pipeline.addLast(new FixedLengthFrameDecoder(20)); // 每20字节一帧
适用于消息长度固定的协议。
✅ 方式三:消息头 + 长度字段
自定义协议格式:
| 消息头 | 长度字段 | 数据内容 | | 2字节 | 4字节 | N字节 |
你可以用
LengthFieldBasedFrameDecoder
:pipeline.addLast(new LengthFieldBasedFrameDecoder( 65536, 2, 4, 0, 0));
📌 总结对比表
类型
说明
场景
粘包
多条消息黏一起
TCP合并发送
拆包
一条消息被拆
TCP 分片、缓存不够
解决方案
分隔符、定长、长度字段
取决于协议设计
报文结构设计(符合 JT808 协议)
| 起始位 | 消息头(12字节) | 消息体(N字节) | CRC校验码 | 结束位 | | 0x7e | 消息ID + 属性 + 终端手机号 + 流水号 | 业务数据 | 1字节 | 0x7e |
📦 示例:完整 Java 客户端(模拟构造 JT808 报文)
public class JT808ClientWithCRC { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 9000); OutputStream out = socket.getOutputStream(); byte[] body = new byte[]{0x01, 0x02, 0x03, 0x04}; // 示例消息体 byte[] message = buildJT808Packet((short) 0x0200, body); // 0x0200 = 位置信息 out.write(message); out.flush(); System.out.println("已发送 JT808 报文: " + bytesToHex(message)); socket.close(); } // 构造完整 JT808 报文(含 CRC、起止位) private static byte[] buildJT808Packet(short msgId, byte[] body) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); baos.write(0x7e); // 起始位 ByteArrayOutputStream headerAndBody = new ByteArrayOutputStream(); // 消息 ID (2字节) headerAndBody.write((msgId >> 8) & 0xff); headerAndBody.write(msgId & 0xff); // 消息体属性 (2字节) - 此处只填消息长度 int bodyLen = body.length; headerAndBody.write((bodyLen >> 8) & 0xff); headerAndBody.write(bodyLen & 0xff); // 终端手机号(6字节 BCD 编码)示例:12345678901 byte[] phoneBCD = bcd("12345678901"); headerAndBody.write(phoneBCD, 0, 6); // 流水号(2字节) headerAndBody.write(0x00); headerAndBody.write(0x01); // 消息体 headerAndBody.writeBytes(body); byte[] content = headerAndBody.toByteArray(); // CRC 校验(从 消息ID 到 消息体 之前的所有字节) byte crc = calcChecksum(content); baos.writeBytes(content); baos.write(crc); baos.write(0x7e); // 结束位 return baos.toByteArray(); } // BCD 编码(手机号等) private static byte[] bcd(String digits) { if (digits.length() % 2 != 0) { digits = "0" + digits; } byte[] result = new byte[digits.length() / 2]; for (int i = 0; i < digits.length(); i += 2) { result[i / 2] = (byte) (((digits.charAt(i) - '0') << 4) | (digits.charAt(i + 1) - '0')); } return result; } // 校验码(JT808 采用异或校验) private static byte calcChecksum(byte[] bytes) { byte checksum = bytes[0]; for (int i = 1; i < bytes.length; i++) { checksum ^= bytes[i]; } return checksum; } private static String bytesToHex(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) sb.append(String.format("%02X ", b)); return sb.toString(); } }
🔍 报文示例(打印结果)
7E 02 00 00 04 01 23 45 67 89 01 00 01 01 02 03 04 XX 7E ↑ 手机号(BCD) ↑ CRC
02 00
:消息 ID(0x0200 = 位置信息)00 04
:消息体长度(4字节)01 23 45 67 89 01
:终端手机号(BCD 编码)00 01
:流水号01 02 03 04
:消息体XX
:CRC 校验值7E
:起止标识