kafka自定义分区器

发布于:2025-07-02 ⋅ 阅读:(26) ⋅ 点赞:(0)

根据企业需求,自己重新实现分区器

只需要定义类实现Partitioner接口,然后重写partition()方法即可

假设现在有一个需求,发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区

package com.example.kafkademo.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口Partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */
public class MyPartitioner implements Partitioner {

    /**
     * 重写这个方法
     * @param topic 主题
     * @param key 消息的key
     * @param keyBytes 消息的key序列化后的字节数组
     * @param value 消息的值
     * @param valueBytes 消息的值序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return 信息对应的分区
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
        return msgValue.contains("cuihaida") ? 0 : 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

使用分区器的方法,在生产者的配置中添加分区器参数

package com.example.kafkademo.util;

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class CommonUtils {
    /**
     * kafka生产者配置配置
     * @return 配置内容
     */
    public static Properties buildKafkaProperties() {
        // 1. 创建kafka生产者配置对象
        Properties properties = new Properties();
        // 2. 给kafka的配置对象添加信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key, value初始化【必须有】
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        
        // =========> 添加自定义分区器 <============
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")
        
        return properties;
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到