flink消费kafka时获取元数据信息

发布于:2024-06-14 ⋅ 阅读:(39) ⋅ 点赞:(0)

当 Flink 消费 Kafka 时,只需要简单的配置就可以正常运行:

 val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka client settings: broker address and consumer group id.
    val kafkaProps = new Properties()
    kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
    kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")

    // Read topic "mytest" with SimpleStringSchema, so each element is a plain String,
    // and print every element.
    val stringSource = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), kafkaProps)
    env.addSource(stringSource).print()

    // Nothing runs until execute() is called.
    env.execute("KafkaSourceStreaming")

但是,这里用的是最简单的 SimpleStringSchema,它只反序列化消息的 value,因此接收到的数据只有消息体本身,消息的时间戳、offset、topic、partition 等元信息都无法获取。当需要这部分信息时,可以实现 KafkaDeserializationSchema 接口来自定义反序列化逻辑,如下所示:

/**
  * Demo job that attaches two Kafka sources to the topic "mytest" and prints both:
  * one producing plain String values, and one producing full `ConsumerRecord`s so that
  * topic, partition, offset, timestamp and headers are available downstream.
  */
object KafkaSourceStreaming {

  /**
    * Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka client settings shared by both sources.
    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

    // Source 1: value-only view of the topic.
    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()

    // Source 2: same topic, but each element is a ConsumerRecord carrying the metadata.
    val consumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]](
      "mytest", new CustomKafkaDeserializationSchema(), props)
    val stream = env.addSource(consumer)
    stream.print()

    env.execute("KafkaSourceStreaming")
  }


  /**
    * Deserialization schema that keeps the Kafka metadata: the raw byte-array record is
    * converted into a `ConsumerRecord[String, String]` with every metadata field copied
    * over and key/value decoded as UTF-8 text.
    */
  class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

    /**
      * Converts one raw record into its String-typed equivalent.
      *
      * @param record the record as read from Kafka, with byte-array key and value
      * @return a new record with identical metadata; key and value are `null` when the
      *         corresponding bytes of `record` are `null`
      */
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
      val key = decodeUtf8(record.key())
      // The value may be null as well (not only the key), so it gets the same guard.
      val value = decodeUtf8(record.value())
      new ConsumerRecord[String, String](
        record.topic(),
        record.partition(),
        record.offset(),
        record.timestamp(),
        record.timestampType(),
        // NOTE(review): checksum() is believed to be deprecated in newer kafka-clients
        // releases - confirm against the client version in use before upgrading.
        record.checksum(),
        record.serializedKeySize(),
        record.serializedValueSize(),
        key,
        value,
        record.headers(),
        record.leaderEpoch()
      )
    }

    /** Always `false`: no element marks the end of the stream. */
    override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

    /** Type information for the produced `ConsumerRecord[String, String]` elements. */
    override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
      TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
    }

    /**
      * Decodes bytes as UTF-8, passing `null` through unchanged.
      * The charset is explicit so the result does not depend on the JVM's default charset.
      */
    private def decodeUtf8(bytes: Array[Byte]): String =
      Option(bytes).map(new String(_, java.nio.charset.StandardCharsets.UTF_8)).orNull
  }
}