Flink 调度源码分析4:Physical Slot 分配过程

发布于:2024-05-23 ⋅ 阅读:(53) ⋅ 点赞:(0)

Flink 调度源码分析1:拓扑图创建与提交过程
Flink 调度源码分析2:调度过程
Flink 调度源码分析3:Shared Slot 分配策略
Flink 调度源码分析4:Physical Slot 分配过程

1 整体过程

在 SlotSharingExecutionSlotAllocator.allocateSlotsForVertices() 中,会检查共享组是否有 slot,如果没有的话,会在下一步使用 PhysicalSlotProvider 为其分配 slot。

// 检查共享组是否有 slot
Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =  
        tryAssignExistingSharedSlots(groupsToAssign);  
slots.putAll(assignedSlots);  
groupsToAssign.removeAll(assignedSlots.keySet());  
  
// 对没有 slot 的共享组分配 slot
if (!groupsToAssign.isEmpty()) {  
    Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =  
            allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);  
    slots.putAll(allocatedSlots);  
    groupsToAssign.removeAll(allocatedSlots.keySet());  
    // 所有的共享组一定有共享 slot    
    Preconditions.checkState(groupsToAssign.isEmpty());  
}

接下来查看 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 函数(注意这个函数,后面会多次提到这里)。
在这里插入图片描述

2 创建 slot 请求

2.1 获取 slot 配置

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 中,会对每一个共享组执行下面代码。

// 使用 SharedSlotProfileRetriever 创建 slot 配置文件  
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);  
SlotProfile slotProfile =  
        sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);

会得到一个 SlotProfile。SlotProfile 是 task 希望调度的 slot 的配置文件。配置文件包含资源或位置限制等属性,其中一些可能是硬限制,也可能是软限制。它还包含 physical slot 的资源信息,当 shared slot 没有可用的 physical slot 时,可使用这些信息分配 physical slot。可以生成一个 matcher,通过在 SlotContext 中对 SlotProfile 以及其他要求进行匹配,筛选出候选 slot。
SlotProfile 包含下面这些属性。

/** This specifies the desired resource profile for the task slot. */  
private final ResourceProfile taskResourceProfile;  
/** This specifies the desired resource profile for the physical slot to host this task slot. */  
private final ResourceProfile physicalSlotResourceProfile;  
/** This specifies the preferred locations for the slot. */  
private final Collection<TaskManagerLocation> preferredLocations;  
/** This contains desired allocation ids of the slot. */  
private final Collection<AllocationID> preferredAllocations;  
/** This contains all reserved allocation ids from the whole execution graph. */  
private final Set<AllocationID> reservedAllocations;
  • taskResourceProfile 和 physicalSlotResourceProfile 是配置,两个一般是相等的。
  • preferredLocations 表示期望得到哪个 taskmanager 的 slot。
  • preferredAllocations 表示希望得到哪个 AllocationID,reservedAllocations存储了已经被分配的 reservedAllocations。
    AllocationID:JobManager 已分配 physical slot 的唯一标识符。该 ID 在 JobManager 首次请求 slot 时分配,并在重新分配时保持不变。JobManager 和 ResourceManager 使用此 ID 来跟踪和同步哪些 slot 分配给了哪个 TaskManager,哪些是空闲的。与 AllocationID 不同,SlotRequestId 用于任务从 SlotPool 请求 logical slot 时。多个 SlotRequestId 可以映射到一个 AllocationID(由于槽共享)。
    然后看看 sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile) 做了什么。
public SlotProfile getSlotProfile(  
        ExecutionSlotSharingGroup executionSlotSharingGroup,  
        ResourceProfile physicalSlotResourceProfile) {  
    Collection<AllocationID> priorAllocations = new HashSet<>();  
    Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();  
    for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {  
        priorAllocationIdRetriever.apply(execution).ifPresent(priorAllocations::add);  
        preferredLocations.addAll(  
                preferredLocationsRetriever.getPreferredLocations(  
                        execution, producersToIgnore));  
    }  
    // 创建 SlotProfile
    return SlotProfile.priorAllocation(  
            physicalSlotResourceProfile,  
            physicalSlotResourceProfile,  
            preferredLocations,  // 指定 slot 位置的选择
            priorAllocations,  
            reservedAllocationIds);  
}

2.2 slot 优先位置

怎么确定 SlotProfile 中的 preferredLocations 参数的值?
位置的确定涉及两种接口:StateLocationRetriever 和 InputsLocationsRetriever。通过这两种获取优先部署位置。StateLocationRetriever 会获取每个执行节点的状态所在的位置。InputsLocationsRetriever 会获取当前节点的输入的所在位置。这两个逻辑在 SchedulerBase 构造函数中创建:

stateLocationRetriever =  // StateLocationRetriever 是只有一个方法的接口,所以这直接通过lambda函数创建实例
        executionVertexId ->  
                getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();  
inputsLocationsRetriever =  // 类为 ExecutionGraphToInputsLocationsRetrieverAdapter
        new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);

在 SlotSharingExecutionSlotAllocatorFactory.createInstance() 中制定了优先位置检索器。

SyncPreferredLocationsRetriever preferredLocationsRetriever =  
        new DefaultSyncPreferredLocationsRetriever(context, context);

查看怎么决定优先位置的代码:

MergingSharedSlotProfileRetriever.getSlotProfile()
->  preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore)
	->  asyncPreferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore) // 这里虽然写着 async,但其实是同步的,也就是必须这个函数运行成功,才会执行下一步。也就是说这个位置必须是立即可用的,否则就不能用。
		->  getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore)

getPreferredLocationsBasedOnInputs() 中的代码如下:

private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(  
        final ExecutionVertexID executionVertexId,  
        final Set<ExecutionVertexID> producersToIgnore) {  
  
    CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =  
            CompletableFuture.completedFuture(Collections.emptyList());  
  
    final Collection<ConsumedPartitionGroup> consumedPartitionGroups =  
            inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);  
    for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {  
        // 为了避免太过分散,如果上游算子过多,则不获取它们的位置           
        if (consumedPartitionGroup.getConsumerVertexGroup().size()  
                > MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {  
            continue;  
        }  

		// 获取上游节点的位置
        final Collection<CompletableFuture<TaskManagerLocation>> locationsFutures =  
                getInputLocationFutures(  
                        producersToIgnore,  
                        inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(  
                                consumedPartitionGroup));  
  
        preferredLocations = combineLocations(preferredLocations, locationsFutures);  
    }  
    return preferredLocations;  
}

这里返回的 preferredLocations 最终会传递给 SlotProfile。

2.3 slot 请求

下一步需要创建 PhysicalSlotRequest:

PhysicalSlotRequest request =  
        new PhysicalSlotRequest(  
                physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);

PhysicalSlotRequest 包含以下内容:

private final SlotRequestId slotRequestId;  
private final SlotProfile slotProfile;  
private final boolean slotWillBeOccupiedIndefinitely;  // jobType == JobType.STREAMING

3 分配 physical slot

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 通过下面的代码分配 slot。

Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =  
        slotProvider.allocatePhysicalSlots(slotRequests);

这里的 slotProvider 是 PhysicalSlotProvider 类。在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中创建了 PhysicalSlotProvider。可以看到它实际是个 PhysicalSlotProviderImpl。

final PhysicalSlotProvider physicalSlotProvider =  
        new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);

在 slotProvider.allocatePhysicalSlots() 中尝试为每一个 slot 分配请求执行如下代码:

// 尝试从可用的 slots 中,为每个请求分配一个 physicalSlotMap<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =  
        tryAllocateFromAvailable(physicalSlotRequestsById.values());

在 tryAllocateFromAvailable() 尝试为每一个 slot 请求分配一个 physical slot:

for (PhysicalSlotRequest request : slotRequests) {  
    // 使用 SlotSelectionStrategy 获取 slot    
    Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot =  
            slotSelectionStrategy.selectBestSlotForProfile(  
                    freeSlotInfoTracker, request.getSlotProfile());

这里是根据 slotSelectionStrategy 选择 slot 的。slotSelectionStrategy 的值在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中指定:

final SlotSelectionStrategy slotSelectionStrategy =  
        SlotSelectionStrategyUtils.selectSlotSelectionStrategy(  
                jobType, jobMasterConfiguration);

这里是根据配置文件选择到底使用哪个策略。

cluster.evenly-spread-out-slot(EVENLY_SPREAD_OUT_SLOTS_STRATEGY) 为 True:// 默认为 false
	slotSelectionStrategy = EvenlySpreadOutLocationPreferenceSlotSelectionStrategy  // 均匀分布
否则:
	slotSelectionStrategy = DefaultLocationPreferenceSlotSelectionStrategy
  1. DefaultLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  
    		@Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  
    		@Nonnull ResourceProfile resourceProfile) {  
    	for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) {  
    		SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);  
    		if (candidate.getResourceProfile().isMatching(resourceProfile)) {  
    			return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));  
    		}  
    	}  
    	return Optional.empty();  
    }
    
    从所有可用的 slot 里顺序选择一个,只有满足资源需求,就直接分配。这样做,容易造成分配的 slot 集中在某几个 TaskManager 上。好处是可以减少不同 TaskManager 之间的通信代价,坏处是不能平衡各个 TaskManager 之间的资源利用率。
  2. EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  
            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  
            @Nonnull ResourceProfile resourceProfile) {  
        return freeSlotInfoTracker.getAvailableSlots().stream()  
                .map(freeSlotInfoTracker::getSlotInfo)  
                // 过滤掉不满足资源要求的 slot
                .filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile))  
                // 获取每个 slot 的资源利用率
                .map(  
                        slot ->  
                                new Tuple2<>(  
                                        slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot)))  
                // 找到资源利用率最小的 slot
                .min(Comparator.comparingDouble(tuple -> tuple.f1))  
                .map(  
                        slotInfoWithTaskExecutorUtilization ->  
                                SlotInfoAndLocality.of(  
                                        slotInfoWithTaskExecutorUtilization.f0,  
                                        Locality.UNCONSTRAINED));  
    }
    
    从所有满足资源要求的 slot,找到资源利用率最小的 slot,并分配该 slot。这样 slot 分配在各个 TaskManager 之间近似平均。好处是能平衡各个 TaskManager 之间的资源利用率,坏处是不同 TaskManager 之间的通信代价可能较大。

网站公告

今日签到

点亮在社区的每一天
去签到