消费者和消费者组
消费者组
Kafka的消费者从属于消费者组,一个组里面的消费者订阅的是同一个主题,每个消费者负责从这个主题读取部分消息。
- topic1有4个分区,创建消费C1,群组G1,C1是G1中唯一的消费者,订阅topic1,那么消费者C1将收到topic1中4个分区所有的数据
- 消费者组G1中添加消费者C2,那么每个消费者收到两个分区的数据
- 如果消费者组中消费者超过分区数,则有部分消费者将空闲
- 多个应用订阅同一个topic,如果想要获取topic所有数据,则需要保证每个应用都有自己的消费者组
消费者组和分区再均衡
当一个新的消费者加入消费者组或者一个消费者关闭或崩溃离开消费组,原本由他读取的分区分配给其他消费者读取。
分区的所有权从一个消费者转移到另一个消费者的行为成为再均衡
再均衡类型
- 主动再均衡:主动再均衡期间,所有消费者都会停止读取数据,放弃分区所有权,重新加入消费者组,重新分配分区。
- 协作再均衡:增量再均衡,将一个消费者的部分分区重新分配给另一个消费者,其他消费者继续读取没有被重新分配的分区。
消费者会向被指定为群组协调器的broker
发送心跳,以此来保持群组成员关系和对分区所有权的关系。
创建消费者
消费者参数
bootstrap.servers
:用于指定链接Kafka集群的参数key.deserializer
:类名,反序列化key的值value.deserializer
:类名,反序列化value的值fetch.min.bytes
:消费者从服务器获取记录的最小字节数,默认1字节,broker收到消费者获取数据请求时,如果数据量小于fetch.min.bytes
的大小,会等待有足够数据才会进行返回fetch.max.wait.ms
:让Kafka等待指定时间,就进行返回,默认500毫秒fetch.max.bytes
:指定了Kafka返回数据的最大字节数,用来限制消费者存放数据的内存大小。记录是分批发送给客户端的,如果批次大小超过了该属性限制,那么限制将被忽略,为了保证消费者能够正常梳理数据。max.poll.records
:用于控制单次调用poll
方法返回记录的条数。用来控制单次轮询时需要处理的记录的条数。max.partition.fetch.bytes
:用于指定服务器从每个分区返回给消费者最大字节数。session.timeout.ms
和heartbeat.interval.ms
:session.timeout.ms
指定了消费者可以在多长时间,heartbeat.interval.ms
:指定了消费者向协调器发送心跳的频率。max.poll.interval.ms
:指定了消费者被认定”死亡“之前多长时间内不发起轮询。default.api.timeout.ms
:调用消费者API没有显示的指定超时时间,那么消费者会在调用其他API时,指定这个属性,默认1分钟。request.timeout.ms
:指定了消费者在收到broker响应前,可以等待的最长时间。默认30秒auto.offset.reset
:指定了消费者在读取一个没有偏移量或偏移量无效的分区时,该做何处理。- latest:如果没有有效偏移量,从最新的记录开始消费
- earliest:如果没有有效偏移量,从起始位置开始消费
enable.auto.commit
:指定了是否自动提交偏移量,默认trueclient.id
:用于标识从客户端发送请求group.instance.id
:用于消费者组的固定名称
提交和偏移量
更新分区当前位置的操作叫做偏移量提交。
Kafka不会提交每一条记录,消费者会将已成功处理的最后一条消息提交给Kafka并假定该消息之前的消息都已经成功处理。