Java 拉取并解析Kafka的Topic,Insert到InfluxDB

发布于:2024-10-18 ⋅ 阅读:(10) ⋅ 点赞:(0)

Maven依赖

<dependencies>
    <!-- Kafka Client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>

    <!-- InfluxDB Client -->
    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb-client-java</artifactId>
        <version>6.9.0</version>
    </dependency>
</dependencies>

Kafka 消费者 + InfluxDB 插入逻辑

package com.ruoyi.datainterface;

import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaToInfluxDB {

       // InfluxDB 配置
    private static final String INFLUX_URL = "http://localhost:8086";
    private static final String TOKEN = "your-influxdb-token";
    private static final String ORG = "your-org";
    private static final String BUCKET = "your-bucket";

    // Kafka 配置
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "your-kafka-topic";
    private static final String GROUP_ID = "your-consumer-group";


    public static void main(String[] args) {
        // 初始化 InfluxDB 客户端
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(INFLUX_URL, TOKEN.toCharArray(), ORG, BUCKET);
        WriteApi writeApi = influxDBClient.getWriteApi();

        // 初始化 Kafka 消费者
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // 解析 Kafka 消息并转换为 Point 对象
                    Point point = parseMessageToPoint(record.value());

                    // 使用 WriteApi 写入数据
                    writeApi.writePoint(point);
                    System.out.println("Inserted: " + point.toLineProtocol());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
            influxDBClient.close();
        }
    }

    /**
     * 将 Kafka 消息解析为 InfluxDB Point 对象。
     * 消息格式为:device001|1633990422|23.5|45.1|78.3
     */
    private static Point parseMessageToPoint(String message) {
        String[] parts = message.split("\\|");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Invalid message format: " + message);
        }

        String deviceId = parts[0];  // 设备号作为标签
        String timestamp = parts[1]; 

        // 构建 Point 对象
        Point point = Point.measurement("device_data")
                .addTag("device_id", deviceId)
                .addTag("timestamp",timestamp);

        // 动态添加点位字段
        for (int i = 2; i < parts.length; i++) {
            point.addField("point" + (i - 1), Double.parseDouble(parts[i]));
        }

        return point;
    }
}