Deploying Kafka with Docker in Practice + Accessing It from Java

Published: 2025-08-13


1. compose.yaml script structure

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper

  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST_IP}:9092
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    volumes:
      - kafka_data:/bitnami/kafka
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

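Start the services. Set HOST_IP to the IP address of your own host; Kafka advertises this address to clients, so it must be reachable from wherever the clients run: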

HOST_IP=192.168.17.17 docker compose up -d
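HOST_IP matters because the broker hands the advertised listener address back to clients during bootstrap, and clients then connect to that address. If it points at loopback, a client on another machine ends up trying to connect to itself. The sketch below is one way to sanity-check such a value before starting the stack; `AdvertisedListenerCheck` and its methods are hypothetical helpers written for this article, not part of Kafka.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical helper (not part of Kafka): inspects an advertised listener value.
final class AdvertisedListenerCheck {

    // Extracts the host from a value such as "PLAINTEXT://192.168.17.17:9092".
    static String hostOf(String listener) {
        String host = URI.create(listener).getHost();
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("No host in listener: " + listener);
        }
        return host;
    }

    // Returns true when the advertised host is only meaningful on the local machine.
    static boolean isLocalOnly(String listener) {
        String host = hostOf(listener);
        if (host.equalsIgnoreCase("localhost")) {
            return true;
        }
        try {
            InetAddress address = InetAddress.getByName(host);
            return address.isLoopbackAddress() || address.isAnyLocalAddress();
        } catch (UnknownHostException e) {
            // The name cannot be resolved from here, so nothing can be concluded.
            return false;
        }
    }
}
```

For example, `AdvertisedListenerCheck.isLocalOnly("PLAINTEXT://" + System.getenv("HOST_IP") + ":9092")` could be called from a small pre-flight script, assuming HOST_IP is exported in the environment.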

After a successful start, two containers should be running: zookeeper and kafka.
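A quick way to confirm from Java that the published ports (2181 and 9092) accept connections is a plain TCP connect with a timeout. This is only a rough sketch: an open port shows that something is listening, not that the broker is healthy. `PortCheck` is a hypothetical helper name.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }
}
```

With the setup above, `PortCheck.isOpen("192.168.17.17", 9092, 2000)` would be expected to return true once the kafka container is up; substitute your own HOST_IP.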

2. Basic Kafka operations

1. Create a topic:

docker exec -it kafka kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 3 \
  --topic aaa-topic


2. Send messages

docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic aaa-topic


3. Consume messages

docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic aaa-topic \
  --from-beginning


4. List topics

docker exec -it kafka kafka-topics.sh --list \
  --bootstrap-server localhost:9092
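The same listing can be driven from Java by launching the command with ProcessBuilder. The sketch below assumes that docker is on the PATH, that the container is named kafka as in the compose file, and that the tool prints one topic name per line. `TopicLister` is a hypothetical helper; the `-it` flags are dropped because no terminal is attached when the command is launched from a program.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: runs the topic listing command and parses its output.
final class TopicLister {

    // The listing command from above, without "-it" (no TTY when run from Java).
    static List<String> command() {
        return List.of("docker", "exec", "kafka", "kafka-topics.sh",
                "--list", "--bootstrap-server", "localhost:9092");
    }

    // Splits the command output into topic names, assuming one name per line.
    static List<String> parseTopics(String output) {
        return output.lines()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
    }

    // Runs the command and returns the parsed topic names.
    static List<String> listTopics() throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command())
                .redirectError(ProcessBuilder.Redirect.INHERIT) // keep stderr out of the parsed text
                .start();
        String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("kafka-topics.sh exited with code " + exitCode);
        }
        return parseTopics(output);
    }
}
```

Shelling out like this is convenient for scripts and smoke tests; an application would more typically query topics through the Kafka client library.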


3. Access from Spring Boot

Dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

Configuration file:

# Kafka configuration
kafka:
  servers: 192.168.17.17:9092
  topicName: test-topic

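Java configuration:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.servers}")
    private String kafkaServers;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties properties = new Properties();

        // Kafka broker address(es)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);

        // Serializers for record keys and values
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Optional settings
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); // reliability: wait for all in-sync replicas to acknowledge
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // number of retries on transient failures
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch size in bytes
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // wait up to 1 ms to fill a batch before sending

        return new KafkaProducer<>(properties);
    }
}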
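The ProducerConfig constants used above are just names for plain string keys, so the same settings can also be written as string properties, for example when they are loaded from a .properties file. The sketch below mirrors the configuration with string keys and string values, which Kafka parses into the expected types. `ProducerProps` is a hypothetical helper class, not something the article's project contains.

```java
import java.util.Properties;

// Hypothetical helper: the same producer settings expressed with plain string keys.
final class ProducerProps {

    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("acks", "all");          // ProducerConfig.ACKS_CONFIG
        properties.setProperty("retries", "3");         // ProducerConfig.RETRIES_CONFIG
        properties.setProperty("batch.size", "16384");  // ProducerConfig.BATCH_SIZE_CONFIG
        properties.setProperty("linger.ms", "1");       // ProducerConfig.LINGER_MS_CONFIG
        return properties;
    }
}
```

The result of `ProducerProps.build("192.168.17.17:9092")` could be passed to `new KafkaProducer<>(...)` in place of the Properties object built in the configuration class.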

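Test controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestKafkaController {

    @Autowired
    private KafkaMessageService kafkaMessageService;

    /**
     * HTTP endpoint for testing message sending.
     */
    @GetMapping("/send-kafka")
    public String sendToKafka(@RequestParam("message") String message) {
        kafkaMessageService.sendMessage(message);
        return "Message sent: " + message;
    }
}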

Test service:

public interface KafkaMessageService {
    /** Sends the given message to the configured Kafka topic. */
    void sendMessage(String message);
}


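import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import jakarta.annotation.Resource; // javax.annotation.Resource on Spring Boot 2.x

@Service
public class KafkaMessageServiceImpl implements KafkaMessageService {

    @Resource
    private KafkaProducer<String, String> kafkaProducer;

    @Value("${kafka.topicName:}")
    private String topicName;

    @Override
    public void sendMessage(String message) {
        try {
            // Build the record (no key, so the producer chooses the partition)
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
            // Send asynchronously; the callback runs once the broker responds
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Message sent: " +
                            "topic=" + metadata.topic() +
                            ", partition=" + metadata.partition() +
                            ", offset=" + metadata.offset());
                } else {
                    System.err.println("Failed to send message: " + exception.getMessage());
                }
            });

            // Flush so the record goes out immediately. This blocks and defeats batching,
            // which is acceptable for a test endpoint.
            kafkaProducer.flush();
        } catch (Exception e) {
            System.err.println("Error while sending message: " + e.getMessage());
        }
    }
}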

Request http://127.0.0.1:8080/send-kafka?message=hello to send a message; the endpoint responds with a confirmation containing the message text.
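The endpoint can also be called from Java with the JDK's built-in HttpClient (Java 11+). Messages containing spaces or non-ASCII characters need to be URL-encoded in the query string, which the sketch below handles. `SendKafkaClient` is a hypothetical helper, and the base URL is whatever address the Spring Boot application listens on.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: calls the /send-kafka test endpoint.
final class SendKafkaClient {

    // Builds the request URI with the message URL-encoded as a query parameter.
    static URI buildUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/send-kafka?message=" + encoded);
    }

    // Sends the GET request and returns the response body.
    static String send(String baseUrl, String message) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(baseUrl, message)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling `SendKafkaClient.send("http://127.0.0.1:8080", "hello")` issues the same request as the URL above.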

