根据企业需求,自己重新实现分区器
只需要定义类实现Partitioner接口,然后重写partition()方法即可
假设现在有一个需求,发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
package com.example.kafkademo.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口Partitioner
* 2. 实现3个方法:partition,close,configure
* 3. 编写partition方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
/**
* 重写这个方法
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的值
* @param valueBytes 消息的值序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return 信息对应的分区
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
return msgValue.contains("cuihaida") ? 0 : 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
使用分区器的方法,在生产者的配置中添加分区器参数
package com.example.kafkademo.util;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class CommonUtils {
/**
* kafka生产者配置配置
* @return 配置内容
*/
public static Properties buildKafkaProperties() {
// 1. 创建kafka生产者配置对象
Properties properties = new Properties();
// 2. 给kafka的配置对象添加信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key, value初始化【必须有】
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// =========> 添加自定义分区器 <============
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")
return properties;
}
}