import java.util.*;
import java.util.concurrent.*;
// 核心接口定义
/**
 * Lifecycle contract implemented by every operator in this file.
 *
 * <p>The {@code Task} class in this file calls {@link #open()} once and then
 * passes each non-null element it fetches to {@link #processElement(Object)}.
 */
interface StreamOperator {

    /** Called once by the executing task before any element is processed. */
    void open();

    /**
     * Handles a single element.
     *
     * @param element the element to process; the {@code Task} in this file only
     *                passes non-null elements
     */
    void processElement(Object element);

    /** Shutdown hook of the operator; presumably the counterpart of {@link #open()}. */
    void close();
}
/**
 * A {@link StreamOperator} that additionally exposes a {@code run} entry point.
 *
 * <p>NOTE(review): {@code SourceContext} is not declared in the visible part of this
 * file; it must be provided elsewhere for this interface to compile — confirm.
 * No code visible in this file invokes {@link #run(SourceContext)}.
 */
interface SourceFunction extends StreamOperator {

    /**
     * Entry point of the source.
     *
     * @param ctx context handed to the source; presumably the channel through which
     *            produced records are emitted (TODO confirm against SourceContext)
     */
    void run(SourceContext ctx);
}
/**
 * A {@link StreamOperator} that additionally exposes an {@code invoke} method
 * for consuming a value.
 */
interface SinkFunction extends StreamOperator {

    /**
     * Consumes one value.
     *
     * @param value the value to consume
     */
    void invoke(Object value);
}
// 运行时组件
/**
 * Holds the operators of a job in the order in which they were added.
 */
class JobGraph {

    /** Operators in insertion order. */
    private final List<StreamOperator> operators = new ArrayList<>();

    /**
     * Appends an operator to the end of the job.
     *
     * @param operator the operator to add
     */
    public void addOperator(StreamOperator operator) {
        this.operators.add(operator);
    }

    /**
     * Returns the operators of this job.
     *
     * @return the internal list itself (not a copy), so changes made through it
     *         are visible to this graph
     */
    public List<StreamOperator> getOperators() {
        return this.operators;
    }
}
/**
 * Holds the {@link ExecutionVertex} objects of a job in insertion order.
 */
class ExecutionGraph {

    /** Vertices in the order they were added. */
    private final List<ExecutionVertex> vertices = new ArrayList<>();

    /**
     * Appends a vertex to this graph.
     *
     * @param vertex the vertex to add
     */
    public void addVertex(ExecutionVertex vertex) {
        this.vertices.add(vertex);
    }

    /**
     * Returns the vertices of this graph.
     *
     * @return the internal list itself (not a copy)
     */
    public List<ExecutionVertex> getVertices() {
        return this.vertices;
    }
}
/**
 * Pairs an operator with the number of parallel tasks to create for it.
 */
class ExecutionVertex {

    private final StreamOperator operator;
    private final int parallelism;

    /**
     * @param operator    the operator executed by this vertex
     * @param parallelism the number of tasks to create for this vertex
     */
    public ExecutionVertex(StreamOperator operator, int parallelism) {
        this.operator = operator;
        this.parallelism = parallelism;
    }

    /**
     * @return the operator executed by this vertex
     */
    public StreamOperator getOperator() {
        return operator;
    }

    /**
     * Returns the parallelism given at construction time.
     *
     * <p>{@code JobMaster.deployTasks} calls this accessor to decide how many tasks to
     * create per vertex; it was missing, which left the stored value unreadable and
     * the caller uncompilable.
     *
     * @return the number of tasks to create for this vertex
     */
    public int getParallelism() {
        return parallelism;
    }
}
// 主控节点
/**
 * Entry point for submitting jobs. Owns one {@link ResourceManager} and tracks
 * the {@link JobMaster} of every job that was started successfully.
 */
class JobManager {

    private final ResourceManager resourceManager = new ResourceManager();

    /** Job id to the master driving that job. */
    private final Map<String, JobMaster> runningJobs = new ConcurrentHashMap<>();

    /**
     * Creates a {@link JobMaster} for the given graph under a freshly generated
     * random UUID and starts it.
     *
     * <p>If starting the job fails, the job is removed from the running-jobs map
     * again before the exception propagates, so the map never reports a job that
     * did not start. Tasks that were already launched by a partially completed
     * start are not stopped here.
     *
     * @param jobGraph the job to run
     * @return the generated job id
     */
    public String submitJob(JobGraph jobGraph) {
        String jobId = UUID.randomUUID().toString();
        JobMaster jobMaster = new JobMaster(jobId, jobGraph);
        runningJobs.put(jobId, jobMaster);

        boolean started = false;
        try {
            jobMaster.start(resourceManager);
            started = true;
        } finally {
            if (!started) {
                // Undo the registration; the original exception keeps propagating.
                runningJobs.remove(jobId, jobMaster);
            }
        }
        return jobId;
    }
}
/**
 * Drives a single job: builds its execution graph, obtains slots, deploys tasks
 * and starts periodic checkpointing.
 */
class JobMaster {

    /** Parallelism assigned to every vertex when the execution graph is built. */
    private static final int DEFAULT_PARALLELISM = 2;

    private final String jobId;
    private final JobGraph jobGraph;
    private final CheckpointCoordinator checkpointCoordinator;

    /**
     * @param jobId    identifier of the job
     * @param jobGraph the operators making up the job
     */
    public JobMaster(String jobId, JobGraph jobGraph) {
        this.jobId = jobId;
        this.jobGraph = jobGraph;
        this.checkpointCoordinator = new CheckpointCoordinator();
    }

    /**
     * Starts the job.
     *
     * @param resourceManager source of the task slots used for deployment
     * @throws IllegalStateException if there are tasks to deploy but no slot was allocated
     */
    public void start(ResourceManager resourceManager) {
        // Build the execution graph.
        ExecutionGraph executionGraph = buildExecutionGraph(jobGraph);
        // Request resources.
        List<TaskSlot> slots = resourceManager.allocateResources(executionGraph);
        // Deploy the tasks.
        deployTasks(executionGraph, slots);
        // Start the checkpoint coordinator.
        checkpointCoordinator.start(jobId, executionGraph);
    }

    /** Creates one vertex per operator, each with {@link #DEFAULT_PARALLELISM}. */
    private ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {
        ExecutionGraph executionGraph = new ExecutionGraph();
        for (StreamOperator operator : jobGraph.getOperators()) {
            executionGraph.addVertex(new ExecutionVertex(operator, DEFAULT_PARALLELISM));
        }
        return executionGraph;
    }

    /**
     * Creates one task per unit of parallelism of every vertex and hands the tasks
     * to the slots in round-robin order. All tasks of a vertex receive the same
     * operator instance.
     */
    private void deployTasks(ExecutionGraph executionGraph, List<TaskSlot> slots) {
        int slotIndex = 0;
        for (ExecutionVertex vertex : executionGraph.getVertices()) {
            for (int i = 0; i < vertex.getParallelism(); i++) {
                // Checked only when a task actually needs a slot, so an empty job with
                // an empty slot list still deploys nothing without failing.
                if (slots.isEmpty()) {
                    throw new IllegalStateException(
                            "No task slots available to deploy job " + jobId);
                }
                Task task = new Task(vertex.getOperator());
                slots.get(slotIndex).deployTask(task);
                // Keep the index inside [0, size) instead of letting a counter grow unbounded.
                slotIndex = (slotIndex + 1) % slots.size();
            }
        }
    }
}
// 资源管理
/**
 * Owns the task managers and hands out their slots.
 */
class ResourceManager {

    private List<TaskManager> taskManagers = new ArrayList<>();

    /** Creates three task managers with the ids 0, 1 and 2. */
    public ResourceManager() {
        int nextId = 0;
        while (nextId < 3) {
            taskManagers.add(new TaskManager(nextId));
            nextId++;
        }
    }

    /**
     * Collects the slots reported by every task manager and returns the leading
     * ones, at most one per vertex of the given graph.
     *
     * <p>The number of returned slots depends on the vertex count only; the
     * parallelism of the vertices is not consulted.
     *
     * @param executionGraph the graph whose vertex count bounds the result size
     * @return a sub-list view over the collected slots
     */
    public List<TaskSlot> allocateResources(ExecutionGraph executionGraph) {
        List<TaskSlot> candidates = new ArrayList<>();
        taskManagers.forEach(manager -> candidates.addAll(manager.getAvailableSlots()));

        int requested = executionGraph.getVertices().size();
        int granted = Math.min(candidates.size(), requested);
        return candidates.subList(0, granted);
    }
}
// 工作节点
/**
 * A worker that owns a fixed pair of task slots.
 */
class TaskManager {

    private int id;
    private List<TaskSlot> slots = new ArrayList<>();

    /**
     * Creates the manager together with its two slots, whose ids are the manager
     * id followed by {@code -1} and {@code -2}.
     *
     * @param id numeric id of this task manager
     */
    public TaskManager(int id) {
        this.id = id;
        Collections.addAll(slots, new TaskSlot(id + "-1"), new TaskSlot(id + "-2"));
    }

    /**
     * Returns the slots of this manager; no occupancy filtering is applied.
     *
     * @return a new list containing every slot of this manager
     */
    public List<TaskSlot> getAvailableSlots() {
        List<TaskSlot> snapshot = new ArrayList<>(slots.size());
        snapshot.addAll(slots);
        return snapshot;
    }
}
/**
 * A slot that remembers the task most recently deployed into it.
 */
class TaskSlot {

    private final String id;

    /** The task passed to the latest {@link #deployTask(Task)} call. */
    private Task runningTask;

    /**
     * @param id identifier of this slot
     */
    public TaskSlot(String id) {
        this.id = id;
    }

    /**
     * Records the task as this slot's running task and starts it. A previously
     * recorded task is replaced without being stopped.
     *
     * @param task the task to start
     */
    public void deployTask(Task task) {
        runningTask = task;
        task.start();
    }
}
// 任务执行
/**
 * Runs one operator on its own thread: opens it, feeds it elements until the
 * thread is interrupted, then closes it.
 */
class Task implements Runnable {

    private final StreamOperator operator;

    /** Set by {@link #start()}; volatile because {@link #cancel()} may run on another thread. */
    private volatile Thread executionThread;

    /**
     * @param operator the operator this task executes
     */
    public Task(StreamOperator operator) {
        this.operator = operator;
    }

    /** Creates a new thread for this task and starts it. */
    public void start() {
        Thread thread = new Thread(this);
        executionThread = thread;
        thread.start();
    }

    /**
     * Asks the processing loop to stop by interrupting the execution thread.
     * Does nothing if the task was never started.
     */
    public void cancel() {
        Thread thread = executionThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * Opens the operator and processes elements until the current thread is
     * interrupted. The operator is closed when the loop ends, including when
     * {@code processElement} throws.
     */
    @Override
    public void run() {
        operator.open();
        try {
            // Simulated processing loop; interruption is the only exit signal.
            while (!Thread.currentThread().isInterrupted()) {
                Object element = fetchNextElement(); // fetch data from upstream
                if (element != null) {
                    operator.processElement(element);
                }
            }
        } finally {
            // Previously close() was never reached because the loop had no exit.
            operator.close();
        }
    }

    /**
     * Simulates reading from upstream.
     *
     * @return a new object when {@code Math.random()} exceeds 0.5, otherwise {@code null}
     */
    private Object fetchNextElement() {
        // A real implementation would read from the network or a local queue.
        return Math.random() > 0.5 ? new Object() : null;
    }
}
// 容错机制
/**
 * Triggers a checkpoint for one job at a fixed ten-second rate.
 */
class CheckpointCoordinator {

    /** Non-null while periodic checkpointing is active. */
    private ScheduledExecutorService scheduler;

    /**
     * Starts periodic checkpointing; the first checkpoint is triggered immediately
     * and subsequent ones every 10 seconds.
     *
     * @param jobId          id of the job, used in log output
     * @param executionGraph graph whose vertices are to be checkpointed
     * @throws IllegalStateException if checkpointing is already running
     */
    public synchronized void start(String jobId, ExecutionGraph executionGraph) {
        if (scheduler != null) {
            // A second scheduler would orphan the first one and leak its thread.
            throw new IllegalStateException("Checkpoint coordinator already started");
        }
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                triggerCheckpoint(jobId, executionGraph);
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses all later runs,
                // so one failed checkpoint must not leave this lambda.
                System.err.println("Checkpoint failed for job: " + jobId + " (" + e + ")");
            }
        }, 0, 10, TimeUnit.SECONDS); // trigger a checkpoint every 10 seconds
    }

    /**
     * Stops periodic checkpointing and releases the scheduler thread. Does nothing
     * if checkpointing is not running.
     */
    public synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    /** Logs the checkpoint trigger; the remaining steps are placeholders. */
    private void triggerCheckpoint(String jobId, ExecutionGraph executionGraph) {
        System.out.println("Triggering checkpoint for job: " + jobId);
        // 1. Notify every task to begin the checkpoint.
        for (ExecutionVertex vertex : executionGraph.getVertices()) {
            // A real implementation would notify the TaskManager via RPC.
        }
        // 2. Wait for all tasks to acknowledge.
        // 3. Persist the checkpoint metadata.
    }
}
// 示例应用
/**
 * Example application: wires a source, a mapper and a sink into a job graph and
 * submits it to a {@link JobManager}.
 */
public class SimpleFlinkDemo {

    /**
     * Builds the demo job, submits it, prints the job id and then keeps the main
     * thread asleep for 60 seconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Create the job graph.
        JobGraph jobGraph = new JobGraph();

        // Data source.
        SourceFunction source = new SourceFunction() {
            @Override public void open() {}
            @Override public void close() {}

            @Override
            public void run(SourceContext ctx) {
                // A real source would produce the data stream here.
            }

            @Override
            public void processElement(Object element) {
                // A source operator does not process incoming elements.
            }
        };

        // Processing operator.
        StreamOperator mapper = new StreamOperator() {
            @Override public void open() {}
            @Override public void close() {}

            @Override
            public void processElement(Object element) {
                System.out.println("Processing: " + element);
                // Actual processing logic goes here.
            }
        };

        // Output operator.
        SinkFunction sink = new SinkFunction() {
            @Override public void open() {}
            @Override public void close() {}

            @Override
            public void invoke(Object value) {
                System.out.println("Output: " + value);
            }

            @Override
            public void processElement(Object element) {
                invoke(element);
            }
        };

        // Assemble the job graph.
        jobGraph.addOperator(source);
        jobGraph.addOperator(mapper);
        jobGraph.addOperator(sink);

        // 2. Submit the job.
        JobManager jobManager = new JobManager();
        String jobId = jobManager.submitJob(jobGraph);
        System.out.println("Job submitted with ID: " + jobId);

        // Keep the main thread alive.
        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of discarding it, so code running
            // later on this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
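/*
 * 1. 创建作业图(JobGraph,内部为算子列表):依次加入 source 数据源、mapper 处理算子、sink 输出算子,然后提交。
 * 2. 将作业图提交给 JobManager。
 * 3. JobManager 为该作业生成作业 id,并创建对应的 JobMaster,JobMaster 里面含有作业图。
 * 4. 生成执行图(ExecutionGraph),里面装的是 ExecutionVertex。
 * 5. 给执行图分配 slot。
 * 6. 部署 task。
 * 7. 执行检查点(checkpoint)。
 */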