kafak

发布于:2025-08-13 ⋅ 阅读:(16) ⋅ 点赞:(0)


一、普通使用

生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 1. 配置生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // 确保消息持久化[6](@ref)
        props.put("retries", 3); // 重试次数

        // 2. 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3. 发送消息(异步回调)
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "Hello Kafka!");
        producer.send(record, (metadata, e) -> {
            if (e == null) {
                System.out.printf("✅ 消息发送成功: topic=%s, partition=%d, offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                e.printStackTrace();
            }
        });

        // 4. 关闭连接
        producer.close();
    }
}

消费者
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 1. 配置消费者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // 消费者组ID[6](@ref)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费

        // 2. 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题

        // 3. 持续拉取消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("📩 收到消息: key=%s, value=%s (partition=%d, offset=%d)%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            consumer.close(); // 确保资源释放
        }
    }
}


2、SpringBoot环境下使用

生产者的配置与使用
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

@Service
public class MessageService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message); // 发送消息[4](@ref)
    }
}


消费者的配置与使用

@Configuration
@EnableKafka
public class ConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

@Service
public class MessageListener {
    @KafkaListener(topics = "test-topic", groupId = "spring-group")
    public void listen(String message) {
        System.out.println("🔔 Spring消费消息: " + message); // 自动监听主题[4,6](@ref)
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到