kafka发送消息,同时支持消息压缩和不压缩

发布于:2025-04-18 ⋅ 阅读:(25) ⋅ 点赞:(0)

1、生产者配置

nacos中增加配置,和公共spring-kafka配置字段有区分

需要发送压缩消息时,使用该配置类发送即可


import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * @date 2025/3/31 13:50
 * @description:
 */
@Configuration
public class KafkaTemplateConfig {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private KafkaTemplate<String, String> compressedKafkaTemplate;

    @Value("${spring.kafka.producer.compression-format:lz4}")
    private String compressionFormat;

    @Value("${spring.kafka.producer.max-request-size:10485760}")
    private String maxRequestSize;

    @PostConstruct
    public void initProducerConfig() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaTemplate.getProducerFactory().getConfigurationProperties());

        Map<String, Object> configs = new HashMap<>(2);
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionFormat);
        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,maxRequestSize );
        factory.updateConfigs(configs);
        compressedKafkaTemplate = new KafkaTemplate<>(factory);
    }

    public KafkaTemplate<String, String> getCompressedKafkaTemplate() {
        return compressedKafkaTemplate;
    }

}

2、使用

关键代码:

kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
                        .addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),
                                failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));


import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.chint.anneng.finance.common.log.http.HttpLogExtendParam;
import com.chint.anneng.finance.common.log.kafka.KafkaTemplateConfig;
import com.chint.anneng.finance.common.utils.thread.wrapper.ThreadPoolExecutorMdcWrapper;
import com.chint.anneng.finance.portal.log.api.model.param.ApiOperateLogParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @date 2025/2/10 17:25
 */
@Slf4j
@Service
@ConditionalOnProperty(name = "finance.kafka-biz.log", havingValue = "true", matchIfMissing = true)
public class XXKafkaLogCollector implements LogCollector {

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutorMdcWrapper(0, Runtime.getRuntime().availableProcessors() * 2,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new NameThreadFactory("finance.kafka-biz.log."), new ThreadPoolExecutor.AbortPolicy());


    @Autowired
    private KafkaTemplateConfig kafkaTemplateConfig;

    @Value("${finance.kafka.log-topic:finance_http_biz_log}")
    private String logTopic;

    @Override
    public void sendHttpLog(HttpLogExtendParam requestLog) {
        if (!Boolean.TRUE.equals(requestLog.getLogConfig().getSave2Db())) {
            return;
        }
        executor.submit(() -> {
            try {
                requestLog.setBizSucCode(null);
                requestLog.setBizSucKey(null);
                requestLog.setDecryptHandlerCode(null);
                requestLog.setLogConfig(null);

                String messageBody = JSONObject.toJSONString(requestLog);
                kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
                        .addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),
                                failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));
            } catch (Exception e) {
                log.warn("http请求日志[{}]发送到日志中心失败,请通过日志文件查看日志内容", requestLog.getRequestNo(), e);
            }
        });
    }

    @Override
    public void sendApiLog(ApiOperateLogParam apiOperateLog) {

    }
}


网站公告

今日签到

点亮在社区的每一天
去签到