RPC - 客户端注册和发现模块

发布于:2025-06-23 ⋅ 阅读:(16) ⋅ 点赞:(0)

registryMethod 函数详解:

函数目的

registryMethod 是 Provider 类的核心方法,用于向服务注册中心注册服务。注册成功后,服务注册中心会更新内部的服务映射表,建立服务名称到提供者地址的映射关系。

执行流程示例

场景: 多米诺注册芝士披萨服务

// 多米诺披萨店注册芝士披萨服务
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("127.0.0.1", 8000);
Address pizzaHost("192.168.1.101", 9001);
bool success = provider->registryMethod(conn, "芝士披萨", pizzaHost);
  1. 创建服务注册请求消息:
   auto msg_req = MessageFactory::create<ServiceRequest>();
   msg_req->setId(UUID::uuid());
   msg_req->setMType(MType::REQ_SERVICE);
   msg_req->setMethod("芝士披萨");
   msg_req->setHost({"192.168.1.101", 9001});
   msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);

发送请求并等待响应:

   BaseMessage::ptr msg_rsp;
   bool ret = _requestor->send(conn, msg_req, msg_rsp);

服务注册中心处理请求(内部过程,不在当前函数中):

   // 服务注册中心内部的处理逻辑
   void PDManager::onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {
       if (msg->optype() == ServiceOptype::SERVICE_REGISTRY) {
           std::string method = msg->method();
           Address host = msg->host();
           
           // 更新映射表:将服务方法与提供者关联
           _providers->addProvider(conn, host, method);
           
           // 映射表变化:
           // 之前: _method_providers["芝士披萨"] = [...]
           // 之后: _method_providers["芝士披萨"] = [..., {"192.168.1.101", 9001}]
           
           // 通知所有关注此服务的客户端
           _discoverers->onlineNotify(method, host);
           
           // 发送响应
           ServiceResponse::ptr rsp = MessageFactory::create<ServiceResponse>();
           rsp->setRcode(RCode::RCODE_OK);
           conn->sendMessage(rsp);
       }
       // ...其他操作类型处理...
   }

处理响应:

   auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
   if (service_rsp.get() == nullptr) {
       ELOG("响应类型向下转换失败!");
       return false;
   }

检查响应状态

   if (service_rsp->rcode() != RCode::RCODE_OK) {
       ELOG("服务注册失败,原因:%s", errReason(service_rsp->rcode()).c_str());
       return false;
   }

注册成功,映射表更新完成:

   return true;

映射表变化详解

  1. 注册前的映射表状态:
   _method_providers = {
       "夏威夷披萨": [{"192.168.1.102", 9001}],
       "素食披萨": [{"192.168.1.103", 9001}]
   }

注册后的映射表状态:

   _method_providers = {
       "夏威夷披萨": [{"192.168.1.102", 9001}],
       "素食披萨": [{"192.168.1.103", 9001}],
       "芝士披萨": [{"192.168.1.101", 9001}]  // 新增映射
   }

如果是已有服务新增提供者:

   // 如果另一家披萨店也注册提供芝士披萨
   _method_providers = {
       "夏威夷披萨": [{"192.168.1.102", 9001}],
       "素食披萨": [{"192.168.1.103", 9001}],
       "芝士披萨": [{"192.168.1.101", 9001}, {"192.168.1.104", 9002}]  // 添加新提供者
   }

注册成功后,服务注册中心还会通过 onlineNotify 通知所有关注"芝士披萨"服务的客户端,有新的服务提供者上线,客户端的 Discoverer 会更新本地缓存的服务提供者列表。

生活中的类比

新餐厅在美食外卖平台注册

想象一下,registryMethod 就像一家新开的餐厅(服务提供者)在美团或饿了么这样的外卖平台(服务注册中心)上注册自己的特色菜品(服务)。

场景:小王的川菜馆注册麻婆豆腐服务

现实生活中的流程:

小王开了一家川菜馆,想在外卖平台上注册提供麻婆豆腐。

小王登录外卖平台商家端,填写注册表单:

  • 店铺名称:小王川菜馆
  • 地址:北京市海淀区中关村大街128号
  • 特色菜品:麻婆豆腐
  • 联系电话:010-12345678

小王提交注册申请,等待平台审核。

外卖平台处理注册请求:

  • 验证小王提供的信息
  • 将"麻婆豆腐"与"小王川菜馆"关联起来
  • 更新平台的菜品目录

注册成功,平台向小王发送确认消息。

平台更新用户端,现在用户搜索"麻婆豆腐"时,可以看到小王川菜馆。

代码中对应的流程

// 小王川菜馆注册麻婆豆腐服务
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("delivery-platform.com", 8000);
Address restaurantAddress("北京市海淀区中关村大街128号", 12345678); // 地址和电话
bool success = provider->registryMethod(conn, "麻婆豆腐", restaurantAddress);

映射表变化:

注册前的外卖平台菜品目录:

   菜品目录 = {
       "宫保鸡丁": ["老张川菜馆", "蜀香小馆"],
       "水煮鱼": ["蜀香小馆", "渝味轩"]
   }

注册后的外卖平台菜品目录

   菜品目录 = {
       "宫保鸡丁": ["老张川菜馆", "蜀香小馆"],
       "水煮鱼": ["蜀香小馆", "渝味轩"],
       "麻婆豆腐": ["小王川菜馆"]  // 新增映射
   }

用户体验变化:

  • 之前用户搜索"麻婆豆腐"时,找不到提供这道菜的餐厅
  • 注册成功后,用户搜索"麻婆豆腐"时,会看到"小王川菜馆"
  • 平台会向已经收藏"麻婆豆腐"的用户推送通知:"小王川菜馆现在提供麻婆豆腐啦!"

MethodHost 类详解:

类目的

MethodHost 类是一个服务地址管理器,负责存储和管理能够提供特定服务的所有主机地址,并提供负载均衡功能。它是服务发现机制中的核心组件,用于维护服务提供者列表并支持动态更新。

方法详解与执行流程

场景1: 麻辣烫服务新增提供者

当"老王麻辣烫"店铺上线,开始提供麻辣烫服务:

// 服务注册中心收到上线通知后
MethodHost::ptr methodHost = _method_hosts["麻辣烫"];
methodHost->appendHost({"192.168.1.105", 8080});  // 老王麻辣烫的地址

执行步骤:

  1. 获取互斥锁保护共享数据:std::unique_lock<std::mutex> lock(_mutex)
  1. 添加新主机到列表:_hosts.push_back({"192.168.1.105", 8080})

映射表变化

   之前: _hosts = [{"192.168.1.101", 8080}]  // 只有小李麻辣烫
   之后: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}]  // 增加了老王麻辣烫

场景2: 麻辣烫服务提供者下线

当"小李麻辣烫"暂停营业,需要从服务列表中移除:

// 服务注册中心检测到连接断开后
MethodHost::ptr methodHost = _method_hosts["麻辣烫"];
methodHost->removeHost({"192.168.1.101", 8080});  // 小李麻辣烫的地址

执行步骤:

  1. 获取互斥锁保护共享数据:std::unique_lock<std::mutex> lock(_mutex)
  1. 遍历主机列表查找匹配地址:
   for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {
       if (*it == {"192.168.1.101", 8080}) {
           _hosts.erase(it);
           break;
       }
   }

映射表变化:

   之前: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}]
   之后: _hosts = [{"192.168.1.105", 8080}]  // 只剩下老王麻辣烫
  1. 现在客户端调用chooseHost()时,只会返回老王麻辣烫的地址

生活中的例子

想象一个外卖平台的"麻辣烫"分类页面:

初始状态:

  • 页面显示3家麻辣烫店铺:小李、老王、小张
  • 平台后台维护一个麻辣烫店铺列表:[小李, 老王, 小张]

新店开业(appendHost):

  • 小赵开了一家新麻辣烫店并在平台注册
  • 平台将小赵的店添加到列表:[小李, 老王, 小张, 小赵]
  • 用户刷新页面,现在能看到4家麻辣烫店铺

店铺暂停营业(removeHost):

  • 小张的店因装修暂停营业
  • 平台将小张的店从列表移除:[小李, 老王, 小赵]
  • 用户刷新页面,现在只能看到3家麻辣烫店铺

负载均衡(chooseHost):

  • 当用户点击"随机推荐一家"按钮时
  • 平台会轮流推荐列表中的店铺,确保每家店获得平等的展示机会

MethodHost类就像这个平台的后台管理系统,动态维护服务提供者列表,并在客户端请求时提供负载均衡的选择。

Discoverer 类详解:

类目的

Discoverer 类是RPC框架中的服务发现组件,负责查找和管理可用的服务提供者。它的主要职责是:

  • 发现并获取能提供特定服务的主机地址
  • 缓存服务提供者信息,提高查询效率
  • 处理服务上线和下线通知,动态更新服务提供者列表
  • 提供负载均衡功能,在多个服务提供者之间分配请求

核心功能

服务发现(serviceDiscovery方法):

  • 查找能提供特定服务的主机
  • 先检查本地缓存,如无则向服务注册中心查询
  • 将查询结果缓存起来供后续使用

服务状态变更处理(onServiceRequest方法):

  • 处理服务上线通知,添加新的服务提供者
  • 处理服务下线通知,移除不可用的服务提供者
  • 调用下线回调函数处理相关资源清理

生活中的例子

Discoverer类就像一个智能外卖APP:

查找服务(serviceDiscovery):

  • 用户想点一份特定菜品(如"麻辣香锅")
  • APP先检查缓存,看是否有最近浏览过的提供该菜品的餐厅
  • 如果没有,则向服务器查询提供"麻辣香锅"的餐厅列表
  • 获取列表后,选择一家餐厅(负载均衡)并缓存整个列表

处理餐厅状态变更(onServiceRequest):

  • 新餐厅上线:APP收到通知"新餐厅'川香阁'开始提供麻辣香锅",更新列表
  • 餐厅下线:APP收到通知"'香辣坊'暂停营业",从列表中移除并取消相关订单

关键组件

_method_hosts:服务方法到主机列表的映射表

  • 键:服务方法名(如"用户认证"、"支付处理")
  • 值:提供该服务的主机列表(MethodHost对象)

_requestor:负责发送请求和接收响应的组件

_offline_callback:服务下线时的回调函数,用于处理资源清理

_mutex:互斥锁,保护在多线程环境下对共享数据的访问

总结

Discoverer类是分布式系统中服务发现的核心组件,它使客户端能够:

  • 动态发现可用的服务提供者
  • 在多个提供者之间实现负载均衡
  • 实时感知服务提供者的变化
  • 通过本地缓存提高查询效率

这使得系统具有高可用性、可扩展性和弹性,服务提供者可以动态加入或离开系统,而客户端能够自动适应这些变化。

serviceDiscovery 方法详解:

方法目的

serviceDiscovery 是 Discoverer 类的核心方法,用于发现和获取能提供特定服务的主机地址。它首先检查本地缓存,如果没有找到则向服务注册中心发起查询,并将结果缓存起来以供后续使用。

执行流程示例

场景: 用户查找附近的奶茶店

想象一个用户想要点一杯奶茶,使用APP查找附近提供奶茶服务的店铺:

// 用户APP初始化
Discoverer::ptr discoverer = std::make_shared<Discoverer>(requestor, offlineCallback);
BaseConnection::ptr conn = ConnectionFactory::createConnection("service-center.com", 8000);
Address milkTeaShop;
bool found = discoverer->serviceDiscovery(conn, "奶茶", milkTeaShop);

执行步骤:

  1. 检查本地缓存:
   {
       std::unique_lock<std::mutex> lock(_mutex);
       auto it = _method_hosts.find("奶茶");
       if (it != _method_hosts.end()) {
           if (it->second->empty() == false) {
               milkTeaShop = it->second->chooseHost();
               return true;  // 成功找到缓存的奶茶店地址
           }
       }
   }
  • 如果APP之前已经查询过奶茶店,会直接从缓存返回一个地址
  • 类似用户打开APP时,显示上次浏览过的奶茶店

缓存未命中,创建服务发现请求:

   auto msg_req = MessageFactory::create<ServiceRequest>();
   msg_req->setId(UUID::uuid());
   msg_req->setMType(MType::REQ_SERVICE);
   msg_req->setMethod("奶茶");
   msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);
  • 类似用户在APP搜索框输入"奶茶"并点击搜索
  • APP向服务器发送查询请求

发送请求并等待响应:

   BaseMessage::ptr msg_rsp;
   bool ret = _requestor->send(conn, msg_req, msg_rsp);
   if (ret == false) {
       ELOG("服务发现失败!");
       return false;
   }
  • APP向服务中心发送请求
  • 如果网络问题导致请求失败,显示"搜索失败,请检查网络"

处理响应:

   auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
   if (!service_rsp) {
       ELOG("服务发现失败!响应类型转换失败!");
       return false;
   }
   if (service_rsp->rcode() != RCode::RCODE_OK) {
       ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());
       return false;
   }
  • 检查服务器返回的响应是否有效
  • 类似APP收到服务器响应后检查数据格式是否正确

更新本地缓存并返回结果:

   std::unique_lock<std::mutex> lock(_mutex);
   auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());
   if (method_host->empty()) {
       ELOG("%s 服务发现失败!没有能够提供服务的主机!", method.c_str());
       return false;
   }
   milkTeaShop = method_host->chooseHost();
   _method_hosts["奶茶"] = method_host;
   return true;
  • 将服务器返回的奶茶店列表保存到本地缓存
  • 从列表中选择一家奶茶店返回给用户
  • 下次用户再查询奶茶店时可以直接从缓存获取

生活中的例子

想象小明想喝奶茶,但不知道附近有哪些奶茶店:

小明先检查记忆(检查本地缓存):

  • 小明回忆上次是在"一点点"买的奶茶
  • 如果记得路线,他可以直接去那里(缓存命中)
  • 如果忘记了或想尝试新店,需要查询(缓存未命中)

小明问路人(发送服务发现请求):

  • "请问附近有奶茶店吗?"
  • 等待路人回答(等待响应)

路人回答(处理响应):

  • "有啊,附近有'一点点'、'CoCo'和'喜茶'"
  • 小明记下这些店的位置(更新缓存)

选择一家店(chooseHost):

  • 小明决定去"CoCo"(负载均衡选择)
  • 下次想喝奶茶时,他已经知道三家店的位置(缓存有效)

这个方法通过本地缓存优化了服务发现的效率,避免了每次都需要向服务中心查询,同时通过动态更新保证了服务信息的及时性。

onServiceRequest 方法详解:

方法目的

onServiceRequest 是 Discoverer 类的方法,用于处理服务状态变更通知,包括服务上线和下线。它是服务动态发现机制的核心,确保客户端能够及时感知服务提供者的变化。

执行流程示例

场景1: 新的披萨店开始提供服务

想象一个新的披萨店"意式风情"开业,开始提供披萨服务:

// 服务注册中心向所有关注披萨服务的客户端发送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披萨");
msg->setHost({"192.168.1.110", 8080});  // 意式风情披萨店地址
msg->setOptype(ServiceOptype::SERVICE_ONLINE);

// 对每个关注披萨服务的客户端
discoverer->onServiceRequest(conn, msg);

执行步骤:

判断操作类型:

   auto optype = msg->optype();  // SERVICE_ONLINE
   std::string method = msg->method();  // "披萨"
  • 确认这是一个服务上线通知
  • 获取服务方法名称为"披萨"

加锁保护共享数据:

   std::unique_lock<std::mutex> lock(_mutex);
  • 确保在多线程环境下安全访问和修改数据

处理上线通知:

   if (optype == ServiceOptype::SERVICE_ONLINE) {
       auto it = _method_hosts.find("披萨");
       if (it == _method_hosts.end()) {
           // 如果这是第一家提供披萨服务的店
           auto method_host = std::make_shared<MethodHost>();
           method_host->appendHost({"192.168.1.110", 8080});
           _method_hosts["披萨"] = method_host;
       } else {
           // 如果已有其他披萨店,将新店添加到列表
           it->second->appendHost({"192.168.1.110", 8080});
       }
   }
  • 检查是否已有披萨服务的提供者列表
  • 如果没有,创建新列表并添加"意式风情"披萨店
  • 如果已有,将"意式风情"披萨店添加到现有列表

更新后的映射表:

   _method_hosts = {
       "汉堡": [...],
       "披萨": [..., {"192.168.1.110", 8080}]  // 添加了新的披萨店
   }

场景2: 披萨店暂停营业

想象"意式风情"披萨店因设备故障暂停营业:

// 服务注册中心向所有关注披萨服务的客户端发送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披萨");
msg->setHost({"192.168.1.110", 8080});
msg->setOptype(ServiceOptype::SERVICE_OFFLINE);

// 对每个关注披萨服务的客户端
discoverer->onServiceRequest(conn, msg);

执行步骤:

判断操作类型:

   auto optype = msg->optype();  // SERVICE_OFFLINE
   std::string method = msg->method();  // "披萨"
  • 确认这是一个服务下线通知
  • 获取服务方法名称为"披萨"

加锁保护共享数据:

   std::unique_lock<std::mutex> lock(_mutex);

处理下线通知:

   if (optype == ServiceOptype::SERVICE_OFFLINE) {
       auto it = _method_hosts.find("披萨");
       if (it == _method_hosts.end()) {
           return;  // 没有找到披萨服务提供者列表,直接返回
       }
       it->second->removeHost({"192.168.1.110", 8080});  // 从列表中移除
       _offline_callback({"192.168.1.110", 8080});  // 调用下线回调
   }
  • 查找披萨服务的提供者列表
  • 从列表中移除"意式风情"披萨店
  • 调用下线回调函数,可能用于清理与该店相关的连接或缓存
   _method_hosts = {
       "汉堡": [...],
       "披萨": [...]  // 移除了"意式风情"披萨店
   }

生活中的例子

想象一个使用外卖APP的用户:

新餐厅上线通知(SERVICE_ONLINE):

  • 用户订阅了披萨类别的推送
  • 一家新的披萨店"意式风情"在APP上线
  • APP推送通知:"新店上线!意式风情披萨店开业啦"
  • APP更新披萨分类页面,添加新店信息
  • 用户下次浏览披萨分类时,能看到这家新店

餐厅下线通知(SERVICE_OFFLINE):

  • "意式风情"披萨店因设备故障暂停营业
  • APP收到下线通知
  • APP更新披萨分类页面,移除该店或标记为"暂停营业"
  • 如果用户之前有未完成的订单,APP可能会通知用户:"您的订单餐厅暂停营业,建议取消订单"(类似_offline_callback的功能)

这个方法确保了客户端能够实时感知服务提供者的变化,提供了动态服务发现的能力,使系统更加灵活和可靠。


    网站公告

    今日签到

    点亮在社区的每一天
    去签到