架构第113集:网关服务器、Cassandra数据库、Redis缓存、Kafka消息队列、Elasticsearch客户端

发布于:2025-04-21 ⋅ 阅读:(22) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

“一个基于锁和条件变量(Condition)实现的简易版 Future,用来在某个线程中等待结果,直到被另一个线程显式唤醒并传递结果。”

🔵 举个简单使用场景

假设你在做 异步RPC调用异步消息处理这类事情:

  • 线程A 发起请求,但不知道什么时候结果返回,于是 await() 等待。

  • 线程B 收到响应时,调用 signal(result) 把数据塞进来并唤醒线程A。

小小示意:

CondFuture<String> condFuture = new CondFuture<>();

// 线程A:请求并等待
new Thread(() -> {
    try {
        String result = condFuture.await(5, TimeUnit.SECONDS);
        System.out.println("Got result: " + result);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// 线程B:稍后响应
new Thread(() -> {
    try {
        Thread.sleep(1000); // 假装处理了 1秒
        condFuture.signal("Response is here!");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

输出:

Got result: Response is here!

流量、高并发 IoT 项目(车联网平台)的标准实践

优化点

说明

Cassandra 分库分节点

xx、xx、业务库分开,减少热数据干扰,提升查询性能

Redis 主备分离

热数据实时存储 + 备份灾容,快速读写,减少单点故障

Kafka producer/batch 配置

批量提交,延迟优化,内存缓冲提升消息发送吞吐量

Kafka consumer 手动ack + concurrency

保证消息消费可靠性,并通过多线程加速消费处理

Elasticsearch restclient

轻量高效,适合大数据量检索,降低TCP连接开销

高阶版 JT808 平台服务部署架构图

模块

优化策略

JT808服务

Netty线程模型+心跳检测+过期剔除

Redis

主备双写,主节点故障快速切换备节点

Cassandra

按业务分库,数据分离,提升查询效率

Kafka

批量提交+异步ACK+3节点副本保障高可用

Elasticsearch

异步写入+批量提交+慢查询优化

消费者组

线程池消费,超时监控,重试机制

超实用部署建议

点位

建议

Redis

主备节点使用keepalived+vip做高可用切换

Cassandra

每个库单独维护表,设置合理TTL(过期时间)清理旧数据

Kafka

生产者开启幂等性 (enable.idempotence=true) ,防止重复投递

ES

热数据分离,老数据定期归档,写入前做Bulk压缩

JT808服务器

增加防粘包拆包处理,最大包长校验,保护系统

全链路

接入链路追踪系统(如Skywalking、Zipkin),监控数据流向

Topic属性设置建议(重点!)

属性

建议值

解释

replication.factor

3

防止节点宕机丢数据

min.insync.replicas

2

写入必须至少2副本成功

acks

all

生产者端保证强一致性

enable.idempotence

true

开启幂等性,防止重复消息

retention.ms

7天(604800000)或按业务定

保留时间够补偿

cleanup.policy

delete(默认)

保证磁盘可控,避免膨胀

segment.ms

1小时

切小日志段,提升查询速度

📋 Kafka 参数调优表(超详细版)

🛠️ Producer 参数调优

参数

建议配置

作用解释

调优思路

acks all

等所有副本确认才算写成功

最强数据可靠性

retries 3 ~ 5

发送失败自动重试次数

防止瞬时抖动丢消息

enable.idempotence true

开启幂等性,避免重复投递

高一致性必开

batch.size 32KB ~ 64KB

单批次最大字节数

调大提高吞吐量,减少IO次数

linger.ms 5 ~ 10

批量发送延迟(毫秒)

稍微延迟换更大批次(减少压Kafka)

buffer.memory 64MB ~ 128MB

生产者内存缓冲池大小

内存富裕就调大(抗突发)

compression.type lz4

 或 snappy

压缩算法

降低网络带宽,提升吞吐

max.request.size 1MB

 或更大

单条消息最大尺寸

避免超大消息被拒

request.timeout.ms 30s

请求超时毫秒数

保证重试/失败及时切换

delivery.timeout.ms 60s

允许最大发送时间

配合 retries 效果好


🛠️ Consumer 参数调优

参数

建议配置

作用解释

调优思路

fetch.min.bytes 1KB ~ 10KB

最小抓取字节

抓取更多消息,减少拉取次数

fetch.max.bytes 5MB ~ 10MB

单次最大拉取量

合理拉大,防止吞吐低

fetch.max.wait.ms 500ms

最长等待时间

配合 batch 消费更流畅

max.poll.records 500 ~ 1000

每次拉取最大记录数

批量处理提升效率

session.timeout.ms 10s ~ 20s

消费组心跳超时时间

保持平稳Rebalance

heartbeat.interval.ms 3s ~ 5s

心跳间隔

配合session.timeout

enable.auto.commit false

关闭自动提交

手动控制 offset,确保精确一次

max.partition.fetch.bytes 1MB

单分区最大拉取量

高分区场景要调大


🛠️ Broker 集群端参数调优

参数

建议配置

作用解释

调优思路

num.network.threads 3 ~ 8

网络线程数

跟broker流量量级有关

num.io.threads 8 ~ 16

IO线程数(磁盘/网络)

跟磁盘/吞吐相关

log.dirs

多磁盘挂载

日志存储目录

多路径并发刷盘更快

log.segment.bytes 512MB ~ 1GB

分段文件大小

文件过小影响性能

log.retention.hours 72h

(3天)或按需

日志保留时间

结合业务定策略

message.max.bytes 1MB

 或更大

单消息最大字节数

跟 producer 端对应

replica.fetch.max.bytes 10MB

副本同步最大字节

防止副本落后太多

socket.request.max.bytes 100MB

socket请求最大字节数

保护broker防止OOM

auto.create.topics.enable false

禁止自动创建 topic

统一topic管理

【实战Tips】

  • 吞吐优先
    → 调大 batch.size、buffer.memory,使用压缩。

  • 可靠性优先
    → 开启 acks=all、幂等、合理设置 retries。

  • 高并发低延迟
    → 合理调 max.poll.records、fetch参数。

  • 集群容灾
    → Replication Factor = 3,ISR列表控制好(min.insync.replicas=2)。

  • 异常处理
    → 配置 DLQ(死信队列)+ 自定义拦截器(如 ProducerInterceptor 抓异常)。

📋 Kafka 流量压测工具推荐表

1. 官方自带工具 - kafka-producer-perf-test.sh

工具位置

一般在安装包里的:

$KAFKA_HOME/bin/kafka-producer-perf-test.sh

基础使用示例

./kafka-producer-perf-test.sh \
  --topic test-topic \
  --num-records 1000000 \
  --record-size 512 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092

参数

说明

--topic

压测用的 Topic

--num-records

要发送的消息总数

--record-size

每条消息字节大小(比如 JT808 标准报文一般几十到几百字节)

--throughput

限流速率(条数/秒),-1表示不限速

--producer-props

传入Kafka Producer的连接参数

✅ 输出结果:TPS、发送延迟、吞吐量等指标


2. 官方自带工具 - kafka-consumer-perf-test.sh

基础使用示例

./kafka-consumer-perf-test.sh \
  --topic test-topic \
  --bootstrap-server localhost:9092 \
  --messages 1000000

参数

说明

--topic

要消费的Topic

--messages

要消费的消息条数

--bootstrap-server

Kafka地址

✅ 输出结果:消费TPS、平均延迟、吞吐量


🚀 高阶压测思路

场景

工具

建议操作

单机单Topic最大吞吐

producer-perf-test.sh

配置大批量数据、-1不限速

多分区压测

producer-perf-test.sh

多开进程,发送到不同分区

压测集群消费能力

consumer-perf-test.sh

配合group.id并发消费

网络/磁盘瓶颈排查

producer/consumer + Linux iostat、sar

观察磁盘/网络IO

端到端延迟压测

自定义带时间戳的Payload

生产+消费后计算RTT


🔥 高阶技巧:配合这些一起压测更稳

  • 调大 Producer batch.sizelinger.ms → 批量发

  • Broker socket.request.max.bytes调大 → 防止大批次失败

  • 保证磁盘I/O够快(SSD最佳)

  • JVM 参数优化(-Xms -Xmx固定堆大小)

  • 配置 Topic 分区数、Replication Factor合理分摊压力


📈 实战Tips总结

  • 小消息(如JT808位置上报)→ 高TPS 压测重点

  • 大消息(报警/多媒体)→ 吞吐量/延迟 双压测

  • 消费端一定要压 → 避免只测发送忽略消费瓶颈!


网站公告

今日签到

点亮在社区的每一天
去签到