springboot集成kafka
- 初始化springboot环境
如果在java版本中找不到8的话,把Server URL改成 https://start.aliyun.com
然后什么都不选直接创建
- pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.17</version>
</dependency>
- 创建application
如果新建的项目中没有resources文件夹的话,这样创建
application.yml具体内容
server:
port: 9090
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 127.0.0.1:9092 #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
consumer:
group-id: myGroup
enable-auto-commit: true
auto-commit-interval: 100ms
properties:
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
producer:
retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
- logback配置文件
创建logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 logs/job 设置为相对项目目录-->
<property name="log.path" value="logs/job" />
<!-- 日志输出格式 时间 线程 日志级别 类 方法 对应的行数 输出信息 这样设置后输出格式如下 -->
<!-- 15:09:27.204 [http-nio-8080-exec-10] DEBUG c.e.s.l.TestLog - [getVersion,26] - debug详细信息 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
<!-- 控制台输出 appender -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- 日志内容输出格式设置为定义好的 log.pattern-->
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 appender class 中的log.pattern 表示日志滚动输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 日志首次输出的文件地址 -->
<file>${log.path}/info.log</file>
<!-- 滚动输出策略:基于时间创建日志文件 ,这样第二天输出的日志,就会按照 fileNamePattern 新建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<!-- 日志内容输出格式设置为定义好的 log.pattern-->
<pattern>${log.pattern}</pattern>
</encoder>
<!-- 日志内容输出过滤器 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_debug" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/debug.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/debug.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>DEBUG</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 系统模块日志级别控制 name 设置为你自己的项目根路径 如com.example.logback-->
<!-- level 设置日志输出的级别为debug 这样系统在进行日志输出时 只要级别在 debug 之后都可以打印 -->
<!-- 日志输出级别 trace< debug < info< warn < error -->
<logger name="com.example.logback" level="debug" />
<!-- Spring日志级别控制-->
<logger name="org.springframework" level="warn" />
<!-- kafka日志级别控制-->
<logger name="org.apache.kafka.clients" level="info" />
<!--系统操作日志 root 根路径的日志级别 info -->
<root level="info">
<!-- 将定义好的几个日志输出 追加到 root 上 -->
<!-- console 控制台输出 -->
<appender-ref ref="console" />
<!-- console info级别输出 -->
<appender-ref ref="file_info" />
<!-- console debug级输出 -->
<appender-ref ref="file_debug" />
<!-- console error级输出 -->
<appender-ref ref="file_error" />
</root>
</configuration>
- 消息体
public class Message {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
- 生产者消息封装
@Component
public class KafkaProduct {
static Logger logger = LoggerFactory.getLogger(KafkaProduct.class);
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProduct(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void product(String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(msg);
message.setSendTime(new Date());
logger.info("消息发送: {}", JSONUtil.toJsonStr(message));
kafkaTemplate.send("cousumer01", JSONUtil.toJsonStr(message));
}
}
- 消费者接受
@Component
public class KafkaConsumer {
static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = {"cousumer01"})
public void listen(ConsumerRecord<?, ?> record) {
Optional.ofNullable(record.value())
.ifPresent(message -> {
log.info("记录record : {}", record);
log.info("消费消息message : {}", message);
});
}
}
- 接口定义触发
@RestController
public class InterfaceController {
@Resource
private KafkaProduct kafkaProduct;
@GetMapping("/sendMsg")
public String sendMessage(@RequestParam("msg") String msg) {
kafkaProduct.product(msg);
return "发送成功";
}
}
- 测试
控制台输出
简单应用是这样,但是这里缺乏很多东西。比如消息丢失在不同场景怎么处理?怎么保证消费者不会多次消费接收到同一条消息?