下载
直接到官网下载:https://kafka.apache.org/
然后解压
config里面是配置
logs里面是日志
启动
使用zookeeper启动
kafka运行在zookeeper上,先启动zookeeper,再启动kafka
- 先启动zookeeper,kafka里面内置了zookeeper,不需要我们再去下载,直接使用内置的就可以
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
可能报错:命令语法不正确。
解决方法:只需要让文件夹名字短一点就行
出现这样结果就是成功启动
- 然后启动kafka
bin\windows\kafka-server-start.bat config\server.properties
如果这次启动失败了,关掉zookeeper重新启动 某位大佬的解决方法
使用Kraft启动
生成uuid
格式化目录
启动
也可以自定义集群id,在“格式化目录”时,把uuid换成自己定义的集群id就行
注意路径的变化
使用
创建主题
D:\MY\kafka\kafka3.7\bin\windows>kafka-topics.bat --create --topic dello --bootstrap-server localhost:9092
Created topic dello.
查看主题:
D:\MY\kafka\kafka3.7\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
dello
在springboot中使用
导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafka是一个消息中间件,在生产者和消费者中间,所以需要配置3类资料:
- kafka自己的资料
- 生产者
- 消费者
配置文件中设置:
#自己的地址
spring.kafka.bootstrap-servers=localhost:9092
- 新建一个生产者的类:
@Component
public class EventProducer {
//前面导入依赖+配置文件就会自动装配kafka
@Resource
public KafkaTemplate<String,String> kafkaTemplate;
//发送事件,在hello主题上发一个i am a banana消息
public void sentEvent(){
kafkaTemplate.send("hello","i am a banana");
}
public void sentEvent02(){
Message<String> message = MessageBuilder.withPayload("i am a banana too").setHeader(KafkaHeaders.TOPIC,
"hello02").build();
kafkaTemplate.send(message);
}
public void sentEvent03(){
//里面放一些信息,消费者可以接收到
Headers headers = new RecordHeaders();
headers.add("color","yellow".getBytes(StandardCharsets.UTF_8));
ProducerRecord producerRecord = new ProducerRecord<>(
"hello03",0,System.currentTimeMillis(),"who","banana",headers
);
kafkaTemplate.send(producerRecord);
}
//使用这个发送方法,要在配置文件中新增默认topic的配置
//spring.kafka.template.default-topic=default-topic
public void sentEvent04(){
kafkaTemplate.sendDefault("banana");
}
}
不同的send方法本质都是把消息转换成 ProducerRecord 形式
进行测试:
@SpringBootTest
class Base01ApplicationTests {
@Autowired
private EventProducer eventProducer;
@Test
void test01() {
eventProducer.sentEvent();
}
}
不报错就是成功了
- 新建一个消费者的类:
//被初始化之后就一直在监听,默认监听到新发来的消息
@Component
public class EventConsumer {
//这会开辟一个线程一直执行监听工作
@KafkaListener(topics = "hello", groupId = "hello-group")
public void onEvent(String event){
System.out.println("监听到:" + event);
}
}
运行启动类,开启监听线程,然后运行测试类,发送一个消息,结果:
监听到:i am a banana
如果想让一个新的消费者组groupId 开始消费当前主题,使用earliest就能读取到历史消息,(只限于新的消费者组,之前没消费过这个主题的)
生产者发送消息后
每一个send方法的返回结果都是CompletableFuture<SendResult<K, V>>
public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
return this.observeSend(producerRecord);
}
发送消息是异步的,即:生产者发完消息就去干别的事情,但是马上就能拿到CompletableFuture
,CompletableFuture
里面没有内容,他表示的是未来的消息
,如果消息成功发送了,CompletableFuture
里面才会有内容
拿到CompletableFuture
里面内容的方法有:
- 阻塞方式拿到:里面的
get()
方法是阻塞的
public void sentEvent04(){
CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault("banana");
try {
SendResult<String, String> sendResult = completableFuture.get();
if(sendResult.getRecordMetadata() != null){
System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- 非阻塞方式:
thenAccept()
方法注册回调函数,发送完成就执行这个回调函数,回调函数的返回结果还是CompletableFuture
类型,可以继续执行回调函数
public void sentEvent05(){
CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault("banana");
try {
completableFuture.thenAccept((t) -> {
if(t.getRecordMetadata() != null){
System.out.println("消息发送成功:"+t.getRecordMetadata().toString());
}
}).exceptionally((t) -> {
t.printStackTrace();
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}