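ChunJun's metric handling involves three operations: registering metrics, synchronizing metrics, and evaluating metrics against thresholds.
I. Managing metrics (registration and synchronization)
1. How BaseRichInputFormat manages metrics
Metrics are registered mainly in the open method. The core logic is as follows: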
this.context = (StreamingRuntimeContext) getRuntimeContext();
// Read Flink's global job parameters; they carry the dirty-data manager configuration
ExecutionConfig.GlobalJobParameters params =
        context.getExecutionConfig().getGlobalJobParameters();
DirtyConfig dc = DirtyConfUtil.parseFromMap(params.toMap());
// Create the dirty-data manager
this.dirtyManager = new DirtyManager(dc, this.context);
...
if (!initialized) {
    // Initialize the accumulator collector
    initAccumulatorCollector();
    // Initialize the row-size calculator
    initRowSizeCalculator();
    // Initialize the statistics metrics
    initStatisticsAccumulator();
    // Initialize the byte-rate limiter
    initByteRateLimiter();
    // Initialize the checkpoint-related restore information
    initRestoreInfo();
    initialized = true;
}
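initAccumulatorCollector starts the metric synchronization thread, which synchronizes data once every TaskManager heartbeat interval plus 1 s.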
private void initAccumulatorCollector() {
    String lastWriteLocation =
            String.format("%s_%s", Metrics.LAST_WRITE_LOCATION_PREFIX, indexOfSubTask);
    String lastWriteNum =
            String.format("%s_%s", Metrics.LAST_WRITE_NUM__PREFIX, indexOfSubTask);
    accumulatorCollector =
            new AccumulatorCollector(
                    context,
                    Arrays.asList(
                            Metrics.NUM_READS,
                            Metrics.READ_BYTES,
                            Metrics.READ_DURATION,
                            Metrics.WRITE_BYTES,
                            Metrics.NUM_WRITES,
                            lastWriteLocation,
                            lastWriteNum));
    accumulatorCollector.start();
}
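The scheduling idea can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions, not ChunJun's code: PeriodicSyncSketch, heartbeatMillis, and the syncOnce callback are hypothetical names, and the period is assumed to be the heartbeat interval plus 1000 ms, as the text above describes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run a sync task at a fixed period derived from a heartbeat interval. */
class PeriodicSyncSketch {
    // A single daemon thread is enough for one periodic task.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "metric-sync-sketch");
                        t.setDaemon(true);
                        return t;
                    });
    private final long periodMillis;

    PeriodicSyncSketch(long heartbeatMillis) {
        // Assumption taken from the text: heartbeat interval plus one second.
        this.periodMillis = heartbeatMillis + 1000L;
    }

    long getPeriodMillis() {
        return periodMillis;
    }

    /** Runs syncOnce immediately, then once per period. */
    void start(Runnable syncOnce) {
        executor.scheduleAtFixedRate(syncOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void close() {
        executor.shutdown();
    }

    boolean isClosed() {
        return executor.isShutdown();
    }
}
```

With a 10 s heartbeat this sketch would refresh every 11 s; the extra second presumably leaves room for the latest heartbeat to be processed before the values are read.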
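initStatisticsAccumulator registers the metrics in one of two ways:
- First look up the counter in the RuntimeContext; if it does not exist yet, it is registered so that it is reported to the JobManager.
- Alternatively, register it through the MetricGroup; the detailed logic lives in the BaseMetric class, which is analyzed in the next subsection.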
private void initStatisticsAccumulator() {
    numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);
    bytesReadCounter = getRuntimeContext().getLongCounter(Metrics.READ_BYTES);
    durationCounter = getRuntimeContext().getLongCounter(Metrics.READ_DURATION);
    inputMetric = new BaseMetric(getRuntimeContext());
    inputMetric.addMetric(Metrics.NUM_READS, numReadCounter, true);
    inputMetric.addMetric(Metrics.READ_BYTES, bytesReadCounter, true);
    inputMetric.addMetric(Metrics.READ_DURATION, durationCounter);
    inputMetric.addDirtyMetric(Metrics.DIRTY_DATA_COUNT, this.dirtyManager.getConsumedMetric());
    inputMetric.addDirtyMetric(
            Metrics.DIRTY_DATA_COLLECT_FAILED_COUNT,
            this.dirtyManager.getFailedConsumedMetric());
}
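The "look it up, create it if missing" behaviour of getLongCounter can be illustrated with a small stand-alone registry. This is a sketch under assumptions: CounterRegistrySketch is a hypothetical stand-in for the per-subtask accumulator map, LongAdder stands in for Flink's LongCounter, and the names used in the usage note are illustrative only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for a per-subtask accumulator registry. */
class CounterRegistrySketch {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    /** Returns the counter registered under name, creating and registering it on first use. */
    LongAdder getLongCounter(String name) {
        return counters.computeIfAbsent(name, key -> new LongAdder());
    }
}
```

Calling getLongCounter("numRead") twice on the same registry returns the same object, so two components that ask for the same name end up updating one shared counter.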
A user-defined metric reporter can also be set up in openInputFormat:
...
if (useCustomReporter()) {
    customReporter =
            DataSyncFactoryUtil.discoverMetric(
                    config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
    customReporter.open();
}
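2. Managing metrics through BaseMetric
BaseMetric provides the ability to register metric groups, register metrics, and wait for metrics to be synchronized to the JobManager.
Its main methods are addMetric, addDirtyMetric, waitForReportMetrics, and getChunjunMetricGroup. The constructor registers chunjunMetricGroup and chunjunDirtyMetricGroup through the runtimeContext. The logic is as follows.
Registering the metric groups: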
public BaseMetric(RuntimeContext runtimeContext) {
    // Read the global parameter DELAY_PERIOD_MILL (falls back to 20000 when not configured)
    ExecutionConfig.GlobalJobParameters params =
            runtimeContext.getExecutionConfig().getGlobalJobParameters();
    Map<String, String> confMap = params.toMap();
    this.DELAY_PERIOD_MILL =
            Long.parseLong(
                    String.valueOf(confMap.getOrDefault(DELAY_PERIOD_MILL_KEY, "20000")));
    // Register the metric group: chunjun, output
    chunjunMetricGroup =
            runtimeContext
                    .getMetricGroup()
                    .addGroup(
                            Metrics.METRIC_GROUP_KEY_CHUNJUN,
                            Metrics.METRIC_GROUP_VALUE_OUTPUT);
    // Register the metric group: DirtyData, output
    chunjunDirtyMetricGroup =
            chunjunMetricGroup.addGroup(
                    Metrics.METRIC_GROUP_KEY_DIRTY, Metrics.METRIC_GROUP_VALUE_OUTPUT);
}
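The way the constructor resolves the delay (read a string from the parameter map, fall back to "20000", parse it as a long) can be tried out in isolation. The sketch below is an assumption-laden illustration: DelayConfigSketch and the key string are hypothetical, and the fallback on a malformed value is an addition of this sketch, whereas the constructor above would let the NumberFormatException propagate.

```java
import java.util.Map;

/** Hypothetical sketch: resolve a delay in milliseconds from a string-valued parameter map. */
class DelayConfigSketch {
    // Hypothetical key; the real DELAY_PERIOD_MILL_KEY value is not shown in the text.
    static final String DELAY_KEY = "sketch.delay.period.mill";
    static final long DEFAULT_DELAY_MILLIS = 20_000L;

    static long resolveDelayMillis(Map<String, String> confMap) {
        String raw = confMap.getOrDefault(DELAY_KEY, String.valueOf(DEFAULT_DELAY_MILLIS));
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            // Addition of this sketch: a malformed value falls back to the default.
            return DEFAULT_DELAY_MILLIS;
        }
    }
}
```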
Registering metrics: the metric types used include gauge and meter.
// Register a metric
public void addMetric(String metricName, LongCounter counter) {
    addMetric(metricName, counter, false);
}

public void addMetric(String metricName, LongCounter counter, boolean meterView) {
    metricCounters.put(metricName, counter);
    chunjunMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter));
    if (meterView) {
        chunjunMetricGroup.meter(
                metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 20));
    }
}

// metricName: the name of the metric
// counter: the counter that holds the actual count
public void addDirtyMetric(String metricName, LongCounter counter) {
    metricCounters.put(metricName, counter);
    chunjunDirtyMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter));
}
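When meterView is true, a rate metric is derived from the same counter. One common way to do this is a sliding window of periodic samples. The sketch below illustrates that idea only; it is not the source of SimpleLongCounterMeterView. RateViewSketch, the 5-second sampling interval, and the requirement that something calls update() once per interval are all assumptions of this example.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: per-second rate of a monotonically increasing counter over a time window. */
class RateViewSketch {
    // Assumed sampling interval: update() is expected to be called once per interval.
    static final int UPDATE_INTERVAL_SECONDS = 5;

    private final LongSupplier counter;
    private final int timeSpanSeconds;
    private final long[] samples; // ring buffer of counter snapshots
    private int index = 0;
    private double rate = 0.0;

    RateViewSketch(LongSupplier counter, int timeSpanSeconds) {
        this.counter = counter;
        // Round the window down to a multiple of the sampling interval (at least one interval).
        int rounded = timeSpanSeconds - (timeSpanSeconds % UPDATE_INTERVAL_SECONDS);
        this.timeSpanSeconds = Math.max(rounded, UPDATE_INTERVAL_SECONDS);
        this.samples = new long[this.timeSpanSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    /** Takes a new snapshot and recomputes the rate against the oldest snapshot in the window. */
    void update() {
        index = (index + 1) % samples.length;
        samples[index] = counter.getAsLong();
        long oldest = samples[(index + 1) % samples.length];
        rate = (double) (samples[index] - oldest) / timeSpanSeconds;
    }

    double getRate() {
        return rate;
    }
}
```

With a 20-second window the buffer holds five snapshots. During the first window the reported rate underestimates the true rate because the oldest slots are still zero; once the buffer has wrapped around, the rate covers exactly the last 20 seconds.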
Waiting for metrics: after a task slot finishes, it has to wait for a while so that any metrics not yet synchronized can reach the JobManager.
public void waitForReportMetrics() {
    try {
        Thread.sleep(DELAY_PERIOD_MILL);
    } catch (InterruptedException e) {
        ThreadUtil.sleepMilliseconds(DELAY_PERIOD_MILL);
        log.warn("Task thread is interrupted");
    }
}
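The method above reacts to an interrupt by sleeping again for the full delay. A different, widely used idiom is to sleep only for the remaining time and restore the thread's interrupt flag afterwards, so that callers can still observe the interruption. The sketch below shows that alternative; it is not ChunJun's implementation, and UninterruptibleWaitSketch and sleepFully are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait for a full delay even if interrupted, then restore the interrupt flag. */
class UninterruptibleWaitSketch {
    static void sleepFully(long millis) {
        boolean interrupted = false;
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(millis);
        long deadline = System.nanoTime() + remainingNanos;
        try {
            while (remainingNanos > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
                    return;
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting for the time that is left.
                    interrupted = true;
                    remainingNanos = deadline - System.nanoTime();
                }
            }
        } finally {
            if (interrupted) {
                // Re-assert the flag so that callers can still see the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}
```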
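II. AccumulatorCollector: metric synchronization
AccumulatorCollector periodically merges and fetches the global metrics. The main flow: once every TaskManager heartbeat interval plus 1 s, the global metrics are synchronized to the task slot.
AccumulatorCollector provides the methods start, close, collectAccumulator, getAccumulatorValue, and getLocalAccumulatorValue, among others.
1. Starting the thread pool to refresh metrics periodically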
/** Starts the thread pool and periodically refreshes the accumulator values. */
public void start() {
    scheduledExecutorService.scheduleAtFixedRate(
            this::collectAccumulator, 0, period, TimeUnit.MILLISECONDS);
}
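collectAccumulator gathers the accumulator values. It calls requestJob (an RPC to the JobManager that merges and returns the global metrics) and then applies the global values locally. The steps are:
- Each slot synchronizes its own metrics to the global view held by the JobManager.
- Dirty-data metrics are synchronized back to every slot: the global dirty-data metric is passed to each task slot, which can then check whether the number of dirty records processed exceeds the globally configured limit.
- If a result has no registered accumulator, the maxValue metric is picked up instead; it is used mainly by the JDBC connectors.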
public void collectAccumulator() {
    CompletableFuture<ExecutionGraphInfo> executionGraphInfoCompletableFuture =
            gateway.requestJob(Time.seconds(10));
    ExecutionGraphInfo executionGraphInfo;
    try {
        executionGraphInfo = executionGraphInfoCompletableFuture.get();
    } catch (Exception e) {
        // Cap the number of failures; once the cap is exceeded, fail the task. Otherwise
        // stale statistics would affect rate limiting, error control, and similar features.
        collectErrorTimes++;
        if (collectErrorTimes > MAX_COLLECT_ERROR_TIMES) {
            // Proactively release the thread pool in case it is not closed on the error path
            close();
            throw new RuntimeException(
                    "The number of errors in updating statistics data exceeds the maximum limit of 100 times. To ensure the correctness of the data, the task automatically fails");
        }
        return;
    }
    StringifiedAccumulatorResult[] accumulatorResult =
            executionGraphInfo.getArchivedExecutionGraph().getAccumulatorResultsStringified();
    for (StringifiedAccumulatorResult result : accumulatorResult) {
        ValueAccumulator valueAccumulator = valueAccumulatorMap.get(result.getName());
        if (valueAccumulator != null) {
            valueAccumulator.setGlobal(Long.parseLong(result.getValue()));
        } else if (result.getName().equals(Metrics.MAX_VALUE)) {
            rdbMaxFuncValue = result.getValue();
        }
    }
}
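The two control-flow ideas in collectAccumulator, applying a snapshot only to tracked names and failing after too many consecutive-or-not refresh errors, can be exercised without a JobManager. The sketch below is a simplified illustration under assumptions: GlobalSyncSketch and its methods are hypothetical, the snapshot is modeled as a plain string map, and the limit of 100 is taken from the error message quoted above.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: apply stringified global results and enforce a failure budget. */
class GlobalSyncSketch {
    // Taken from the error message in the text ("maximum limit of 100 times").
    static final int MAX_COLLECT_ERROR_TIMES = 100;

    private final Set<String> trackedNames;
    private final Map<String, Long> globalValues = new ConcurrentHashMap<>();
    private int collectErrorTimes = 0;

    GlobalSyncSketch(Set<String> trackedNames) {
        this.trackedNames = new HashSet<>(trackedNames);
    }

    /** Updates the global value of every tracked name found in the snapshot; other names are ignored. */
    void applySnapshot(Map<String, String> snapshot) {
        for (Map.Entry<String, String> entry : snapshot.entrySet()) {
            if (trackedNames.contains(entry.getKey())) {
                globalValues.put(entry.getKey(), Long.parseLong(entry.getValue()));
            }
        }
    }

    /** Records one failed refresh and throws once the budget is exceeded. */
    void onCollectFailure() {
        collectErrorTimes++;
        if (collectErrorTimes > MAX_COLLECT_ERROR_TIMES) {
            throw new IllegalStateException(
                    "too many failed metric refreshes: " + collectErrorTimes);
        }
    }

    /** Returns the last synchronized global value, or 0 if none is known. */
    long getGlobal(String name) {
        return globalValues.getOrDefault(name, 0L);
    }
}
```

Note that, as in the listing above, the error counter in this sketch is never reset after a successful refresh, so the budget applies to the lifetime of the collector.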
2. Getting global and local metric values
Getting the global value of a given accumulator:
public long getAccumulatorValue(String name, boolean needWaited) {
    if (needWaited) {
        waited();
    }
    ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);
    if (valueAccumulator == null) {
        return 0;
    }
    return valueAccumulator.getGlobal();
}
Getting the local value of each subtask:
/**
 * Returns the local value of the accumulator with the given name.
 *
 * @param name the accumulator metric name
 * @return the local value, or 0 if no accumulator is registered under that name
 */
public long getLocalAccumulatorValue(String name) {
    ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);
    if (valueAccumulator == null) {
        return 0;
    }
    return valueAccumulator.getLocal().getLocalValue();
}
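The distinction between the local and the global value, and the threshold check that the global value makes possible, can be sketched as follows. This is a hypothetical illustration: ValueHolderSketch is not ChunJun's ValueAccumulator, and globalExceeds is an invented helper that only shows how a subtask might compare the last synchronized global count against a configured limit.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: one metric with a subtask-local count and a last-synced global count. */
class ValueHolderSketch {
    private final AtomicLong local = new AtomicLong(); // updated by this subtask only
    private volatile long global; // last value received from the periodic sync

    void addLocal(long delta) {
        local.addAndGet(delta);
    }

    long getLocal() {
        return local.get();
    }

    void setGlobal(long value) {
        this.global = value;
    }

    long getGlobal() {
        return global;
    }

    /** True when the last synchronized global value is above the given limit. */
    boolean globalExceeds(long limit) {
        return global > limit;
    }
}
```

Because the global value is only as fresh as the last refresh, a check like globalExceeds can lag behind the true total by up to one synchronization period.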
3. Releasing resources
/** Shuts down the thread pool. */
public void close() {
    if (scheduledExecutorService != null
            && !scheduledExecutorService.isShutdown()
            && !scheduledExecutorService.isTerminated()) {
        scheduledExecutorService.shutdown();
    }
}
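III. Summary
We now have a rough picture of how ChunJun manages metrics:
- When metrics are registered: in the open method of BaseRichInputFormat, before the connector starts consuming data.
- The base class that manages metrics (registration, updates, waiting for synchronization): BaseMetric.
- Periodic retrieval of global metrics: AccumulatorCollector, so that each subtask can evaluate the metrics against its thresholds.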