SpringBoot集成kafka详解

发布于:2025-09-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、使用开发工具创建SpringBoot项目

我使用的idea

可以直接在最开始建项目的时候选择所需依赖

我直接选择了kafka和lombok,如下图所示是idea帮我自动引入的依赖

二、编写配置文件application.yml

# 连接Kafka
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    # 生产者 key value的序列化方式
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者 key value的反序列化方式
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #指定消费者组的 group_id
      group-id: kafka-test

三、创建生产者接口

package com.example.producerserver.controller;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author 荣_rjy
 * @create 2023/9/6 16:26
 * @describe
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Resource
    private KafkaTemplate<String, String> kafka;

    @PostMapping
    public String data(@RequestBody String msg) {
        // 通过Kafka发出数据
        kafka.send("test", msg);
        return "ok";
    }
}

四、创建消费者

package com.example.producerserver.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @author 荣_rjy
 * @create 2023/9/6 16:27
 * @describe
 */
@Configuration
public class KafkaConsumer {
    // 指定要监听的 topic
    @KafkaListener(topics = "test")
    public void consumeTopic(String msg) {
        // 参数: 从topic中收到的 value值
        System.out.println("收到的信息: " + msg);
    }
}

五、测试生产者

六、启动 kafka

配置文件的方式启动(本地)

#cmd 命令直接输入zkServer启动zookeeper服务
zkServer
#cmd 命令直接输入zkCli启动zookeeper客户端
zkCli
#cmd 命令定位到kafka的安装目录 我的安装目录是  E:\kafka\kafka_2.12-3.5.1
#cmd 命令再该目录中启动kafka
bin\windows\kafka-server-start.bat config\server.properties

linux 远程启动 kafka

配置文件的形式启动

进入该文件夹下

可以看到该文件夹下有如下文件数据

输入命令可以启动 kafka,但是不建议如此,因为需要在配置文件中进行一定的配置,本地的 linux 服务器并没有配置好

sh kafka_start.sh

使用 docker 启动 kafka

# 首先安装 kafka 需要用到的 zookeeper 
docker pull wurstmeister/zookeeper
# 运行zookeeper容器
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 安装kafka容器
docker pull wurstmeister/kafka
# 运行kafka容器(需要注意的是,如果该kafka需要远程连接,如该kafka安装到了linux服务器上,但是需要本地的项目去连接该服务时,KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092   需要改成KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你对应的需要对外暴露的ip地址:9092),否则,本地项目在连接kafka时会去连接localhost地址的kafka服务
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka


网站公告

今日签到

点亮在社区的每一天
去签到