在企业数据管理中,联系人信息与其关联的标签数据存储的量比较大,而业务系统又需要实时获取包含完整标签信息的联系人数据。本文将介绍如何使用Flink CDC技术,实现联系人表与联系人标签表的数据实时同步,并组装成统一格式写入Elasticsearch,为业务查询提供高效支持。欢迎评论区交流~。
业务场景与需求分析
我们的系统中存在三张核心表:
- 企业表(
company_info_*
):存储企业基本信息,包含多个地区分表 - 联系人表(
company_linkman
):存储企业联系人的基本信息,每条记录有唯一的id
作为主键 - 联系人标签表(
company_linkman_tag
):存储联系人与标签的关联关系,通过linkman_id
与联系人表关联
业务需求是:实时同步这两张表的数据,组装成{linkman:"联系人信息", tagIds:["标签ID1","标签ID2"]}
格式的文档,最终写入Elasticsearch,确保查询时能够根据标签信息以及联系人基本信息检索出对应的联系人信息。
技术选型
考虑到需求的实时性和数据关联的复杂性,我们选择以下技术栈:
- Flink 1.17.1:作为流处理引擎,提供强大的状态管理和时间窗口能力
- Flink CDC:基于Debezium的变更数据捕获工具,能实时捕获MySQL表的变更
- RocksDB:作为Flink的状态后端,提供高效的本地状态存储
- Elasticsearch:作为最终的数据存储,支持高效的全文检索
实现方案详解
1. 整体架构设计
整个数据同步流程分为6个核心步骤,形成闭环数据流:
数据源层:
- MySQL数据库中的三张核心表(企业表、联系人表、联系人标签表)
- 通过Flink CDC的Debezium连接器实时捕获表中数据的增删改操作(binlog解析)
数据接入层:
- Flink应用通过
MySqlSource
读取CDC变更事件 - 事件格式为JSON,包含操作类型(c/u/d/r)、前后数据快照、表名等元信息
- Flink应用通过
数据过滤层:
- 过滤无效事件(如格式错误、不相关操作类型)
- 仅保留需要同步的表数据(联系人表、标签表)
数据分组层:
- 按
linkman_id
对数据流进行分组(KeyBy操作) - 确保同一联系人的所有变更(包括基本信息和标签)进入同一个处理节点
- 按
状态处理层:
- 基于Flink的MapState维护状态数据:
- 联系人基本信息状态(
linkmanState
) - 联系人标签集合状态(
linkmanTagsState
)
- 联系人基本信息状态(
- 通过定时器和批处理机制触发数据组装(避免频繁输出)
- 基于Flink的MapState维护状态数据:
数据输出层:
- 将组装后的
{linkman_id, linkman_name, tagIdList}
格式数据写入Elasticsearch(本文没有体现) - 支持通过Kafka作为中间缓冲区(可选,根据集群规模调整,在正式环境中采用kafka缓冲区,通过logstash读取kafak内容并进行格式转换后写入到Elasticserach,在本文中未作体现)
- 将组装后的
2. 核心代码实现
2.1 初始化Flink环境与CDC源
首先,我们需要初始化Flink执行环境,并配置MySQL CDC源,指定需要监控的表:
// 初始化执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置检查点(容错机制)
configureCheckpointing(env);
// 配置RocksDB状态后端(本地状态存储)
configureRocksDBStateBackend(env);
// 创建MySQL CDC源
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
.hostname(CommonConstants.MYSQL_HOSTNAME)
.port(CommonConstants.MYSQL_POST)
.databaseList(MYSQL_DATABASE)
.tableList("db.company_linkman,db.company_linkman_tag") // 仅监听需要的表
.username(CommonConstants.MYSQL_USERNAME)
.password(CommonConstants.MYSQL_PASSWORD)
.deserializer(new JsonDebeziumDeserializationSchema()) // 输出JSON格式
.serverTimeZone("Asia/Shanghai")
.startupOptions(StartupOptions.latest()) // 从最新位置开始同步
.build();
// 读取CDC数据
DataStream<String> cdcStream = env.fromSource(
mysqlSource,
WatermarkStrategy.noWatermarks(),
"mysql-cdc-source"
).setParallelism(1);
2.2 数据过滤与分组
对CDC捕获的数据进行过滤,只保留增、删、改、读四种操作,并按联系人ID进行分组:
// 过滤有效事件
DataStream<String> filteredStream = cdcStream
.filter(new RichFilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
try {
JSONObject json = JSON.parseObject(value);
String op = json.getString("op");
// 只处理增、删、改、快照读操作
return "c".equals(op) || "u".equals(op) || "d".equals(op) || "r".equals(op);
} catch (Exception e) {
getRuntimeContext().getMetricGroup().counter("invalid-events").inc();
return false;
}
}
})
.name("filter-valid-events");
// 按联系人ID分组(核心:确保同一联系人的所有数据进入同一处理节点)
KeyedStream<String, String> keyedStream = filteredStream
.keyBy(event -> {
try {
JSONObject json = JSON.parseObject(event);
JSONObject source = json.getJSONObject("source");
String table = source.getString("table");
JSONObject data = "d".equals(json.getString("op")) ?
json.getJSONObject("before") : json.getJSONObject("after");
// 按表类型提取联系人ID
if ("company_linkman".equals(table)) {
return data.getString("id"); // 联系人表用自身ID
} else if ("company_linkman_tag".equals(table)) {
return data.getString("linkman_id"); // 标签表用关联的联系人ID
}
} catch (Exception e) {
e.printStackTrace();
}
return "unknown";
});
2.3 状态管理设计
使用Flink的MapState维护联系人与标签的状态信息,同时配置状态过期时间(避免内存溢出):
@Override
public void open(Configuration parameters) {
// 配置状态TTL(生存时间):3天未更新则自动清理
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 写入时更新过期时间
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
// 联系人基本信息状态(key: linkman_id, value: 联系人JSON字符串)
MapStateDescriptor<String, String> linkmanDesc = new MapStateDescriptor<>(
"linkmanState", String.class, String.class);
linkmanDesc.enableTimeToLive(ttlConfig);
linkmanState = getRuntimeContext().getMapState(linkmanDesc);
// 联系人标签集合状态(key: linkman_id, value: 标签ID集合)
MapStateDescriptor<String, Set<String>> tagsDesc = new MapStateDescriptor<>(
"linkmanTagsState",
TypeInformation.of(String.class),
TypeInformation.of(new TypeHint<Set<String>>() {})
);
tagsDesc.enableTimeToLive(ttlConfig);
linkmanTagsState = getRuntimeContext().getMapState(tagsDesc);
}
2.4 核心处理逻辑
通过KeyedProcessFunction
实现联系人与标签的关联处理,支持双向触发同步:
@Override
public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
String linkmanId = ctx.getCurrentKey();
if ("unknown".equals(linkmanId)) {
return;
}
try {
JSONObject eventJson = JSON.parseObject(event);
String table = eventJson.getJSONObject("source").getString("table");
String op = eventJson.getString("op");
JSONObject data = "d".equals(op) ?
eventJson.getJSONObject("before") : eventJson.getJSONObject("after");
// 根据表类型处理数据
switch (table) {
case "company_linkman":
handleLinkman(data, op); // 处理联系人信息变更
break;
case "company_linkman_tag":
handleLinkmanTag(data, op); // 处理标签关联变更
break;
}
// 批处理逻辑:达到阈值(50条)或超时(5秒)则触发输出
Integer count = updateCounter.get(linkmanId) == null ? 1 : updateCounter.get(linkmanId) + 1;
updateCounter.put(linkmanId, count);
if (count >= BATCH_SIZE_THRESHOLD) {
// 达到批量阈值,立即处理
processAndEmit(linkmanId, out);
resetBatchState(linkmanId);
} else {
// 注册定时器,确保超时后处理
long fireTime = ctx.timerService().currentProcessingTime() + BUFFER_TIME_MS;
ctx.timerService().registerProcessingTimeTimer(fireTime);
timers.put(linkmanId, fireTime);
}
} catch (Exception e) {
getRuntimeContext().getMetricGroup().counter("processing-errors").inc();
e.printStackTrace();
}
}
2.5 联系人与标签的处理方法
分别实现联系人表和标签表的状态更新逻辑:
// 处理联系人信息(新增/更新/删除)
private void handleLinkman(JSONObject data, String op) throws Exception {
if (data == null) return;
String linkmanId = data.getString("id");
if ("d".equals(op)) {
// 删除操作:清理状态
linkmanState.remove(linkmanId);
linkmanTagsState.remove(linkmanId);
} else {
// 新增/更新:保存联系人信息
data.put("doc_type", "linkman");
linkmanState.put(linkmanId, data.toJSONString());
}
}
// 处理标签关联(新增/更新/删除)
private void handleLinkmanTag(JSONObject data, String op) throws Exception {
if (data == null) return;
String tagId = data.getString("id");
String linkmanId = data.getString("linkman_id");
String deleteFlag = data.getString("delete_flag");
// 获取当前标签集合(不存在则初始化)
Set<String> tagIds = linkmanTagsState.get(linkmanId);
if (tagIds == null) tagIds = new HashSet<>();
// 更新标签集合
if ("d".equals(op) || "1".equals(deleteFlag)) {
tagIds.remove(tagId); // 删除标签
} else {
tagIds.add(tagId); // 添加标签
}
// 保存更新后的标签集合(空集合则清理状态)
if (tagIds.isEmpty()) {
linkmanTagsState.remove(linkmanId);
} else {
linkmanTagsState.put(linkmanId, tagIds);
}
}
2.6 数据组装与输出
将联系人信息与标签集合组装成目标格式,输出到Elasticsearch:
private void processAndEmit(String linkmanId, Collector<String> out) throws Exception {
// 仅当联系人信息存在时输出(避免孤立标签)
if (linkmanState.contains(linkmanId)) {
String linkmanJsonStr = linkmanState.get(linkmanId);
JSONObject linkmanJson = JSON.parseObject(linkmanJsonStr);
// 组装标签ID列表
Set<String> tagIds = linkmanTagsState.get(linkmanId);
List<String> tagList = tagIds != null ? new ArrayList<>(tagIds) : new ArrayList<>();
// 构建输出格式
JSONObject result = new JSONObject();
result.put("linkman_id", linkmanJson.getString("id"));
result.put("linkman_name", linkmanJson.getString("name"));
result.put("tagIdList", tagList);
out.collect(result.toJSONString()); // 输出到ES(实际使用时替换为ES Sink)
} else if (linkmanTagsState.contains(linkmanId)) {
// 监控指标:存在标签但无联系人的异常情况
getRuntimeContext().getMetricGroup().counter("orphaned-tags").inc();
}
}
3. 关键技术点解析
3.1 双向触发同步机制
本方案的核心优势是双向触发:
- 当联系人信息更新时,自动携带当前最新标签集合输出
- 当标签信息更新时,自动触发对应联系人的完整数据重新输出
这种机制确保了无论更新的是主数据还是关联数据,最终结果始终保持一致。
3.2 状态管理与过期策略
通过配置状态TTL(生存时间),避免长期运行的任务出现状态膨胀:
- 未更新的状态会在3天后自动清理
- 基于RocksDB的本地存储,支持大规模状态数据
3.3 批处理与定时触发
为平衡实时性和性能,采用混合触发策略:
- 批量触发:短时间内同一联系人变更达到50条时立即处理
- 定时触发:超过5秒未达到批量阈值时,强制触发处理
既避免了高频小批量写入的性能损耗,又保证了数据的实时性(延迟不超过5秒)。
部署与优化建议
检查点配置:建议检查点间隔设为1分钟,确保故障恢复时数据不丢失
env.enableCheckpointing(60000); // 1分钟一次检查点
并行度调整:根据数据量设置并行度(建议与Kafka分区数匹配),避免数据倾斜
状态后端优化:
- 生产环境建议使用RocksDB并配置本地SSD存储
- 开启预定义优化选项:
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
监控告警:关注以下指标:
invalid-events
:无效事件计数器(异常增长需排查数据格式)orphaned-tags
:孤立标签计数器(异常增长需排查数据关联逻辑)processing-errors
:处理错误计数器(需立即排查)
完整代码
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
/**
* 企业-联系人-标签-联系方式同步程序
* 基于Flink 1.17.1实现,支持联系人与标签双向触发同步
* 确保联系人信息中始终包含最新的标签ID数组
*/
public class CompanyContactSync2 {
// 配置参数 - 根据实际环境修改
private static final String MYSQL_DATABASE = "db";
private static final String[] MYSQL_TABLES = {
MYSQL_DATABASE + ".company_info_area1",
MYSQL_DATABASE + ".company_info_area2",
MYSQL_DATABASE + ".company_info_area3",
MYSQL_DATABASE + ".company_info_area4",
MYSQL_DATABASE + ".company_linkman",
MYSQL_DATABASE + ".company_linkman_tag",
MYSQL_DATABASE + ".company_linkman_information"
};
public static final String TABLE_NAME = "db.company_linkman,db.company_linkman_tag,db.company_linkman_information" ;
private static final String KAFKA_BOOTSTRAP_SERVERS = "IP:9092";
private static final String KAFKA_TOPIC = "es-company-sync";
private static final String CHECKPOINT_PATH = "hdfs:///flink/checkpoints/es-sync";
private static final int PARALLELISM = 1;
private static final int CHECKPOINT_INTERVAL = 60000; // 1分钟
private static final long BUFFER_TIME_MS = 5000; // 5秒缓冲
private static final int BATCH_SIZE_THRESHOLD = 50; // 批量处理阈值
private static final long STATE_TTL_DAYS = 3; // 状态过期时间
public static void main(String[] args) throws Exception {
try {
System.setProperty("HADOOP_USER_NAME", "root");
// 1. 初始化执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 配置检查点
// configureCheckpointing(env);
// 3. 配置RocksDB状态后端
// configureRocksDBStateBackend(env);
// 4. 设置并行度
env.setParallelism(PARALLELISM);
Properties props = new Properties();
props.setProperty("useSSL", "false");
props.setProperty("allowPublicKeyRetrieval", "true");
// 5. 创建MySQL CDC源
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
.hostname(CommonConstants.MYSQL_HOSTNAME)
.port(CommonConstants.MYSQL_POST)
.databaseList(MYSQL_DATABASE)
.tableList(TABLE_NAME)
.username(CommonConstants.MYSQL_USERNAME)
.password(CommonConstants.MYSQL_PASSWORD)
.jdbcProperties(props)
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai")
.startupOptions(StartupOptions.latest())
.build();
// 6. 读取CDC数据
DataStream<String> cdcStream = env.fromSource(
mysqlSource,
WatermarkStrategy.noWatermarks(),
"mysql-cdc-source"
)
.setParallelism(1);
// 7. 过滤有效事件
DataStream<String> filteredStream = cdcStream
.filter(new RichFilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
try {
JSONObject json = JSON.parseObject(value);
String op = json.getString("op");
// 只处理增、删、改、读操作
return "c".equals(op) || "u".equals(op) || "d".equals(op) || "r".equals(op);
} catch (Exception e) {
getRuntimeContext().getMetricGroup().counter("invalid-events").inc();
return false;
}
}
})
.name("filter-valid-events");
// 8. 按联系人ID分组 - 核心改进:统一使用联系人ID作为key,确保相关数据进入同一处理节点
KeyedStream<String, String> keyedStream = filteredStream
.keyBy(event -> {
try {
JSONObject json = JSON.parseObject(event);
JSONObject source = json.getJSONObject("source");
String table = source.getString("table");
JSONObject after = json.getJSONObject("after");
JSONObject before = json.getJSONObject("before");
JSONObject data = "d".equals(json.getString("op")) ? before : after;
// 所有相关表都按联系人ID分组
if (table.contains("company_info_")) {
// 公司表使用特殊标识,不影响联系人分组
return "COMPANY_" + data.getString("company_info_id");
} else if ("company_linkman".equals(table)) {
// 联系人表直接使用联系人ID
return data.getString("id");
} else if ("company_linkman_tag".equals(table) ||
"company_linkman_information".equals(table)) {
// 标签和联系方式表使用关联的联系人ID
return data.getString("linkman_id");
}
} catch (Exception e) {
e.printStackTrace();
}
return "unknown";
});
// 9. 处理数据,转换为ES文档格式
DataStream<String> esDocStream = keyedStream
.process(new CompanyContactProcessor())
.name("company-contact-processor");
// 10. 打印输出(实际使用时可替换为KafkaSink)
esDocStream.print();
// 执行任务
env.execute("Company-Contact ES Sync Job");
} catch (Exception e) {
e.printStackTrace();
}
}
// 配置检查点
private static void configureCheckpointing(StreamExecutionEnvironment env) {
env.enableCheckpointing(CHECKPOINT_INTERVAL);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setMinPauseBetweenCheckpoints(30000);
checkpointConfig.setCheckpointTimeout(60000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setTolerableCheckpointFailureNumber(3);
}
// 配置RocksDB状态后端
private static void configureRocksDBStateBackend(StreamExecutionEnvironment env) throws Exception {
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(CHECKPOINT_PATH, true);
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(rocksDbBackend);
}
/**
* 处理企业、联系人、标签和联系方式数据
* 核心改进:实现联系人与标签双向触发同步
*/
private static class CompanyContactProcessor extends KeyedProcessFunction<String, String, String> {
// 状态存储
private transient MapState<String, String> companyState; // 公司信息 key: companyId
private transient MapState<String, String> linkmanState; // 联系人信息 key: linkmanId
private transient MapState<String, Set<String>> linkmanTagsState; // 联系人标签ID集合 key: linkmanId
private transient MapState<String, List<String>> linkmanMethodsState; // 联系方式 key: linkmanId
private transient MapState<String, String> linkmanToCompanyState; // 联系人到公司的映射 key: linkmanId
// 批处理和定时器状态
private transient MapState<String, Integer> updateCounter;
private transient MapState<String, Long> timers;
@Override
public void open(Configuration parameters) {
// 配置状态TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(STATE_TTL_DAYS))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
// 初始化公司状态
MapStateDescriptor<String, String> companyDesc = new MapStateDescriptor<>(
"companyState", String.class, String.class);
companyDesc.enableTimeToLive(ttlConfig);
companyState = getRuntimeContext().getMapState(companyDesc);
// 初始化联系人状态
MapStateDescriptor<String, String> linkmanDesc = new MapStateDescriptor<>(
"linkmanState", String.class, String.class);
linkmanDesc.enableTimeToLive(ttlConfig);
linkmanState = getRuntimeContext().getMapState(linkmanDesc);
// 初始化联系人标签状态
MapStateDescriptor<String, Set<String>> tagsDesc = new MapStateDescriptor<>(
"linkmanTagsState", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Set<String>>() {}));
tagsDesc.enableTimeToLive(ttlConfig);
linkmanTagsState = getRuntimeContext().getMapState(tagsDesc);
// 初始化联系方式状态
MapStateDescriptor<String, List<String>> methodsDesc = new MapStateDescriptor<>(
"linkmanMethodsState", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<List<String>>() {}));
methodsDesc.enableTimeToLive(ttlConfig);
linkmanMethodsState = getRuntimeContext().getMapState(methodsDesc);
// 初始化联系人到公司的映射
MapStateDescriptor<String, String> linkmanToCompanyDesc = new MapStateDescriptor<>(
"linkmanToCompanyState", String.class, String.class);
linkmanToCompanyDesc.enableTimeToLive(ttlConfig);
linkmanToCompanyState = getRuntimeContext().getMapState(linkmanToCompanyDesc);
// 初始化计数器
updateCounter = getRuntimeContext().getMapState(
new MapStateDescriptor<>("updateCounter", String.class, Integer.class));
// 初始化定时器
timers = getRuntimeContext().getMapState(
new MapStateDescriptor<>("timers", String.class, Long.class));
}
@Override
public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
String linkmanId = ctx.getCurrentKey();
if ("unknown".equals(linkmanId) || linkmanId.startsWith("COMPANY_")) {
// 处理公司数据(不影响联系人-标签同步逻辑)
if (linkmanId.startsWith("COMPANY_")) {
processCompanyEvent(event, out);
}
return;
}
try {
JSONObject eventJson = JSON.parseObject(event);
String table = eventJson.getJSONObject("source").getString("table");
String op = eventJson.getString("op");
JSONObject before = eventJson.getJSONObject("before");
JSONObject after = eventJson.getJSONObject("after");
JSONObject data = "d".equals(op) ? before : after;
// 根据不同表处理数据
switch (table) {
case "company_linkman":
handleLinkman(eventJson, data, op);
break;
case "company_linkman_tag":
handleLinkmanTag(eventJson, data, op);
break;
case "company_linkman_information":
handleLinkmanMethod(eventJson, data, op);
break;
}
// 更新计数器
Integer count = updateCounter.get(linkmanId);
count = count == null ? 1 : count + 1;
updateCounter.put(linkmanId, count);
// 达到阈值立即处理
if (count >= BATCH_SIZE_THRESHOLD) {
Long existingTimer = timers.get(linkmanId);
if (existingTimer != null) {
ctx.timerService().deleteProcessingTimeTimer(existingTimer);
}
processAndEmit(linkmanId, out);
resetBatchState(linkmanId);
return;
}
// 注册或更新定时器
long fireTime = ctx.timerService().currentProcessingTime() + BUFFER_TIME_MS;
Long existingTimer = timers.get(linkmanId);
if (existingTimer == null || fireTime > existingTimer) {
if (existingTimer != null) {
ctx.timerService().deleteProcessingTimeTimer(existingTimer);
}
ctx.timerService().registerProcessingTimeTimer(fireTime);
timers.put(linkmanId, fireTime);
}
} catch (Exception e) {
getRuntimeContext().getMetricGroup().counter("processing-errors").inc();
e.printStackTrace();
}
}
// 专门处理公司相关事件
private void processCompanyEvent(String event, Collector<String> out) throws Exception {
try {
JSONObject eventJson = JSON.parseObject(event);
String table = eventJson.getJSONObject("source").getString("table");
String op = eventJson.getString("op");
JSONObject data = "d".equals(op) ? eventJson.getJSONObject("before") : eventJson.getJSONObject("after");
if (data == null) return;
String companyId = data.getString("company_info_id");
if ("d".equals(op)) {
companyState.remove(companyId);
} else {
data.put("doc_type", "company");
companyState.put(companyId, data.toJSONString());
out.collect(data.toJSONString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 处理联系人信息
private void handleLinkman(JSONObject eventJson, JSONObject data, String op) throws Exception {
if (data == null) return;
String linkmanId = data.getString("id");
String companyId = data.getString("company_info_id");
if ("d".equals(op)) {
linkmanState.remove(linkmanId);
linkmanTagsState.remove(linkmanId);
linkmanMethodsState.remove(linkmanId);
linkmanToCompanyState.remove(linkmanId);
} else {
// 存储联系人到公司的映射
linkmanToCompanyState.put(linkmanId, companyId);
// 添加文档类型标识
data.put("doc_type", "linkman");
data.put("company_id", companyId);
linkmanState.put(linkmanId, data.toJSONString());
}
}
// 处理联系人标签 - 核心改进:无论联系人是否已存在,都更新标签状态
private void handleLinkmanTag(JSONObject eventJson, JSONObject data, String op) throws Exception {
if (data == null) return;
String tagId = data.getString("id");
String linkmanId = data.getString("linkman_id");
String deleteFlag = data.getString("delete_flag");
// 确保标签集合存在
Set<String> tagIds = linkmanTagsState.get(linkmanId);
if (tagIds == null) {
tagIds = new HashSet<>();
}
// 更新标签集合
if ("d".equals(op) || "1".equals(deleteFlag)) {
tagIds.remove(tagId);
} else {
tagIds.add(tagId);
}
// 保存更新后的标签集合
if (tagIds.isEmpty()) {
linkmanTagsState.remove(linkmanId);
} else {
linkmanTagsState.put(linkmanId, tagIds);
}
}
// 处理联系方式
private void handleLinkmanMethod(JSONObject eventJson, JSONObject data, String op) throws Exception {
if (data == null) return;
String methodId = data.getString("id");
String linkmanId = data.getString("linkman_id");
String methodJson = data.toJSONString();
List<String> methods = linkmanMethodsState.get(linkmanId);
if (methods == null) {
methods = new ArrayList<>();
}
if ("d".equals(op)) {
methods.removeIf(m -> {
JSONObject mJson = JSON.parseObject(m);
return methodId.equals(mJson.getString("id"));
});
} else {
// 先移除旧的再添加新的
methods.removeIf(m -> {
JSONObject mJson = JSON.parseObject(m);
return methodId.equals(mJson.getString("id"));
});
methods.add(methodJson);
}
if (methods.isEmpty()) {
linkmanMethodsState.remove(linkmanId);
} else {
linkmanMethodsState.put(linkmanId, methods);
}
}
// 定时器触发时处理
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
String linkmanId = ctx.getCurrentKey();
Long scheduledTime = timers.get(linkmanId);
if (scheduledTime != null && timestamp == scheduledTime) {
processAndEmit(linkmanId, out);
resetBatchState(linkmanId);
}
}
// 处理并输出数据 - 核心改进:确保即使先有标签后有联系人,也能正确关联
private void processAndEmit(String linkmanId, Collector<String> out) throws Exception {
// 处理联系人数据
if (linkmanState.contains(linkmanId)) {
String linkmanJsonStr = linkmanState.get(linkmanId);
JSONObject linkmanJson = JSON.parseObject(linkmanJsonStr);
// 添加标签ID集合 - 无论标签是在联系人之前还是之后到达
Set<String> tagIds = linkmanTagsState.get(linkmanId);
if (tagIds != null && !tagIds.isEmpty()) {
linkmanJson.put("tagIdList", new ArrayList<>(tagIds));
} else {
// 确保始终有tagIdList字段,即使为空
linkmanJson.put("tagIdList", new ArrayList<>());
}
// 添加联系方式
List<String> methods = linkmanMethodsState.get(linkmanId);
if (methods != null && !methods.isEmpty()) {
List<JSONObject> methodList = new ArrayList<>();
for (String methodStr : methods) {
methodList.add(JSON.parseObject(methodStr));
}
linkmanJson.put("methods", methodList);
}
// 输出最终格式:{linkman_id:'',linkman_name:'',tagIdList:[]}
JSONObject result = new JSONObject();
result.put("linkman_id", linkmanJson.getString("id"));
result.put("linkman_name", linkmanJson.getString("name"));
result.put("tagIdList", linkmanJson.getJSONArray("tagIdList"));
out.collect(result.toJSONString());
} else if (linkmanTagsState.contains(linkmanId)) {
// 核心改进:如果有标签但联系人还未到达,记录临时状态,等待联系人
// 这里可以根据需要添加监控指标,提示有孤立标签
getRuntimeContext().getMetricGroup().counter("orphaned-tags").inc();
}
}
// 重置批处理状态
private void resetBatchState(String linkmanId) throws Exception {
updateCounter.remove(linkmanId);
timers.remove(linkmanId);
}
}
}
总结
本文介绍的方案通过Flink CDC实现了联系人与标签数据的实时同步,核心在于:
- 实时性:基于binlog的变更捕获,确保数据延迟在秒级
- 一致性:双向触发机制+状态管理,确保关联数据始终同步
- 可靠性:完善的检查点和状态过期策略,支持7x24小时稳定运行