需求说明:
- 创建 Kafka 生产者,采用异步的方式生产者将数据发送到 Kafka Broker
一、消息的异步发送API:
1.1.异步发送含义:
- 1.所谓的异步发送是指
将外部的数据发送到队列中,不管队列中的数据有没有发送到kafka集群,main线程会把数据一批的一批的发送到队列中
1.2.编码实现异步发送:
a.创建kafka工程
b.导入kafka依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
c.实现无回调功能的异步发送:
- 1.实现生产者发送消息
package com.qun.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
//0.属性配置
Properties properties = new Properties();
//连接到Kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1.创建生产者对象
KafkaProducer<String, String> KafkaProducer = new KafkaProducer<>(properties);//first是代表的发送到的主题
//2.发送数据
for (int i = 0; i < 5; i++) {
KafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun" + i));//参数1:数据发送到哪个主题; 参数2:发送的数据内容
}
//3.关闭资源
KafkaProducer.close();
}
}
- 2.代码测试:
- 在服务器上q启动kafka的消费者,并指定消费的主题:
bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic firstopic
- 执行生产者代码:
- 查看消费者消费的消息:
- 在服务器上q启动kafka的消费者,并指定消费的主题:
d.带回调函数的异步发送流程:
- 1.什么叫回调函数:
- 带回调函数的异步发送就是指生产者发送数据后,对列会返回数据所在的队列、分区等数据
- 回调函数会在生产者收到 ack 时被调用,为异步调用,回调函数方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),
如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败
- 2.编码实现生产者发送消息:
package com.qun.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop103:9092,hadoop104:9092,hadoop105:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 添加回调
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i), new Callback() {//该方法在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
}
);
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
// 5. 关闭资源
kafkaProducer.close();
}
}
- 3.测试:
- 启动kafka消费者:
- 执行生产者代码,可以看到在控制台上输出的内容:
- 启动kafka消费者:
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
二、同步发送API:
2.1.同步发送概念:
- 1.同步发送就是发送到对列中的数据,必须全部发送到Broke后,生产者才可以继续发送数据到队列中
2.2.代码实现:
- 1.只需在异步发送的基础上,再调用一下 get()方法即可
package com.qun.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerTongBu {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun +++ " + i)).get();
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
// 5. 关闭资源
kafkaProducer.close();
}
}