新建maven项目,引入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
java kafka生产者
public class JavaProducer {
// kafka地址
public static final String bootstrapServer = "localhost:9092";
// topic主题
public static final String topic = "test";
public static void main(String[] args) {
Properties properties = new Properties();
// 指定key和消息体value的编码方式
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers",bootstrapServer);
// 创建并配置生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 创建消息,并指定分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,"test message 033");
// 发送消息
kafkaProducer.send(producerRecord);
// 关闭生产者客户端
kafkaProducer.close();
}
}
一个消息的生产主要包括四个步骤:
1.配置和创建生产者实例
2.配置和创建消息
3.发送消息
4.关闭生产者客户端实例
Kafka生产者配置项
kafka生产者有三个必填配置
bootstrap.servers
:指定broker地址清单key.serializer
:key的序列化方式,消费者对应的需要配置反序列化方式value.serializer
:value的序列化方式,消费者对应的需要配置反序列化方式
还有一些非必填配置,参照org.apache.kafka.clients.producer.ProducerConfig
类。
消息的创建
消息主要包括以下属性,其中topic和value是必填项,其余是选填项
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
………
}
对应的,ProducerRecord也提供了多个构造方法
消息的发送
生产者实例和消息实例都构建完成之后,就可以发送了
发送消息主要由三种模式:发后即忘(fire-and-forget)、同步(sync)、异步(async)
上面kafkaProducer.send(producerRecord);
就是发后即忘,他只管发送消息,至于有没有发送成功不关心,这就有消息丢失的可能。
事实上send方法并非是void类型的,而是Future<RecordMetadata>
类型,并且提供了两个重载方法
所以同步发送模式就可以利用返回的Future
对象实现:
Future<RecordMetadata> send = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = send.get();
异步发送方式则可以利用send的重载方法,指定一个callback回调函数
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
// 异常处理
} else {
System.out.println(metadata);
}
});
RecordMetadata
RecordMetadata
对象包含了消息的一些元数据信息:
public final class RecordMetadata {
/**
* Partition value for record without partition assigned
*/
public static final int UNKNOWN_PARTITION = -1;
private final long offset;
// The timestamp of the message.
// If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
// If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
// user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
// producer.
private final long timestamp;
private final int serializedKeySize;
private final int serializedValueSize;
private final TopicPartition topicPartition;
//…………
}
关闭生产者客户端示例
通常情况下,一个KafkaProducer
不会只发送单条消息。在发送完所有的消息后,调用KafkaProducer#close()
方法关闭KafkaProducer
实例来回收资源。close()
方法会阻塞等待所有发送请求完成后再关闭KafkaProducer
。
KafkaProducer还提供了一个带超时时间的重载方法,如果使用这个重载方法,则只会等待指定的超时时间,如果超过了这个时间,即使还有消息未发送完成,也会强行退出。我们一般使用无参的close()方法。
java kafka消费者
public class JavaConsumer {
public static final String bootstrapServer = "localhost:9092";
public static final String topic = "test";
public static final String group_id = "test-group2";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers",bootstrapServer);
properties.put("group.id",group_id);
// 创建消费者客户端
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅主题
kafkaConsumer.subscribe(Collections.singletonList(topic));
// 循环消费消息
try{
while (isRunning.get()){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 处理consumerRecord
}
}
}catch (Exception e){
// 处理异常
}finally {
kafkaConsumer.close();
}
}
}
消费者的消费逻辑主要包括以下几个步骤:
1.配置消费者客户端参数并创建消费者实例
2.订阅主题
3.拉取消息并消费
4.提交消费位移
5.关闭消费者实例
配置消费者客户端
必填参数:
bootstrap.servers
:指定broker地址清单key.deserialize
:key反序列化方式,与生产者序列化方式对应value.deserializer
:value反序列化方式,与生产者序列化方式对应group.id
:消费者组名
更多配置参org.apache.kafka.clients.consumer.ConsumerConfig
订阅主题与分区
调用KafkaConsumer#subscribe()
方法来订阅主题。
一个消费者可以订阅一个或多个主题。
KafkaConsumer#subscribe()
有四个重载方法,可以以集合的方式或者正则表达式的方式来订阅主题。
kafkaConsumer.subscribe(Arrays.asList("topic1"));
kafkaConsumer.subscribe(Pattern.compile("topic*"));
消费者还可以调用KafkaConsumer#assign(Collection<TopicPartition> partitions)
方法来直接订阅某些主题的特定分区
取消订阅则使用KafkaConsumer#unsubscribe()
方法
消息的消费
kafka消费消息是一个不断轮询的过程,在上面的代码中可以看出,消费者消费消息就是重复的调用poll()
方法,poll()
方法返回的则是所订阅的主题(分区)上的一组消息。
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
poll()`方法接收一个超时时间参数`timeout`,在消费者的缓冲区里没有可用数据时会发生阻塞,阻塞时间为`timeout
ConsumerRecord
消费者消费到的消息类型为ConsumerRecord,相对于ProducerRecord,ConsumerRecord的内容更加丰富一些
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
/**
* @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
* (deprecated since 3.0).
*/
@Deprecated
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
//………………
}
位移提交
控制或关闭消费
KafkaConsumer#pause()
:暂停某些分区在拉取操作时返回数据给客户端
KafkaConsumer#resume()
:恢复某些分区向客户端返回数据
KafkaConsumer#paused()
:返回被暂停的分区集合
KafkaConsumer#close()
:关闭消费者