1. Producer configuration
Add the configuration in Nacos; the property names are deliberately kept distinct from the shared spring-kafka configuration fields.
Whenever a compressed message needs to be sent, send it through the KafkaTemplate exposed by this configuration class.
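A minimal sketch of the corresponding Nacos entries, assuming a YAML config file; the keys and defaults simply mirror the @Value placeholders in the class below:

spring:
  kafka:
    producer:
      compression-format: lz4        # custom key, separate from spring-kafka's built-in compression-type
      max-request-size: 10485760     # custom key, 10 MB, read via @Value in KafkaTemplateConfig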
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * Builds a second KafkaTemplate that reuses the auto-configured producer settings
 * but sends compressed messages and allows a larger max request size.
 *
 * @date 2025/3/31 13:50
 */
@Configuration
public class KafkaTemplateConfig {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private KafkaTemplate<String, String> compressedKafkaTemplate;

    /** Compression codec (e.g. lz4/gzip/snappy/zstd); defaults to lz4. */
    @Value("${spring.kafka.producer.compression-format:lz4}")
    private String compressionFormat;

    /** Maximum size of a single produce request in bytes; defaults to 10 MB. */
    @Value("${spring.kafka.producer.max-request-size:10485760}")
    private String maxRequestSize;

    @PostConstruct
    public void initProducerConfig() {
        // Clone the configuration of the default (auto-configured) producer factory...
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(kafkaTemplate.getProducerFactory().getConfigurationProperties());
        // ...then override only the compression codec and the max request size.
        Map<String, Object> configs = new HashMap<>(2);
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionFormat);
        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        factory.updateConfigs(configs);
        compressedKafkaTemplate = new KafkaTemplate<>(factory);
    }

    public KafkaTemplate<String, String> getCompressedKafkaTemplate() {
        return compressedKafkaTemplate;
    }
}
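Because the compressed template is built from a copy of the default factory's configuration, it inherits bootstrap servers and serializers automatically; only the two overridden keys differ. A hypothetical sanity check (illustrative only; kafkaTemplateConfig is the injected KafkaTemplateConfig bean, as in the usage section below) could read the overrides back from the factory:

// Hypothetical check: the cloned factory should now carry the two overridden keys.
Map<String, Object> props = kafkaTemplateConfig.getCompressedKafkaTemplate()
        .getProducerFactory().getConfigurationProperties();
log.info("compression={}, maxRequestSize={}",
        props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG),
        props.get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));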
2. Usage
Key code:
kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
        .addCallback(success -> log.info("send log to kafka success, requestId:{}, traceId:{}", requestLog.getRequestNo(), requestLog.getTraceId()),
                failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));
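The addCallback chaining assumes Spring Kafka 2.x, where send() returns a ListenableFuture. On Spring Kafka 3.x, send() returns a CompletableFuture instead, so a roughly equivalent sketch (under that assumption) would be:

kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
        .whenComplete((result, failure) -> {
            if (failure == null) {
                log.info("send log to kafka success, requestId:{}, traceId:{}",
                        requestLog.getRequestNo(), requestLog.getTraceId());
            } else {
                log.error("send log to kafka failure, param:{}", requestLog, failure);
            }
        });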
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.chint.anneng.finance.common.log.http.HttpLogExtendParam;
import com.chint.anneng.finance.common.log.kafka.KafkaTemplateConfig;
import com.chint.anneng.finance.common.utils.thread.wrapper.ThreadPoolExecutorMdcWrapper;
import com.chint.anneng.finance.portal.log.api.model.param.ApiOperateLogParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Sends HTTP request logs to Kafka asynchronously via the compressed template.
 *
 * @date 2025/2/10 17:25
 */
@Slf4j
@Service
@ConditionalOnProperty(name = "finance.kafka-biz.log", havingValue = "true", matchIfMissing = true)
public class XXKafkaLogCollector implements LogCollector {

    /** Bounded async pool: core 0, max 2×CPU, queue 1000, AbortPolicy on overflow. */
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutorMdcWrapper(0, Runtime.getRuntime().availableProcessors() * 2,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new NameThreadFactory("finance.kafka-biz.log."), new ThreadPoolExecutor.AbortPolicy());

    @Autowired
    private KafkaTemplateConfig kafkaTemplateConfig;

    @Value("${finance.kafka.log-topic:finance_http_biz_log}")
    private String logTopic;

    @Override
    public void sendHttpLog(HttpLogExtendParam requestLog) {
        if (!Boolean.TRUE.equals(requestLog.getLogConfig().getSave2Db())) {
            return;
        }
        executor.submit(() -> {
            try {
                // Strip config-only fields before serializing the log body.
                requestLog.setBizSucCode(null);
                requestLog.setBizSucKey(null);
                requestLog.setDecryptHandlerCode(null);
                requestLog.setLogConfig(null);
                String messageBody = JSONObject.toJSONString(requestLog);
                kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
                        .addCallback(success -> log.info("send log to kafka success, requestId:{}, traceId:{}", requestLog.getRequestNo(), requestLog.getTraceId()),
                                failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));
            } catch (Exception e) {
                log.warn("Failed to send HTTP request log [{}] to the log center; check the log file for its content", requestLog.getRequestNo(), e);
            }
        });
    }

    @Override
    public void sendApiLog(ApiOperateLogParam apiOperateLog) {
        // Intentionally a no-op: this collector only ships HTTP request logs.
    }
}