1、CounterSource
/**
 * Parallel source that emits an ever-increasing sequence of {@code Long} values, pausing about
 * 300 ms between records.
 *
 * <p>The next value to emit is written to operator list state on every snapshot and read back in
 * {@link #initializeState(FunctionInitializationContext)} when the context reports a restore, so
 * emission continues from the checkpointed offset.
 */
class CounterSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction,CheckpointListener {

    /** Pause between two emitted records, in milliseconds. */
    private static final long EMIT_INTERVAL_MILLIS = 300L;

    /**
     * Next value to emit; this is what gets checkpointed. Kept as a primitive so the increment in
     * the emit loop does not re-box on every record.
     */
    private long offset = 0L;

    /** Flag for job cancellation; cleared by {@link #cancel()}. */
    private volatile boolean isRunning = true;

    /** Operator list state that stores the offset. Assigned in {@code initializeState}. */
    private transient ListState<Long> state;

    /**
     * Emits the current offset and advances it until {@link #cancel()} is called.
     *
     * @param ctx source context used to emit records and to obtain the checkpoint lock
     * @throws RuntimeException wrapping the {@link InterruptedException} if the pause is
     *     interrupted while the source has not been cancelled
     */
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();
        while (isRunning) {
            // Emitting the record and advancing the offset must be atomic with respect to
            // snapshots, so only these two statements run under the checkpoint lock.
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
            // Pause OUTSIDE the lock: sleeping while holding it would make any snapshot that
            // needs the lock wait for the whole pause.
            try {
                TimeUnit.MILLISECONDS.sleep(EMIT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                if (isRunning) {
                    // Interrupted without a cancel request: surface it as a failure.
                    throw new RuntimeException(e);
                }
                // Interrupted after cancel() (e.g. during task cancellation): stop quietly.
                return;
            }
        }
    }

    /** Requests the emit loop in {@link #run(SourceContext)} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Registers the list state and, when the context reports a restore, loads the saved offset
     * back into memory.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
                "state",
                LongSerializer.INSTANCE));
        if (context.isRestored()) {
            System.out.println("任务出错,source 正在从 Checkpoint 重启");
            // If the list holds several entries, the last one read wins.
            for (Long l : state.get()) {
                offset = l;
            }
        }
    }

    /**
     * Replaces the state content with the current offset.
     *
     * @param context snapshot context (unused)
     * @throws Exception if the state update fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.update(Collections.singletonList(offset));
    }

    /**
     * Logs that a checkpoint completed.
     *
     * @param checkpointId id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        System.out.println("=====Source算子的Checkpoint " + checkpointId + " completed.");
    }

    /**
     * Logs that a checkpoint was aborted.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        System.out.println("=====Source算子的Checkpoint " + checkpointId + " Aborted.");
    }
}
2、BufferingSink
/**
 * Sink that keeps incoming tuples in an in-memory buffer and prints the whole buffer to stdout
 * once it holds at least {@code threshold} elements.
 *
 * <p>The buffer content is written to operator list state on every snapshot and added back to
 * the buffer in {@link #initializeState(FunctionInitializationContext)} when the context reports
 * a restore.
 */
class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction, CheckpointListener {

    /** Buffer size at which the buffered elements are printed and the buffer is emptied. */
    private final int threshold;

    /** Operator list state receiving the buffer content; assigned in {@code initializeState}. */
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    /** Elements received since the buffer was last emptied. */
    private List<Tuple2<String, Integer>> bufferedElements;

    /**
     * @param threshold number of buffered elements that triggers printing
     */
    public BufferingSink(int threshold) {
        this.bufferedElements = new ArrayList<>();
        this.threshold = threshold;
    }

    /**
     * Buffers {@code value}; once the buffer has reached the threshold, prints every buffered
     * element and clears the buffer.
     *
     * @param value the incoming element
     * @param contex sink context (unused)
     */
    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() < threshold) {
            // Not enough elements yet; keep buffering.
            return;
        }
        bufferedElements.forEach(
                buffered -> System.out.println("当前阈值为=>" + threshold + ",输出元素为=>" + buffered));
        bufferedElements.clear();
    }

    /**
     * Replaces the state content with the current buffer.
     *
     * @param context snapshot context (unused)
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.update(bufferedElements);
    }

    /**
     * Registers the {@code "buffered-elements"} list state and, when the context reports a
     * restore, appends the saved elements to the in-memory buffer.
     *
     * @param context gives access to the operator state store and the restore flag
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        TypeInformation<Tuple2<String, Integer>> elementType =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                });
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>("buffered-elements", elementType);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (!context.isRestored()) {
            return;
        }
        System.out.println("任务出错,sink 正在从 Checkpoint 重启");
        checkpointedState.get().forEach(bufferedElements::add);
    }

    /**
     * Logs that a checkpoint completed.
     *
     * @param checkpointId id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        System.out.println("=====Sink算子的Checkpoint " + checkpointId + " completed.");
    }

    /**
     * Logs that a checkpoint was aborted.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        System.out.println("=====Sink算子的Checkpoint " + checkpointId + " Aborted.");
    }
}
3、完整测试用例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Operator-state demo job: {@code CounterSource -> map -> BufferingSink}.
 *
 * <ul>
 *   <li>The source emits one record every 300 ms.</li>
 *   <li>The sink prints once it has buffered 10 records.</li>
 *   <li>So the whole pipeline prints roughly every 3 s, completing one cycle.</li>
 *   <li>The checkpoint interval is set to 3 s.</li>
 * </ul>
 */
public class _03_OperatorState {

    /**
     * Builds the pipeline with parallelism 1 and exactly-once checkpointing, then runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<Long> myExactlyOnceSource = env.addSource(new CounterSource());
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped =
                myExactlyOnceSource.map(new FailingMapper());
        mapped.addSink(new BufferingSink(10));

        env.execute();
    }

    /**
     * Wraps each value as {@code ("res", value)} and simulates a failure for the inputs 30 and 50.
     */
    private static final class FailingMapper implements MapFunction<Long, Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> map(Long value) throws Exception {
            boolean triggersFailure = value == 30 || value == 50;
            if (!triggersFailure) {
                return new Tuple2<String, Integer>("res", value.intValue());
            }
            // Simulated error: integer division by zero throws ArithmeticException, so the
            // return below is never reached at runtime.
            // NOTE(review): the failure depends only on the value, so a restart that replays 30
            // would presumably fail again — confirm this is the intended demo behavior.
            int error = 1 / 0;
            return new Tuple2<String, Integer>("error", 0);
        }
    }
}