kafka(七)——消息偏移(消费者)

发布于:2024-05-10 ⋅ 阅读:(30) ⋅ 点赞:(0)

概念

消费者消费完消息后,向_consumer_offset主题发送消息,用来保存每个分区的偏移量。

在这里插入图片描述

流程说明

  1. consumer发送JoinGroup请求;
  2. coordinator选出一个consumer作为leader,并将topics发送给leader消费者;
  3. leader consumer负责制定消费方案;
  4. leader consumer将消费方案发送给coordinator;
  5. coordinator将消费方案发送给CG中的每个consumer;
  6. 每个consumer与coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该consumer被移除,触发再平衡,或者消费者处理消息过长(max.poll.interval.ms=300s),也会触发再平衡;

适用场景

消费者数量发生变化、消费者订阅主题发生变化或者分区数量发生变化时,会触发kafka的再平衡(Rebalance),再平衡后,消费者可能被分到新的分区,为保证高可用和伸缩性,消费者需要读取每个分区最后一次偏移量。

注意:再平衡期间,群组不可用,消费者无法读取消息。

再平衡(Rebalance)

再平衡(Rebalance),是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

触发场景

  • 消费者个数发生变化,有新的消费者或分组中的消费者停止消费;
  • 订阅的主题(topic)个数发生变化;
  • 订阅的主题分区发生变化(partition);

影响

  • 再平衡时,消费者组下的所有消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配;
  • 再平衡过程会对消费者组产生非常严重的影响,所有的消费者都将停止工作,直到再平衡执行完成;

分区分配策略

Range范围分配策略

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
算法

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

图解

n = 2 = 8/3

m = 2 = 8%3

前2个消费者消费(2+1)个,剩余消费者消费2个。

在这里插入图片描述

RoundRobin轮询策略

将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
图解

在这里插入图片描述

Stricky粘性分配策略

在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor
图解
  • 故障前

在这里插入图片描述

  • 故障后

在这里插入图片描述

代码示例

// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
public:
	// 消费者组再平衡回调
	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition *> &partitions) 
	{
		if (RdKafka::ERR__ASSIGN_PARTITIONS == err)  // 分区分配成功
		{
			// 消费者订阅这些分区
			consumer->assign(partitions);
			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
			m_partitionCount = (int)partitions.size();
		} 
		else   // 分区分配失败
		{
			// 消费者取消订阅所有的分区
			consumer->unassign();
			// 消费者订阅分区的数量为0
			m_partitionCount = 0;
		}
	}

private:
	int m_partitionCount;    // 消费者组本次订阅的分区数量
};


RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

std::string errorStr = ""; 
RdKafka::RebalanceCb* rebalance_cb = new ConsumerRebalanceCb;
RdKafka::Conf::ConfResult errorCode = t_config->set("rebalance_cb", rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(rebalance_cb) failed, err:%s\n", errorStr.c_str());
    delete t_config;
    return;
}

提交方式

自动提交

参数配置

# 默认自动提交,消费者close时也会自动提交
enable.auto.comnit=true

# 自动提交周期,默认5s
auto.commit.interval.ms=5000

代码示例

RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
	// 消费消息
	ConsumeMsg_(msg);

    // 消息消费完后无需手动处理,kafka自动提交偏移
    delete msg;
}

存在的问题

如果在周期5s内发生再平衡,导致偏移量未提交,未提交的消息会被重复消费。

手动提交

参数配置

RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

RdKafka::Conf* topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (NULL == topicConfig) 
{
    printf("create topic conf failed\n");
    delete t_config;
    return;
}

std::string errorStr = ""; 
RdKafka::Conf::ConfResult errorCode = topicConfig->set("enable.auto.commit", " false", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(enable.auto.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = topicConfig->set("auto.offset.commit", " earliest", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(auto.offset.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 默认 topic 配置,用于自动订阅 topics
errorCode = t_config->set("default_topic_conf", topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(default_topic_conf) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

同步提交

  • 消息消费完,手动调用commitSync;
  • 在同步提交未完成的情况下发生再平衡,消息会被重复消费;
  • commitSync会阻塞直到偏移提交成功;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitSync(); 
    delete msg;
}

异步提交

  • 消息消费完,手动调用commitAsync;
  • commitAsync不会重试提交偏移量;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitAsync(); 
    delete msg;
}

存在的问题

重复消费(同步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为earliest;
  • 上次提交的偏移量为1;
  • 由于网络故障、超时等原因,2~7已消费完的情况下,8未提交成功,由于设置了参数auto.offset.commit=earliest,分区再平衡后会继续从2开始消费,会导致消息重复消费的问题;
消息丢失(异步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为latest;
  • 上次提交的偏移量为1;
  • 本次消费的偏移量范围为27,消费者立马提交了偏移量8,由于网络故障、超时等原因,27未消费完,由于设置了参数auto.offset.commit=latest,再平衡后会继续从8开始消费,会导致消息重复丢失的问题;

解决方案

根据实际场景选择同步提交还是异步提交。如果对消息可靠性要求比较高,不允许数据丢失,建议选择同步提交+“auto.offset.commit=earliest”,性能略差。