【Kafka】Kafka写入数据

发布于:2025-03-19 ⋅ 阅读:(22) ⋅ 点赞:(0)

不管是把Kafka作为消息队列还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者,一个可以从Kafka读取数据的消费者。

生产者

  • 创建一个ProducerRecord对象,包含目标topic和发送的内容;另外可以指定 键、分区、时间戳或标头
  • 对数据进行分区;
    • 如果没有显示指定分区,数据将会传给分区器,确定往哪个主题和分区发送数据。
    • 消息添加到一个消息批次,该批次所有的消息被发送到同一个主题和分区;有一个独立线程复杂把消息批次发送给目标broker。
  • broker接收到消息后会返回响应
    • 写入成功:返回信息包含主题和分区以及偏移量
    • 写入失败:生产者会重新发送消息,如果重试之后还是失败,会返回错误信息

image

创建生产者

必须的参数信息

  • bootstrap.servers:可以由多个host:port组长,用来建立初始Kafka集群连接。不需要所有的broker信息,建立后可以从给定的broker中获取其他broker信息。
  • key.serializer:类名,用来序列化消息的键。
  • value.serializer:类名,用来序列化消息的值。
	Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        producer = new KafkaProducer<String,String>(kafkaProps);

发送消息类型

  • 发送并忘记:消息发送给服务器,并不关心是否送达。如果发送失败/超时,会导致消息丢失
  • 同步发送:一般来说,生产者是异步,调用send()方法发送消息,会返回一个Future对象,调用get()方法等待完成,这样可以在发送下一个消息时,直到当前消息是否发送成功
  • 异步发送:调用send()方法,调用指定函数,当服务器返回响应时,会触发该函数。

其他参数

  • client.id:客户端表示符,broker用它来识别从客户端发送的消息
  • acks:指定生产者在多个少分区副本收到消息的情况下才会认为消息写入成功。
    • 0:生产者不会收到broker的确认消息
    • 1:只要集群收到首领副本的消息就会响应写入成功
    • all:所有副本全部收到消息时,生产者才会收到写入成功的响应。
  • buffer.memory:生产者发送给服务器的消息内存缓冲区大小。
  • compression.type:默认生产者发送数据未经压缩,
    • snappy:提供较好的性能和客观的压缩比
    • gzip:占用CPU较多,提高了压缩比
    • lz4
    • zstd
  • batch.size:一个批次可以使用的内存大小,按照字节数
  • max.in.filght.request.per.connection:指定了生产者在收到服务器响应之前可以发送多少个消息批次。值越大,占用内存越多,吞吐量升高。
  • max.request.size:生产者发送的请求的大小,限制了单条最大消息的大小。
  • receive.buffer.bytessend.buffer.bytes:这两个参数分别指定了TCP socket接收和发送数据包缓冲区大小。
  • enable.idempotence:精确一次性
    消息传递时间

有几个参数控制在调用send方法后多长时间可以直到消息发送成功与否。

  • max.block.ms:用于控制调用send()或通过partitionsFor()显式的请求元数据时,生产者可以发生阻塞的时间。
  • delivery.timeout.ms:用于控制从消息准备好发送,到broker响应或放弃发送所花费的时间。(该参数通常配置成愿意等待的最长时间,通常几分钟;这样只要生产者还有时间,就可以重试发送消息)
  • request.timeout.ms:用于控制生产则发送消息时,等待服务器响应的时间。
  • retry.backoff.ms:生产者收到来自服务器的错误消息,重试的时间间隔。
  • retries:失败时重试次数
  • linger.ms:指定生产者在发送消息批次之前等待多久。

分区

生产者对象中包含了主题、记录的键和值。默认情况下键=null
键有两种用途

  • 作为消息的附加消息,与消息保存在一起
  • 用来确定消息应该被写入主题的哪个分区
    • 具有相同键的消息被写入相同分区,根据键进行哈希算法,根据哈希将消息映射到特定分区
    • 如果键为null,并使用了默认分区,记录被随机发给主题的分区。分区器使用轮询调度算法将消息均衡的分布到各个分区中。
    • 使用键进行分区的问题:如果要使用键来映射分区,最好在创建主题时将分区规划好,不再增加新的分区,否则会导致新纪录被写到其他分区。

标头

可以在不该表键值对的情况下向标头添加一些有关记录的元数据。标头指明了记录数据的来源,可以在不解析消息体的情况下根据标头来跟组/路由消息

配额和节流

Kafka可以限制生产者消息和消费消息的速率,通过配额机制来实现。
配额类型

  • 生产:限制了客户端发送消息的速率
  • 消费:限制了消费端消费消息的速率
  • 请求:限制了broker用于处理客户端请求时间的百分比

如果异步调用send()方法,且发送速率超过了broker接收的速率,那么消息将被放入客户端内存队列,如果一直发送,耗尽了内存缓冲区,并且等待时间超过了超时时间,会抛出TimeoutExcption异常。


网站公告

今日签到

点亮在社区的每一天
去签到