一、目的
由于部分数据类型的采集频率为1s,数据规模特别大,因此把完整的JSON放在Hive中解析,尤其是在单机环境下,效率特别低,无法满足业务需求。
而Flume的拦截器并不能很好地转换数据,因此只能采用Java方式:从Kafka的主题A中采集数据,解析字段,然后写入到Kafka主题B中。
二、原始数据格式
JSON格式比较复杂,对象中包含数组,数组中包含对象
{
"deviceNo": "39",
"sourceDeviceType": null,
"sn": null,
"model": null,
"createTime": "2024-09-03 14:40:00",
"data": {
"cycle": 300,
"sectionList": [{
"sectionNo": 1,
"coilList": [{
"laneNo": 1,
"laneType": null,
"coilNo": 1,
"volumeSum": 3,
"volumePerson": 0,
"volumeCarNon": 0,
"volumeCarSmall": 3,
"volumeCarMiddle": 0,
"volumeCarBig": 0,
"speedAvg": 24.15,
"timeOccupancy": 0.98,
"averageHeadway": 162.36,
"averageGap": 161.63,
"speed85": 38.0
},
{
"laneNo": 8,
"laneType": null,
"coilNo": 8,
"volumeSum": 1,
"volumePerson": 0,
"volumeCarNon": 0,
"volumeCarSmall": 1,
"volumeCarMiddle": 0,
"volumeCarBig": 0,
"speedAvg": 49.43,
"timeOccupancy": 0.3,
"averageHeadway": 115.3,
"averageGap": 115.1,
"speed85": 49.43
}]
}]
}
}
三、Java代码
package com.kgc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Reads nested statistics JSON from one Kafka topic, flattens every entry of
 * {@code data.sectionList[].coilList[]} into one comma-separated line and
 * publishes each line to a second Kafka topic.
 *
 * <p>Each output line carries 21 columns in this fixed order: deviceNo,
 * sourceDeviceType, sn, model, createTime, cycle, laneNo, laneType, sectionNo,
 * coilNo, volumeSum, volumePerson, volumeCarNon, volumeCarSmall,
 * volumeCarMiddle, volumeCarBig, speedAvg, speed85, timeOccupancy,
 * averageHeadway, averageGap.
 */
public class KafkaKafkaStatistics {

    private static final String BOOTSTRAP_SERVERS = "192.168.0.70:9092";
    private static final String SOURCE_TOPIC = "topic_internal_data_statistics";
    private static final String TARGET_TOPIC = "topic_db_data_statistics";

    /** Text written for a field that is JSON null or absent from the message. */
    private static final String NULL_TEXT = "null";

    /** One shared mapper for all records. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Builds the Kafka producer configuration.
     *
     * @return producer properties (String key/value serializers, acks=-1, 3 retries)
     */
    private static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        return props;
    }

    /**
     * Builds the Kafka consumer configuration.
     *
     * <p>Auto-commit is disabled because offsets are committed manually in
     * {@link #main(String[])}; the auto-commit interval is therefore not set,
     * since it only applies when auto-commit is enabled.
     *
     * @return consumer properties (String key/value deserializers, earliest reset)
     */
    private static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Every independent consumer application must use its own group id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics_group");
        return props;
    }

    /**
     * Reads one field of a JSON object as text without risking a
     * {@link NullPointerException} when the field is absent.
     *
     * @param node  the JSON node to read from
     * @param field the field name
     * @return the field's text; {@code "null"} when the field is JSON null or
     *         missing, which matches what an explicit JSON null produced before
     */
    private static String text(JsonNode node, String field) {
        JsonNode value = node.get(field);
        return (value == null || value.isNull()) ? NULL_TEXT : value.asText();
    }

    /**
     * Parses one consumed record and sends one flattened line per coil entry.
     *
     * @param record   the consumed record whose value holds the JSON document
     * @param producer the producer used to publish the flattened lines
     * @throws java.io.IOException if the record value is not valid JSON
     */
    private static void forward(ConsumerRecord<String, String> record,
                                KafkaProducer<String, String> producer) throws java.io.IOException {
        String payload = record.value();
        if (payload == null) {
            return; // record without a value: nothing to parse
        }
        JsonNode rootNode = MAPPER.readTree(payload);
        if (rootNode == null) {
            return; // empty content yields no tree on some Jackson versions
        }
        System.out.println("原始数据" + rootNode);

        String deviceNo = text(rootNode, "deviceNo");
        String sourceDeviceType = text(rootNode, "sourceDeviceType");
        String sn = text(rootNode, "sn");
        String model = text(rootNode, "model");
        String createTime = text(rootNode, "createTime");

        // path() never returns null, so a message without "data", "sectionList"
        // or "coilList" simply produces no output lines.
        JsonNode dataNode = rootNode.path("data");
        String cycle = text(dataNode, "cycle");

        for (JsonNode section : dataNode.path("sectionList")) {
            String sectionNo = text(section, "sectionNo");
            for (JsonNode coil : section.path("coilList")) {
                // Column order is the contract with the downstream consumer of the
                // target topic; sectionNo intentionally sits after laneType.
                String outputLine = String.join(",",
                        deviceNo, sourceDeviceType, sn, model, createTime, cycle,
                        text(coil, "laneNo"), text(coil, "laneType"), sectionNo,
                        text(coil, "coilNo"), text(coil, "volumeSum"),
                        text(coil, "volumePerson"), text(coil, "volumeCarNon"),
                        text(coil, "volumeCarSmall"), text(coil, "volumeCarMiddle"),
                        text(coil, "volumeCarBig"), text(coil, "speedAvg"),
                        text(coil, "speed85"), text(coil, "timeOccupancy"),
                        text(coil, "averageHeadway"), text(coil, "averageGap"));

                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(TARGET_TOPIC, record.key(), outputLine);
                producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                });
            }
        }
    }

    /**
     * Runs the consume-flatten-produce loop until the JVM is asked to shut down.
     *
     * <p>A record that fails to parse is reported and skipped so that one bad
     * message cannot block the topic. Offsets are committed only after the
     * producer has been flushed, so they are not committed while sends for the
     * batch are still in flight. Send failures are reported by the send
     * callback and do not stop the loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps());
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps())) {

            // On JVM shutdown, interrupt the blocking poll() and wait for the main
            // thread to leave the try block so both clients are closed.
            Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }));

            consumer.subscribe(Collections.singleton(SOURCE_TOPIC));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue; // nothing consumed, nothing to flush or commit
                }
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        forward(record, producer);
                    } catch (Exception e) {
                        // Deliberate best-effort: report the bad record and move on.
                        System.err.println("Failed to process record at " + record.topic()
                                + "-" + record.partition() + "@" + record.offset());
                        e.printStackTrace();
                    }
                }
                // Wait for the sends of this batch to complete before committing.
                producer.flush();
                consumer.commitAsync();
            }
        } catch (org.apache.kafka.common.errors.WakeupException ignored) {
            // Expected: raised by the shutdown hook's wakeup() to end the loop.
        }
    }
}
剩下的几步参考《二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)》这篇博客。