Note: use the revised implementation dated 2024-12-27 (directly below); the 2024-12-26 implementation further down has a bug, but is kept as a reference.
2024-12-27: revised implementation
1.The earlier approach works when every partition of the topic has data, but if some partitions are empty, or the requested consumption timestamp is later than the last record of some partitions, the Invalid negative offset error described in the 2024-12-26 section below still occurs. The implementation was therefore changed: look up each partition's consumption offset by timestamp, then start the job with scan.startup.specific-offsets, giving an explicit position for every partition.
2.Steps: ① Look up each partition's position by timestamp. For a partition that has data and is written continuously, the returned util.Map[TopicPartition, OffsetAndTimestamp] has OffsetAndTimestamp != null, and that offset can be used directly. If the topic has no data, or the last record's timestamp is earlier than the requested timestamp, OffsetAndTimestamp == null, and step ② is needed to get each partition's start and end offsets;
② Get each partition's start and end offsets. For any partition where step ① returned OffsetAndTimestamp == null, use its end offset, i.e. simply wait for new data to arrive.
③ Merge the per-partition offsets from ① and ② into a complete HashMap[TopicPartition, Long], then render it as the scan.startup.specific-offsets value of the Kafka SQL DDL.
3.Look up each partition's consumption position by timestamp. For each partition this returns the offset of the first record at or after the timestamp, or null:
// Imports needed by the helpers below
import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition

// Look up each partition's offset by timestamp; an entry is null when the
// timestamp is later than the partition's last record, or the partition is empty
def offsetsFromTimes(brokers: String, topic: String, timestamp: Long, group: String): util.Map[TopicPartition, OffsetAndTimestamp] = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", group)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    // Subscribe, then poll until the partition assignment arrives
    consumer.subscribe(Collections.singletonList(topic))
    var assignment = consumer.assignment()
    while (assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(1000L))
      assignment = consumer.assignment()
    }
    // Ask every partition for the offset of the first record at or after `timestamp`
    val times = new util.HashMap[TopicPartition, java.lang.Long]()
    val partitionIter = assignment.iterator()
    while (partitionIter.hasNext) {
      times.put(new TopicPartition(topic, partitionIter.next().partition()), timestamp)
    }
    consumer.offsetsForTimes(times)
  } finally {
    // Close the consumer even if a lookup fails
    consumer.close()
  }
}
4.Get each partition's earliest and latest offsets, as a fallback for the partitions that returned null in step 3:
// Earliest and latest offset of every partition
def earliestLatestOffsetMap(brokers: String, topic: String, group: String): util.Map[TopicPartition, (Long, Long)] = {
  // Per partition: (earliest offset, latest offset)
  val retMap = new util.HashMap[TopicPartition, (Long, Long)]()
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", group)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    // Subscribe, then poll until the partition assignment arrives
    consumer.subscribe(Collections.singletonList(topic))
    var assignment = consumer.assignment()
    while (assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(1000L))
      assignment = consumer.assignment()
    }
    val earliestOffset = consumer.beginningOffsets(assignment)
    val lastOffset = consumer.endOffsets(assignment)
    // Store each partition's earliest offset...
    val earliestOffsetIter = earliestOffset.entrySet().iterator()
    while (earliestOffsetIter.hasNext) {
      val ele = earliestOffsetIter.next()
      retMap.put(ele.getKey, (ele.getValue, 0L))
    }
    // ...then fill in its latest offset
    val lastOffsetIter = lastOffset.entrySet().iterator()
    while (lastOffsetIter.hasNext) {
      val ele = lastOffsetIter.next()
      val startOffset = retMap.get(ele.getKey)
      retMap.put(ele.getKey, (startOffset._1, ele.getValue))
    }
    retMap
  } finally {
    consumer.close()
  }
}
5.Merge the two:
def getSeekParOffset(brokers: String, topic: String, timestamp: Long, group: String): util.HashMap[TopicPartition, Long] = {
  // Offsets resolved by timestamp (entries may be null)
  val partitionOffsetByStampMap = offsetsFromTimes(brokers, topic, timestamp, group)
  println("partitionOffsetByStampMap: " + partitionOffsetByStampMap)
  // Earliest/latest offset of every partition
  val elOffsetMap = earliestLatestOffsetMap(brokers, topic, group)
  println("elOffsetMap: " + elOffsetMap)
  // Build the final per-partition map to consume from:
  // ① partitions with a timestamp-resolved offset use it directly;
  // ② partitions without one use their end offset and simply wait for new data.
  //    This covers two cases: a) the timestamp is later than the partition's last
  //    record; b) the partition never had data, or all of its data has expired.
  val toSeekPartitionOffset = new util.HashMap[TopicPartition, Long]()
  // First, the offsets resolved by timestamp
  val offsetStampMapIter = partitionOffsetByStampMap.entrySet().iterator()
  while (offsetStampMapIter.hasNext) {
    val ele = offsetStampMapIter.next()
    if (ele.getValue != null) {
      toSeekPartitionOffset.put(ele.getKey, ele.getValue.offset())
    }
  }
  // Then the end-offset fallback for the remaining partitions
  val elOffsetMapIter = elOffsetMap.entrySet().iterator()
  while (elOffsetMapIter.hasNext) {
    val ele = elOffsetMapIter.next()
    if (!toSeekPartitionOffset.containsKey(ele.getKey)) {
      toSeekPartitionOffset.put(ele.getKey, ele.getValue._2)
    }
  }
  toSeekPartitionOffset
}
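The merge rule above is the core of the fix, so here it is restated as a minimal self-contained sketch over plain Scala maps (partition numbers and offsets are made up; `None` stands in for `OffsetAndTimestamp == null`):

```scala
// Merge rule from getSeekParOffset, restated over plain Scala maps.
// None models a partition for which offsetsForTimes returned null.
object MergeSketch {
  def merge(byTimestamp: Map[Int, Option[Long]], endOffsets: Map[Int, Long]): Map[Int, Long] =
    endOffsets.map { case (partition, endOffset) =>
      // Use the timestamp-resolved offset if present, else fall back to the end offset
      partition -> byTimestamp.getOrElse(partition, None).getOrElse(endOffset)
    }

  def main(args: Array[String]): Unit = {
    // partition 0: timestamp lookup succeeded -> use its offset (100)
    // partition 1: lookup returned null -> fall back to the end offset (57)
    val byTs = Map(0 -> Some(100L), 1 -> None)
    val ends = Map(0 -> 500L, 1 -> 57L)
    println(merge(byTs, ends)) // Map(0 -> 100, 1 -> 57)
  }
}
```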
6.Render the specific-offsets string:
def getSpecificOffset(brokers: String, topic: String, timestamp: Long, group: String): String = {
  val seekOffset = getSeekParOffset(brokers, topic, timestamp, group)
  // Render as e.g. partition:0,offset:1;partition:1,offset:2
  // (no trailing separator, matching the connector's documented format)
  val seekOffsetIter = seekOffset.entrySet().iterator()
  val specificOffsets = new StringBuilder
  while (seekOffsetIter.hasNext) {
    val ele = seekOffsetIter.next()
    if (specificOffsets.nonEmpty) specificOffsets.append(";")
    specificOffsets.append("partition:").append(ele.getKey.partition()).append(",offset:").append(ele.getValue)
  }
  specificOffsets.toString()
}
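The string format itself can be checked in isolation; a standalone sketch with hypothetical offsets, using mkString so no trailing separator is emitted:

```scala
// Standalone sketch of the specific-offsets string format.
// Partition numbers and offsets are hypothetical examples.
object SpecificOffsetsSketch {
  def format(partitionOffsets: Seq[(Int, Long)]): String =
    partitionOffsets
      .map { case (p, o) => s"partition:$p,offset:$o" }
      .mkString(";") // joins without a trailing separator

  def main(args: Array[String]): Unit = {
    println(format(Seq(0 -> 42L, 1 -> 300L)))
    // partition:0,offset:42;partition:1,offset:300
  }
}
```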
7.Generate the scan fragment of the Kafka SQL DDL:
/**
 * Decide where consumption starts based on startFrom.
 *
 * @param startFrom one of earliest-offset, latest-offset, group-offsets,
 *                  a millisecond timestamp, or a specific-offsets string
 * @return the scan.startup.* fragment of the WITH clause
 */
def getKafkaSQLScanStr(startFrom: String): String = {
  var scanStartup = ""
  if (Character.isDigit(startFrom.trim()(0))) {
    // A leading digit means a millisecond timestamp
    scanStartup =
      "'scan.startup.mode' = 'timestamp'," +
        s"'scan.startup.timestamp-millis' = '${startFrom.trim()}',"
  } else if (startFrom.contains("partition")) {
    // A specific-offsets string such as partition:0,offset:42;partition:1,offset:300
    scanStartup =
      "'scan.startup.mode' = 'specific-offsets'," +
        s"'scan.startup.specific-offsets' = '${startFrom.trim()}',"
  } else {
    scanStartup = s"'scan.startup.mode' = '${startFrom}',"
  }
  scanStartup
}
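For reference, these are the WITH-clause fragments the helper emits for each input shape (the timestamp and offsets below are made-up examples):

```sql
-- startFrom starts with a digit, e.g. '1735228800000'
'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '1735228800000',

-- startFrom contains 'partition'
'scan.startup.mode' = 'specific-offsets','scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

-- anything else, e.g. startFrom = 'latest-offset'
'scan.startup.mode' = 'latest-offset',
```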
2024-12-26: original implementation
1 Background
1.A Flink 1.16 Table SQL job consumes Kafka data, computes metrics in SQL, and writes them to Doris;
2.One metric counts the current day's records, so if the job exits abnormally and is restarted it must resume from midnight. The Kafka table was therefore created with 'scan.startup.mode' = 'timestamp' and 'scan.startup.timestamp-millis' = '<today's midnight millis>':
s"""
|CREATE TABLE qysfxjKafkaTable (
|xlid STRING,
|available_status STRING,
|sendtime STRING,
|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
|) WITH (
|'connector' = 'kafka',
|'topic' = '${param.getProperty("qysfxjTopic")}',
|'scan.startup.mode' = 'timestamp',
|'scan.startup.timestamp-millis' = '<today's midnight millis>',
|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',
|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',
|'properties.auto.offset.reset' = 'earliest',
|'json.ignore-parse-errors' = 'true',
|'json.fail-on-missing-field' = 'false',
|'format' = 'json')
|""".stripMargin
3.Startup failed with a Kafka error, Invalid negative offset: Flink had asked Kafka to consume from an invalid offset. Investigation showed that the newest record in the topic was older than today's midnight, i.e. the midnight timestamp specified in the Kafka table DDL was later than the newest data in the topic;
2 Solution
1.Two options: ① restart the job from a checkpoint; ② adjust the consumption start time according to the Kafka data. Since the topic may have no data at all on the very first start, and a failed checkpoint would make recovery impossible anyway, option ② was chosen;
2.Approach
1.Use the Kafka Java API to fetch the last record in the topic, and initialize the start time of the Kafka table DDL from that record's timestamp;
2.Two scenarios arise when fetching the last record: ① the newest record in Kafka is earlier than midnight (the failing case); ② the newest record is later than midnight;
3.The code below implements this; it returns either 0 or the last record's timestamp:
def getTopicLatestRecordTimeStamp(brokers: String, topic: String): Long = {
  var retMillis = 0L
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", "test")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    // Subscribe, then poll until the partition assignment arrives
    consumer.subscribe(Collections.singletonList(topic))
    var assignment = consumer.assignment()
    while (assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(1000L))
      assignment = consumer.assignment()
    }
    println(assignment)
    val earliestOffset = consumer.beginningOffsets(assignment)
    val lastOffset = consumer.endOffsets(assignment)
    println("assignment: " + assignment + ", earliestOffsets: " + earliestOffset + ", endOffsets: " + lastOffset)
    // Seek to the last record of every non-empty partition and keep the newest timestamp
    val lastOffsetIter = lastOffset.entrySet().iterator()
    while (lastOffsetIter.hasNext) {
      val ele = lastOffsetIter.next()
      if (ele.getValue != 0L) {
        // The partition has (or once had) data
        consumer.seek(ele.getKey, ele.getValue - 1)
        val records = consumer.poll(Duration.ofMillis(1000L)).iterator()
        while (records.hasNext) {
          val next = records.next()
          if (next.timestamp() > retMillis) {
            retMillis = next.timestamp()
          }
          println("Timestamp of last record: " + next.timestamp())
        }
      }
    }
    retMillis
  } finally {
    consumer.close()
  }
}
4.Compare the returned timestamp against today's midnight: if it is earlier than midnight, start from latest-offset; otherwise start from the midnight timestamp. The following code decides between the two:
if (parameterTool.get("qysfxjTopicStartFrom", "latest").equals("latest")) {
  val toAssignmentTime = TimeHandler.getMidNightMillions()
  val latestTime = KfkUtil.getTopicLatestRecordTimeStamp(pro.get("brokers").toString, pro.get("qysfxjTopic").toString)
  // If the topic's last record is before toAssignmentTime (today's midnight),
  // start from latest-offset; otherwise start from the midnight timestamp
  if (toAssignmentTime > latestTime) {
    pro.put("qysfxjTopicStartFrom", "latest-offset")
  } else {
    pro.put("qysfxjTopicStartFrom", toAssignmentTime.toString)
  }
} else {
  pro.put("qysfxjTopicStartFrom", parameterTool.get("qysfxjTopicStartFrom"))
}
5.Generate the scan fragment of the SQL from either the timestamp or latest-offset:
/**
 * Decide where consumption starts based on startFrom.
 *
 * @param startFrom one of earliest-offset, latest-offset, group-offsets,
 *                  or a millisecond timestamp
 * @return the scan.startup.* fragment of the WITH clause
 */
def getKafkaSQLScanStr(startFrom: String): String = {
  var scanStartup = ""
  if (Character.isDigit(startFrom.trim()(0))) {
    // A leading digit means a millisecond timestamp
    scanStartup =
      "'scan.startup.mode' = 'timestamp'," +
        s"'scan.startup.timestamp-millis' = '${startFrom.trim()}',"
  } else {
    scanStartup = s"'scan.startup.mode' = '${startFrom}',"
  }
  scanStartup
}
6.Assemble the full table SQL:
val qysfxjKafkaSource =
s"""
|CREATE TABLE qysfxjKafkaTable (
|xlid STRING,
|available_status STRING,
|sendtime STRING,
|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
|) WITH (
|'connector' = 'kafka',
|'topic' = '${param.getProperty("qysfxjTopic")}',
|${TXGJUtils.getKafkaSQLScanStr(qysfxjTopicStartFrom)}
|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',
|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',
|'properties.auto.offset.reset' = 'earliest',
|'json.ignore-parse-errors' = 'true',
|'json.fail-on-missing-field' = 'false',
|'format' = 'json')
|""".stripMargin