当 Flink 消费 Kafka 时,只需要简单的配置即可正常运行,例如:
// Obtain the execution environment for the streaming job.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka client settings: broker address and consumer group id.
val kafkaProps = new Properties()
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

// Consume topic "mytest" using SimpleStringSchema as the deserializer and print each element.
val valueStream = env.addSource(
  new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), kafkaProps)
)
valueStream.print()

// Submit the job under the name "KafkaSourceStreaming".
env.execute("KafkaSourceStreaming")
但是,这里使用的是最简单的 SimpleStringSchema,因此接收到的数据只有每条消息的值(value),消息附带的时间戳、offset、topic、partition 等元信息都无法获取。当需要这部分信息时,可以实现 KafkaDeserializationSchema 接口来自定义反序列化逻辑:
/**
 * Example Flink streaming job that reads the Kafka topic "mytest" through two sources:
 * one that yields plain string values via `SimpleStringSchema`, and one that yields
 * complete `ConsumerRecord[String, String]` instances (including topic, partition,
 * offset and timestamp) via [[KafkaSourceStreaming.CustomKafkaDeserializationSchema]].
 */
object KafkaSourceStreaming {

  /**
   * Builds both Kafka sources, prints every element they emit, and runs the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

    // Source 1: message values only, decoded by SimpleStringSchema.
    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()

    // Source 2: whole records, so that the Kafka metadata travels with each element.
    val consumer = new FlinkKafkaConsumer("mytest", new CustomKafkaDeserializationSchema(), props)
    val stream = env.addSource(consumer)
    stream.print()

    env.execute("KafkaSourceStreaming")
  }

  /**
   * Deserialization schema that keeps the Kafka metadata: every raw
   * `ConsumerRecord[Array[Byte], Array[Byte]]` is converted into a
   * `ConsumerRecord[String, String]` carrying the same topic, partition, offset,
   * timestamp, headers and leader epoch, with key and value decoded as UTF-8 text.
   */
  class CustomKafkaDeserializationSchema
      extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

    import java.nio.charset.StandardCharsets

    /**
     * Decodes raw bytes as UTF-8, passing `null` through unchanged.
     * Kafka keys and values are both nullable (a null value marks a tombstone), and the
     * Java `ConsumerRecord` API represents "absent" as `null`, so `null` is preserved here.
     */
    private def decode(bytes: Array[Byte]): String =
      Option(bytes).map(new String(_, StandardCharsets.UTF_8)).orNull

    /**
     * Converts a raw Kafka record into a string-typed record with identical metadata.
     *
     * @param record the record as read from Kafka, with undecoded key and value
     * @return a new record whose key/value are the UTF-8 decoded strings (or `null`
     *         where the original key/value was `null`)
     */
    override def deserialize(
        record: ConsumerRecord[Array[Byte], Array[Byte]]
    ): ConsumerRecord[String, String] = {
      // An explicit charset keeps the result independent of the JVM's platform default.
      val key = decode(record.key())
      val value = decode(record.value())
      new ConsumerRecord[String, String](
        record.topic(),
        record.partition(),
        record.offset(),
        record.timestamp(),
        record.timestampType(),
        // NOTE(review): checksum() and this constructor overload are deprecated in newer
        // kafka-clients releases — confirm against the client version on the classpath.
        record.checksum(),
        record.serializedKeySize(),
        record.serializedValueSize(),
        key,
        value,
        record.headers(),
        record.leaderEpoch()
      )
    }

    /** Always `false`: no element is treated as an end-of-stream marker. */
    override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

    /** Type information for the produced `ConsumerRecord[String, String]` elements. */
    override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
      TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
  }
}