Thinkphp6 应用RdKafka插件封装工具类
<?php
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use RdKafka\TopicConf;
use RdKafka\TopicPartition;
use think\facade\Log;
class RdKafkaUtil
{
//生产者实例池,支持多集群链接
private static array $producerPool = [];
//声明一个可为空的 \RdKafka\Consumer 类型的当前消费者实例
private ?KafkaConsumer $consumer = null;
//默认配置参数,可通过config/rdkafka.php覆盖
private array $config = [
// 集群地址列表(多个地址用逗号分隔)
// 示例: 'kafka1:9092,kafka2:9092'
'bootstrap_servers' => '127.0.0.1:9092',
// 默认消息主题(需预先在Kafka创建){环境}_{业务}_{group}
'default_topic' => 'rd_think_topic',
// 消费者组标识(同一组内共享消费进度)(动态生成示例:'prod_payment_group'/'test_logs_group')
/*
* 不同业务需使用独立组名实现消息隔离
* 格式:环境_业务_组
* 环境标识 prod/test 区分生产与测试环境流量
* 业务类型 order/payment 实现业务级消息隔离
* 固定后缀 _group 统一标识消费者组属性
*/
'default_group' => 'test_rd_group',
// 消息持久化确认策略(金融场景建议设为'all')
/*
* acks=0: 不等待确认(最快但可能丢失数据)
* acks=1: Leader确认即成功(平衡方案)
* acks=all: 所有ISR副本确认(最可靠)
*/
'ack_level' => 'all',
// 网络超时(毫秒)高并发场景建议≥10秒
'socket_timeout' => 10000,
// 建议与`session.timeout.ms`(默认10秒)保持一致
'session_timeout' => 10000,
// 压缩算法(高吞吐场景建议'snappy',示例:none/gzip/snappy/lz4)
//snappy:CPU占用低,适合实时场景;gzip:压缩率高,适合离线数据处理;lz4:平衡压缩率与性能,通用推荐
"compression" => 'lz4',
'retries' => 5,
// 增加重试间隔配置(单位:毫秒)默认1秒间隔,避免高频重试导致Broker压力
'retry_backoff' => 1000
];
/**
* 构造函数(加载框架配置)
* @param array $config 自定义配置(可选)
*/
public function __construct(array $config = []){
// 合并用户配置与默认配置:ml-citation{ref="5" data="citationList"}
$this->config = array_merge($this->config, config('rdkafka', []), $config);
}
/**
* 初始化生产者实例
* @param string $instanceKey 实例标识(用于多集群区分)
*/
public function initProduce(string $instanceKey='default')
{
// 验证基础配置是否存在:ml-citation{ref="3,5" data="citationList"}
if (empty($this->config['bootstrap_servers'])) {
throw new InvalidArgumentException("必须配置bootstrap.servers参数");
}
// 创建生产者配置对象(框架初始化标准操作)
$conf = new Conf();
// 设置Kafka集群地址(必填项,多个用逗号分隔)(需与`bootstrap_servers`配置项保持一致)
$conf->set('bootstrap.servers', $this->config['bootstrap_servers']);
// 定义消息发送超时时间(单位:毫秒,建议≥业务处理峰值时间),可选值:3000-30000
$conf->set('message.timeout.ms', (string)($this->config['socket_timeout'] ?? 10000));
// 配置消息压缩算法(高吞吐场景建议'snappy',默认'none')可选值:none/gzip/snappy/lz4,
$conf->set('compression.type', $this->config['compression'] ?? 'none');
// 设置消息确认级别,可选值:0(无需确认)/1(leader确认)/all(ISR副本确认)
$conf->set('acks', $this->config['ack_level']);
// 生产者重试策略(默认3次重试,防止网络抖动导致消息丢失)建议值:3-5
$conf->set('retries', $this->config['retries'] ?? '3');
// 注册消息发送回调(记录投递成功日志)
$conf->setDrMsgCb(function ($kafka,$message){
file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
if ($message->err) {
// 集成框架日志组件
Log::error("消息发送失败", ['err_str' => $message->errstr()]);
} else {
// 生产环境建议关闭调试日志
Log::debug("消息已送达分区".$message->partition);
}
});
// 注册错误回调(记录错误日志并抛出异常中断流程)示例:$error为错误码,$reason为描述
$conf->setErrorCb(function($kafka,int $error,string $reason) {
file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($error), $reason).PHP_EOL, FILE_APPEND);
throw new RuntimeException("Kafka连接异常: $reason (错误码: $error)");
});
// 将生产者实例存入对象池(单例模式管理),避免重复创建
if (!isset(self::$producerPool[$instanceKey])) {
$producer = new Producer($conf);
// 动态添加Broker节点(应对集群扩容场景)
$producer->addBrokers($this->config['bootstrap_servers']);
self::$producerPool[$instanceKey] = $producer;
}
}
/**
* 发送消息到指定主题
* @param string $message 消息内容 建议JSON格式
* @param string $topic 目标主题(默认使用配置主题)
* @param string $instance 指定生产者实例
* @param string $messageKey 设置消息键实现分区顺序
* @return bool 发送是否成功
*/
public function sendMessage(string $message,string $topic = '',string $instance = 'default',string $messageKey=''): bool
{
// 获取生产者实例(多实例隔离逻辑)
if (!isset(self::$producerPool[$instance])) {
throw new RuntimeException("生产者实例{$instance}未初始化,请先调用initProduce()");
}
// 从对象池获取生产者实例(实现多实例隔离)
$producer = self::$producerPool[$instance];
// 确定目标主题(优先级:参数 > 配置 > 默认值)
$topicName = $topic ?: $this->config['default_topic'];
// 创建主题配置对象(默认自动选择分区)
$topicConfig = new TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$topicConfig->set('request.required.acks', $this->config['ack_level'] ?? '1');
// 创建主题生产者对象(自动处理分区路由)
$producerTopic = $producer->newTopic($topicName,$topicConfig);
// 发送消息到随机分区(RD_KAFKA_PARTITION_UA表示自动分配)
// $messageKey 消息键(相同键分配到同一分区,如订单ID)
$partition = RD_KAFKA_PARTITION_UA;
$producerTopic->produce($partition, 0, $message,$messageKey);
//立即触发网络发送(非阻塞模式)
$producer->poll(0);
// 等待消息队列清空,单位:毫秒(最大等待2秒)
$result = $producer->flush(2000);
//返回发送结果(RD_KAFKA_RESP_ERR_NO_ERROR表示成功)
return $result === RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* 批量发送消息到指定主题
* @param string $topicName 目标主题名称
* @param array $messages 消息内容数组
* @param string $instanceKey 生产者实例标识
* @return bool 全部发送成功返回true
*/
public function sendBatchMessages(string $topicName, array $messages, string $instanceKey = 'default'): bool
{
try {
// 校验生产者实例是否存在(实现多实例隔离机制)
if (!isset(self::$producerPool[$instanceKey])) {
throw new Exception("生产者实例{$instanceKey}未初始化,请检查initProduce调用");
}
// 从实例池获取已初始化的生产者对象
$producer = self::$producerPool[$instanceKey];
// 配置主题级可靠性参数(继承全局ack_level设置)可选值:0/1/all
$topicConf = new TopicConf();
$topicConf->set('request.required.acks', $this->config['ack_level'] ?? '1');
// 获取主题句柄(自动处理分区路由和元数据同步)
$topic = $producer->newTopic($topicName,$topicConf);
// 批量发送消息核心逻辑(循环处理每条消息)
foreach ($messages as $msg) {
// 向随机分区发送消息
$topic->produce(RD_KAFKA_PARTITION_UA, 0, is_array($msg)?json_encode($msg):$msg,$msg['key'] ?? null);
// 每发送100条触发一次网络请求(平衡吞吐与延迟)
if (count($messages) % 100 === 0) {
// 非阻塞模式立即发送
$producer->poll(0);
}
}
// 统一触发剩余消息发送(避免消息积压)
$producer->poll(0);
// 等待所有消息确认(超时时间建议≥批量大小×单条处理时间)
$result = $producer->flush(3000); // 单位:毫秒,示例:3秒
// 返回发送结果(严格校验错误码)
return $result === RD_KAFKA_RESP_ERR_NO_ERROR;
} catch (Exception $e) {
// 错误处理双写日志(文件日志与系统日志分离)
$errorMsg = "Kafka批量发送失败[主题:".$topicName."]: " . $e->getMessage();
file_put_contents("./err_cb.log", date('[Y-m-d H:i:s] ') . $errorMsg . PHP_EOL, FILE_APPEND);
Log::error($errorMsg);
return false;
}
}
/**
* 初始化消费者实例
* @param string|array $topicName 单个主题或主题数组
* @param string $groupId 消费者组ID(覆盖默认配置)
* @throws \RdKafka\Exception
*/
public function initConsumer($topicName,string $groupId = ''): void
{
// 创建Kafka配置对象(用于设置消费者参数)
$conf = new Conf();
// 设置集群地址(支持多节点容灾,格式:"host1:port1,host2:port2")
if (empty($this->config['bootstrap_servers'])) {
throw new InvalidArgumentException("缺少必要配置项:bootstrap_servers");
}
// 设置Kafka集群地址(从配置中读取bootstrap_servers参数)
$conf->set('bootstrap.servers', $this->config['bootstrap_servers']);
// 设置消费者组ID(优先使用传入参数,否则用默认配置)消费者组唯一标识(实现消费进度协同管理)
$finalGroupId = $groupId ?: ($this->config['group_id'] ?? 'default_group');
if (empty($finalGroupId)) {
throw new RuntimeException("消费者组ID不能为空");
}
$conf->set('group.id', $finalGroupId);
// 定义消费偏移量策略,可选值:earliest/latest/none('earliest'表示从最早未消费消息开始)
$conf->set('auto.offset.reset', 'earliest');
// 配置长轮询参数(避免消费者被误判离线)
//会话超时时间(毫秒),超时则触发消费者组重平衡
$conf->set('session.timeout.ms', strval($this->config['session_timeout'] ?? 30000));
//心跳间隔时间(毫秒),保持消费者在线状态
$conf->set('heartbeat.interval.ms', strval($this->config['heartbeat_interval'] ?? 10000));
// 初始化Kafka消费者实例(绑定配置参数)
$this->consumer = new KafkaConsumer($conf);
// 确定订阅主题(支持多主题数组)
$finalTopic = $topicName ?: ($this->config['default_topic'] ?? 'default_topic');
// 示例:["topic1", "topic2"]
$this->consumer->subscribe([$finalTopic]);
// 记录初始化日志(用于调试与监控)
Log::info("Kafka消费者初始化完成", [
'topic' => $finalTopic,
'group_id' => $finalGroupId,
'cluster' => $this->config['bootstrap_servers']
]);
}
/**
* 消费单条消息
* @Notes:
* @Interface consumeMessage
* @param int $timeout 超时时间(单位:毫秒),示例值:6000(6秒)
* @param bool $autoCommit 是否自动提交偏移量,可选值:true/false
* @return string|null
* @throws \RdKafka\Exception
*/
public function consumeMessage(int $timeout=12000, bool $autoCommit = true): ?string
{
// 1. 校验消费者实例初始化状态(依赖initConsumer前置调用)
if (!$this->consumer) {
throw new RuntimeException("消费者实例未初始化,请先调用initConsumer()");
}
// 2. 配置偏移量提交策略(true=自动提交,false=手动提交)
$topicConf = new TopicConf();
$topicConf->set('auto.commit.enable', $autoCommit ? 'true' : 'false');
// 3. 拉取消息(超时单位:毫秒,建议设置≥消费者心跳间隔)
$message = $this->consumer->consume($timeout);
// 4. 处理空消息场景(需结合业务重试机制)
if ($message === null) {
Log::debug("无可用消息,触发长轮询等待");
return null;
}
$return_msg = null;
// 处理消息消费状态码
switch ($message->err) {
// 成功消费消息
case RD_KAFKA_RESP_ERR_NO_ERROR:
$return_msg = $message->payload;
break;
// 分区无新消息
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
Log::info("分区{$message->partition}已消费完毕,等待新消息");
break;
// 消费超时
case RD_KAFKA_RESP_ERR__TIMED_OUT:
Log::warning("消费超时,检查网络或调整timeout参数");
break;
// 其他错误(如网络问题、鉴权失败等)
default:
if ($this->isRetryableError($message->err)) {
throw new RetryableException("可重试错误: " . $message->errstr());
}
Log::error("致命错误: " . $message->errstr());
throw new FatalException("消息消费失败");
}
// 7. 手动提交偏移量(需严格保证消息处理成功后再提交)
if (!$autoCommit && $message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
$this->consumer->commit([
new TopicPartition(
$message->topic_name,
$message->partition,
$message->offset + 1 // 提交下一个待消费偏移量
)
]);
}
// 仅在成功消费时返回消息内容
if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
return $return_msg;
}
return null;
}
/**
* 批量消费消息
* @param int $batchSize 每批处理数量
* @param int $timeout 单次消费超时
* @param callable $callback 批量处理回调
* @throws \RdKafka\Exception
*/
public function consumeBatchMessages(int $batchSize,int $timeout,callable $callback,$autoCommit = false): void
{
// 校验消费者实例是否初始化(依赖initConsumer方法初始化)
if (!$this->consumer) throw new RuntimeException("消费者未初始化");
$messages = [];
// 用于手动提交的偏移量记录
$offsets = [];
// 每处理N条消息后清理内存
$counter = 0;
while (count($messages) < $batchSize) {
if ($msg = $this->consumeMessage($timeout)) {
$messages[] = $msg;
}
if (++$counter % 50 === 0) {
// 主动触发GC
gc_collect_cycles();
}
}
// 手动提交偏移量(当有消息时)
if (!$autoCommit && !empty($offsets)) {
$this->consumer->commit($offsets);
}
call_user_func($callback, $messages);
}
/**
* 优雅关闭消费者
* @Notes:
* @Interface closeConsumer
* @throws \RdKafka\Exception
*/
public function closeConsumer(): void
{
if ($this->consumer) {
// 取消订阅所有主题(停止接收新消息)
$this->consumer->unsubscribe();
// 执行优雅关闭(等待未完成操作,如偏移量提交)
$this->consumer->close();
// 释放资源(避免内存泄漏)
$this->consumer = null;
}
}
/**
* 可重试错误判断逻辑
* @Notes:
* @Interface isRetryableError
* @param int $errCode
* @return bool
*/
private function isRetryableError(int $errCode): bool {
// 根据业务需求扩展错误码列表
$retryableCodes = [
RD_KAFKA_RESP_ERR__TRANSPORT,
RD_KAFKA_RESP_ERR__TIMED_OUT
];
return in_array($errCode, $retryableCodes);
}
}
class FatalException extends LogicException
{
public function __construct(string $message = "", int $code = 0, $previous = null)
{
parent::__construct($message, $code, $previous);
}
}
<?php
// 可重试异常(用于网络抖动、临时资源不足等场景)
class RetryableException extends RuntimeException {
public function __construct(string $message = "", int $code = 0, $previous = null) {
parent::__construct($message, $code, $previous);
}
}