一. yarn ResourceManager 的三种通信协议
- ResourceTrackerProtocol
NodeManager 和 ResourceManager 的 RPC 通信协议。其中 ResourceManager 充当RPC Server的角色,而 NodeManager 充当 RPC Client 的角色。NodeManager 通过该协议向 ResourceManager 注册、汇报节点健康情况以及 container 的运行状态,并接收 ResourceManager 下达的重新初始化、清理 container 等命令。NodeManager 周期性主动向 ResourceManager 发送请求,并领取 ResourceManager 下达给自己的命令 - ApplicationMasterProtocol
该协议中,ApplicationMaster 充当 RPC Client 角色,ResourceManager 充当RPC Server 的角色。应用程序的 ApplicationMaster 通过该协议向 角色,ResourceManager 注册、申请、释放资源 - ApplicationClientProtocol
该协议中,应用程序的客户端充当RPC Client的角色,而 ResourceManager 充当 RPC Server 的角色。客户端通过该RPC协议向 ResourceManager 提交应用程序、控制应用程序(如杀死job)以及查询应用程序状态等。yarn rest api 对应的服务端用这个协议处理的。
二. ResourceManager 的用户交互服务
ResourceManager 会开启多总类型的服务,比如管理 NodeManager 的服务,管理 ApplicationMaster 的服务, 还有这里的用户交互服务。用户交互服务有3个:
ClientRMService
负责普通用户交互
ClientRMService相当于一个RPC Server,是为普通用户提供的服务, 它处理来自客户端各种RPC请求, 比如提交应用程序、 终止应用程序、 获取应用程序运行状态. 它实现了 ApplicationClientProtocol 协议。获取 yarn 中执行完成的任务列表接口 “http://ip:5004/ws/v1/cluster/apps”,就是该类的 getApplications 方法实现的public class ClientRMService extends AbstractService implements ApplicationClientProtocol { /** * Get applications matching the {@link GetApplicationsRequest}. If * caseSensitive is set to false, applicationTypes in * GetApplicationRequest are expected to be in all-lowercase */ @Override public GetApplicationsResponse getApplications(GetApplicationsRequest request) throws YarnException { ... ... // 从 rmContext 中获取应用列表, 该方法返回一个 ConcurrentMap<ApplicationId, RMApp> // 所以获取执行 app 列表的方法只会从 ResourceManager 的内存中获取,不会访问 hdfs final Map<ApplicationId, RMApp> apps = rmContext.getRMApps(); Iterator<RMApp> appsIter = apps.values().iterator(); ... ... } }
yarn 的 rest api 都来自
WebServices
基类,它的其中一个子类 RMWebServices 负责接受 rest uri 的请求@Singleton @Path(RMWSConsts.RM_WEB_SERVICE_PATH) public class RMWebServices extends WebServices implements RMWebServiceProtocol { @GET @Path(RMWSConsts.APPS) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override public AppsInfo getApps(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.STATE) String stateQuery, @QueryParam(RMWSConsts.STATES) Set<String> statesQuery, @QueryParam(RMWSConsts.FINAL_STATUS) String finalStatusQuery, @QueryParam(RMWSConsts.USER) String userQuery, @QueryParam(RMWSConsts.QUEUE) String queueQuery, @QueryParam(RMWSConsts.LIMIT) String limit, @QueryParam(RMWSConsts.STARTED_TIME_BEGIN) String startedBegin, @QueryParam(RMWSConsts.STARTED_TIME_END) String startedEnd, @QueryParam(RMWSConsts.FINISHED_TIME_BEGIN) String finishBegin, @QueryParam(RMWSConsts.FINISHED_TIME_END) String finishEnd, @QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> applicationTypes, @QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags, @QueryParam(RMWSConsts.NAME) String name, @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) { ... ... } }
AdminService
负责和管理员用户交互WebApp
用来输出 web 页面