本文将手把手指导如何在 CentOS 7 系统中安装并配置 JDK、Apache Zookeeper、Apache Kafka 以及 Apache Flink,实现分布式流处理平台的基本搭建,并支持外部访问。
业务场景
随着互联网金融的发展,信用卡支付场景日益多样化,用户的交易行为呈现出高频、碎片化特征,欺诈行为也愈发隐蔽与智能化。传统的离线风控手段已难以及时识别异常交易,导致欺诈风险上升,信用卡机构面临严重的资金损失与用户信任危机。
为此,构建了一套基于 Apache Flink 的 实时反欺诈指标计算平台,以 Kafka 为数据传输中枢,结合 Flink 的流处理能力,实时处理交易数据、行为日志,提取用户行为特征,执行规则判断及模型计算,最终将欺诈风险评分结果快速写入 Redis,供风控系统和后台展示查询,实现毫秒级预警与实时干预。
该平台具备高吞吐、低延迟、可扩展、可插拔等特点,可支持多业务线并发接入,保障金融交易系统在复杂环境下的安全与稳定。
技术架构图
🧱 一、环境准备
✅ 1. 更新系统
sudo yum update -y
✅ 2. 安装 wget 和 unzip
sudo yum install wget unzip -y
☕ 二、安装 JDK(OpenJDK 8)
Flink、Kafka、Zookeeper 都需依赖 Java 环境。
安装命令:
sudo yum install java-1.8.0-openjdk-devel -y
验证安装:
java -version
输出应类似于:
openjdk version "1.8.0_xx"
🐘 三、安装 Zookeeper(Apache Zookeeper 3.8.4)
Zookeeper 是 Kafka 的依赖组件。
✅ 1. 下载并解压
cd /opt
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin zookeeper
✅ 2. 创建配置文件
cd /opt/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
默认配置即可用于测试,也可调整如下:
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
✅ 3. 创建数据目录并启动
mkdir /opt/zookeeper/data
bin/zkServer.sh start
✅ 4. 开放端口
sudo firewall-cmd --permanent --add-port=2181/tcp
sudo firewall-cmd --reload
🦄 四、安装 Kafka(Kafka 2.2.1 + Scala 2.12)
✅ 1. 下载并解压
cd /opt
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -zxvf kafka_2.12-2.2.1.tgz
mv kafka_2.12-2.2.1 kafka
✅ 2. 修改配置文件
编辑 config/server.properties
:
# 修改以下项
broker.id=0
# 服务器的ip
listeners=PLAINTEXT://192.168.206.133:9092
zookeeper.connect=localhost:2181
log.dirs=/opt/kafka/logs
✅ 3. 启动 Kafka
先启动 Zookeeper(如果未启动):
/opt/zookeeper/bin/zkServer.sh start
再启动 Kafka:
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
✅ 4. 开放 Kafka 端口
sudo firewall-cmd --permanent --add-port=9092/tcp
sudo firewall-cmd --reload
🌊 五、安装 Flink(Flink 1.18.0)
✅ 1. 下载并解压
cd /opt
wget https://downloads.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
mv flink-1.18.0 flink
✅ 2. 修改配置文件
编辑 conf/flink-conf.yaml
:
jobmanager.rpc.address: 0.0.0.0
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
rest.port: 8081
如果部署为集群,可设置:
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
✅ 3. 启动 Flink
cd /opt/flink
bin/start-cluster.sh
✅ 4. 验证启动状态
jps
你应能看到:
StandaloneSessionClusterEntrypoint
TaskManagerRunner
✅ 5. 开放 Flink Web UI 端口
sudo firewall-cmd --permanent --add-port=8081/tcp
sudo firewall-cmd --reload
✅ 6. 访问 Flink
浏览器打开:
http://your_server_ip:8081/
📌 六、测试 Kafka 与 Flink 连通性
Flink 任务运行时需要的 Kafka 连接器类在远程 Flink 集群中不存在。需要在远程 Flink 集群安装 Kafka 连接器
# 下载 Kafka 连接器 JAR 文件
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.1.0-1.18/flink-connector-kafka-3.1.0-1.18.jar
cp flink-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/
# 下载 flink-connector-base(如果没有)
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base/1.18.0/flink-connector-base-1.18.0.jar
cp flink-connector-base-1.18.0.jar /opt/flink/lib/
# 下载 kafka-clients(如果版本不匹配)
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.6.0/kafka-clients-3.6.0.jar
cp kafka-clients-3.6.0.jar /opt/flink/lib/
联通测试方式一:
如果你希望验证 Flink 能消费 Kafka 数据流,可使用以下命令在 Kafka 中创建 Topic 并发送测试数据,再用 Flink 示例任务消费它。
Kafka 创建 Topic:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Kafka 发送消息:
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
Flink 运行 KafkaSource 示例代码(你需要自行写 Job 或提供 JAR 包)。
联通测试方式二:
package com.example.controller;
import com.example.config.FlinkConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* Flink测试控制器
*/
@RestController
@RequestMapping("/api/flink")
@Slf4j
public class FlinkTestController {
@Autowired
private FlinkConfig flinkConfig;
/**
* 测试Flink连接
*/
@PostMapping("/test-connection")
public Map<String, Object> testFlinkConnection() {
Map<String, Object> result = new HashMap<>();
try {
log.info("正在测试Flink连接到 {}:{}",
flinkConfig.getRemote().getHost(),
flinkConfig.getRemote().getPort());
// 创建远程执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
flinkConfig.getRemote().getHost(),
flinkConfig.getRemote().getPort()
);
if (env != null) {
result.put("status", "success");
result.put("message", "Flink连接测试成功");
result.put("flinkHost", flinkConfig.getRemote().getHost());
result.put("flinkPort", flinkConfig.getRemote().getPort());
log.info("Flink连接测试成功");
} else {
result.put("status", "error");
result.put("message", "无法创建Flink执行环境");
}
} catch (Exception e) {
log.error("Flink连接测试失败", e);
result.put("status", "error");
result.put("message", "Flink连接测试失败: " + e.getMessage());
result.put("error", e.getClass().getSimpleName());
}
return result;
}
}
✅ 七、总结
组件 | 默认端口 | 外部可访问配置 |
---|---|---|
Zookeeper | 2181 | 配置文件已支持 |
Kafka | 9092 | listeners=0.0.0.0:9092 |
Flink | 8081 | rest.address=0.0.0.0 |