[0688].实现kafka生产者发送消息

发布于:2025-02-13 ⋅ 阅读:(14) ⋅ 点赞:(0)

需求说明:

  • 创建 Kafka 生产者,采用异步的方式生产者将数据发送到 Kafka Broker

一、消息的异步发送API:

1.1.异步发送含义:

  • 1.所谓的异步发送是指 将外部的数据发送到队列中,不管队列中的数据有没有发送到kafka集群,main线程会把数据一批的一批的发送到队列中
    在这里插入图片描述

1.2.编码实现异步发送:

a.创建kafka工程

在这里插入图片描述
在这里插入图片描述

b.导入kafka依赖:

	<dependencies>
	 <dependency>
		 <groupId>org.apache.kafka</groupId>
		 <artifactId>kafka-clients</artifactId>
		 <version>3.0.0</version>
	 </dependency>
	</dependencies>

c.实现无回调功能的异步发送:

  • 1.实现生产者发送消息
package com.qun.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        //0.属性配置
        Properties properties = new Properties();
        //连接到Kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");
        //指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1.创建生产者对象
        KafkaProducer<String, String> KafkaProducer = new KafkaProducer<>(properties);//first是代表的发送到的主题

        //2.发送数据
        for (int i = 0; i < 5; i++) {
            KafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun" + i));//参数1:数据发送到哪个主题;  参数2:发送的数据内容
        }

        //3.关闭资源
        KafkaProducer.close();
    }
}
  • 2.代码测试:
    • 在服务器上q启动kafka的消费者,并指定消费的主题:bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic firstopic
      在这里插入图片描述
    • 执行生产者代码:
    • 查看消费者消费的消息:
      在这里插入图片描述

d.带回调函数的异步发送流程:

  • 1.什么叫回调函数:
    • 带回调函数的异步发送就是指生产者发送数据后,对列会返回数据所在的队列、分区等数据
    • 回调函数会在生产者收到 ack 时被调用,为异步调用,回调函数方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败
      在这里插入图片描述
  • 2.编码实现生产者发送消息:
package com.qun.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
	public static void main(String[] args) throws InterruptedException {
	 	// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
	 	// 2. 给 kafka 配置对象添加配置信息
	    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop103:9092,hadoop104:9092,hadoop105:9092");
	 	
	 	// key,value 序列化(必须):key.serializer,value.serializer
	 	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new  KafkaProducer<String, String>(properties);
	 	// 4. 调用 send 方法,发送消息
	 	for (int i = 0; i < 5; i++) {
	 		// 添加回调
	 		kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i), new Callback() {//该方法在Producer收到ack时调用,为异步调用
		 		@Override
			 	public void onCompletion(RecordMetadata metadata,Exception exception) {
			 		if (exception == null) {
			 			// 没有异常,输出信息到控制台
			 			System.out.println(" 主题: " +   metadata.topic() + "->" + "分区:" + metadata.partition());
			 		} else {
					 	// 出现异常打印
						exception.printStackTrace();
					}
			 	}
			 }
			);
		 	// 延迟一会会看到数据发往不同分区
			 Thread.sleep(2);
		 }
	 	// 5. 关闭资源
	 	kafkaProducer.close();
	 }
 }
  • 3.测试:
    • 启动kafka消费者:
      在这里插入图片描述
    • 执行生产者代码,可以看到在控制台上输出的内容:
      在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试


二、同步发送API:

2.1.同步发送概念:

  • 1.同步发送就是发送到对列中的数据,必须全部发送到Broke后,生产者才可以继续发送数据到队列中

2.2.代码实现:

  • 1.只需在异步发送的基础上,再调用一下 get()方法即可
package com.qun.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerTongBu {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun +++ " + i)).get();
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到