- gRPC(Google Remote Procedure Call)是 Google 开源的一种高性能、通用的开源 RPC 框架。它基于 HTTP/2 协议、使用 Protocol Buffers(protobuf)作为接口定义语言(IDL)和序列化机制,支持多种语言。
- RPC(Remote Procedure Call):远程过程调用,允许客户端像调用本地函数一样调用远程服务器上的函数。
- IDL(接口定义语言):使用 .proto 文件定义服务、方法、消息类型等。
- Protocol Buffers:Google 提供的一种轻量高效的结构化数据序列化方式,比 JSON、XML 更紧凑快速。
- 架构:
Client <-> Stub <-> Channel <-> 网络 <-> Server <-> Handler
,Client Stub:本地代理,封装了 RPC 的调用过程。Channel:客户端到服务端的连接。Server:服务端实现了服务的接口。Handler:处理具体业务逻辑的函数。 - 工作原理:
- 开发者编写 .proto 文件定义服务和消息结构。
- 使用 protoc 编译器生成多语言代码(如 Python、C++、Go)。
- 客户端通过 Stub 调用远程方法,参数以二进制形式发送。
- 服务端接收到请求,调用对应的服务处理逻辑。
- 返回结果通过同样的方式返回给客户端。
// gRPC 的完整流程(以 Go 为例,其他语言相似)
[1] 编写 proto 文件(定义服务)
↓
[2] 使用 protoc 生成服务端和客户端代码
↓
[3] 服务端实现业务逻辑
↓
[4] 客户端调用 gRPC 接口
↓
[5] 服务端部署 + 启动监听端口
↓
[6] 客户端连接并远程调用成功
- 在 gRPC 中,开发者首先通过 .proto 文件定义服务接口和消息结构。
- 客户端和服务端都使用该文件生成代码,然后分别构造请求和实现逻辑。
- 调用时,gRPC 框架会自动将结构体参数序列化为 ProtoBuf 二进制格式,通过 HTTP/2 传输到服务端,服务端反序列化后调用实际函数,最终返回响应并反序列化给客户端。
.proto 文件是用来定义你要通信的数据结构和接口的“说明书”。
就像你在写:
- 什么函数可以调用(服务名 + 方法名)
- 每个函数的参数和返回值长什么样(结构体)
然后 gRPC 根据 .proto 文件自动帮你生成:
- Go 的 struct(对应 message)
- 接口(对应 service)
- gRPC 通信代码(打包解包、序列化反序列化)
gRPC 中 .proto 文件是客户端与服务端之间共享的“契约”。客户端必须拿到这个文件,通过 protoc 生成 stub,才能正确发起调用。这是 gRPC 能实现跨语言、高效通信的核心所在。
- .proto 文件定义了服务的“抽象语义”(函数签名 + 数据结构),然后用工具生成各语言的代码框架,最终由开发者写上具体逻辑,整个服务就能跨语言运行并返回结果了。
- .proto 文件是用来描述“服务接口”和“数据结构”的中立语言规范,它是 gRPC 通信的基础。它定义了你要传什么数据、调用什么函数,用来生成跨语言代码框架。
- 有了 .proto 文件,我们可以自动生成 gRPC 通信代码,客户端就能像调用本地函数一样,跨进程、跨机器调用服务端的函数,gRPC 框架会自动完成底层传输。
[.proto 文件]
↓(protoc 编译)
[生成客户端/服务端代码]
↓
[你写服务端实现 + 启动 gRPC Server]
↓
[你写客户端调用 + 建立 gRPC 连接]
↓
[客户端像本地一样调用方法 → gRPC框架序列化请求 → HTTP/2 发送]
↓
[服务端解包 → 调用函数 → 返回结果 → 自动回传客户端]
// doc.proto
syntax = "proto3";
package types;
message Keyword {
string Field = 1;
string Word = 2;
}
message Document {
string Id = 1; //业务使用的唯一Id,索引上此Id不会重复
uint64 IntId = 2; //倒排索引上使用的文档id(业务侧不用管这个字段)
uint64 BitsFeature = 3; //每个bit都表示某种特征的取值
repeated Keyword Keywords = 4; //倒排索引的key
bytes Bytes = 5; //业务实体序列化之后的结果
}
// go install github.com/gogo/protobuf/protoc-gen-gogofaster
// protoc --gogofaster_out=./types --proto_path=./types doc.proto
// term_query.proto
syntax = "proto3";
package types;
import "types/proto/doc.proto";
message TermQuery {
Keyword Keyword = 1; //Keyword类型引用自doc.proto
repeated TermQuery Must = 2;
repeated TermQuery Should = 3;
}
// protoc -I=C:/Users/jmh00/GolandProjects/criker-search --gogofaster_out=./types/term_query --proto_path=./types/term_query term_query.proto
// 在windows上-I需使用绝对路径
// index.proto
syntax = "proto3";
package index_service;
// 从-I指定的目录下寻找该proto文件
import "types/proto/doc.proto";
import "types/proto/term_query.proto";
message DocId {
string DocId = 1;
}
message AffectedCount {
int32 Count = 1;
}
message SearchRequest {
types.TermQuery Query = 1; //TermQuery类型引用自term_query.proto
uint64 OnFlag = 2;
uint64 OffFlag = 3;
repeated uint64 OrFlags = 4; //repeated 这个字段是一个数组(列表),可以出现 0 次、1 次或多次。
}
message SearchResult {
repeated types.Document Results = 1;
}
message CountRequest {
}
service IndexService {
rpc DeleteDoc(DocId) returns (AffectedCount);
rpc AddDoc(types.Document) returns (AffectedCount);
rpc Search(SearchRequest) returns (SearchResult);
rpc Count(CountRequest) returns (AffectedCount);
}
// protoc -I=C:/Users/jmh00/GolandProjects/criker-search --gogofaster_opt=Mdoc.proto=C:/Users/jmh00/GolandProjects/criker-search/types --gogofaster_opt=Mterm_query.proto=C:/Users/jmh00/GolandProjects/criker-search/types --gogofaster_out=plugins=grpc:./index_service --proto_path=./index_service/proto index.proto
// 在windows上-I需使用绝对路径
// --gogofaster_opt=M指示了.proto里的import转到.go里该怎么写,比如.proto里写import "doc.proto",转到.go里就应该写 import "github.com/Orisun/radic/v2/types"
// -I和--gogofaster_opt=M可以有多个
- 生成代码
protoc \
-I=C:/Users/jmh00/GolandProjects/criker-search \
--gogofaster_opt=Mdoc.proto=C:/Users/jmh00/GolandProjects/criker-search/types \
--gogofaster_opt=Mterm_query.proto=C:/Users/jmh00/GolandProjects/criker-search/types \
--gogofaster_out=plugins=grpc:./index_service \
--proto_path=./index_service/proto \
index.proto
- 实现服务端
// index_service.go
package index_service
import (
"context"
"fmt"
"github.com/jmh000527/criker-search/index_service/service_hub"
"github.com/jmh000527/criker-search/types"
"github.com/jmh000527/criker-search/utils"
"strconv"
"time"
)
const (
IndexService = "index_service"
)
// IndexServiceWorker 代表一个gRPC服务器,负责处理索引相关的服务请求。
// 它包括正排索引和倒排索引的管理,以及与服务注册中心的交互。
type IndexServiceWorker struct {
Indexer *LocalIndexer // 正排索引和倒排索引的组合,用于处理文档的索引和搜索
hub service_hub.ServiceHub // 服务注册和发现相关的配置,负责服务的注册、注销和发现
selfAddr string // 当前服务实例的地址,用于注册到服务中心和服务发现
}
// Init 初始化索引服务。
// 该方法负责初始化IndexServiceWorker的索引管理器,并设置相关的数据库类型和数据目录。
//
// 参数:
// - DocNumEstimate: 预计文档数量,用于初始化倒排索引。
// - dbtype: 数据库类型,决定使用哪种数据库存储索引数据。
// - DataDir: 数据目录,数据库文件存放的路径。
//
// 返回值:
// - error: 如果初始化过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Init(DocNumEstimate int, dbtype int, DataDir string) error {
// 创建一个新的Indexer实例
w.Indexer = new(LocalIndexer)
// 初始化Indexer实例,并传递文档数量估计、数据库类型和数据目录
return w.Indexer.Init(DocNumEstimate, dbtype, DataDir)
}
// RegisterService 注册服务到etcd。如果提供了etcdServers,则创建EtcdServiceHub并注册服务。
// 如果etcdServers为空,则表示使用单机模式,不进行服务注册。
//
// 参数:
// - etcdServers: etcd服务器地址列表。如果为空,则表示不进行服务注册。
// - servicePort: 服务端口号。必须大于1024。
//
// 返回值:
// - error: 如果传入的端口号无效或服务注册过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) RegisterService(etcdServers []string, servicePort int) error {
// 检查是否需要注册服务到etcd
if len(etcdServers) > 0 {
// 验证服务端口号是否合法
if servicePort <= 1024 {
return fmt.Errorf("无效的服务端口号 %d,服务端口必须大于1024", servicePort)
}
// 获取本地IP地址
localIP, err := utils.GetLocalIP()
if err != nil {
return fmt.Errorf("获取本地IP地址失败: %v", err)
}
// 单机模式下,将本地IP写死为127.0.0.1
localIP = "127.0.0.1"
w.selfAddr = localIP + ":" + strconv.Itoa(servicePort)
// 设置心跳频率
var heartbeatFrequency int64 = 3
// 获取EtcdServiceHub实例(单例模式)
hub := service_hub.GetServiceHub(etcdServers, heartbeatFrequency)
// 注册服务到etcd,初始时租约ID为0
leaseID, err := hub.RegisterService(IndexService, w.selfAddr, 0)
if err != nil {
return fmt.Errorf("服务注册失败: %v", err)
}
// 设置hub
w.hub = hub
// 启动一个协程,定期续约服务租约
go func() {
for {
_, err := hub.RegisterService(IndexService, w.selfAddr, leaseID)
if err != nil {
utils.Log.Printf("续约服务租约失败,租约ID: %v, 错误: %v", leaseID, err)
}
// 心跳间隔时间稍短于最大超时时间
time.Sleep(time.Duration(heartbeatFrequency)*time.Second - 100*time.Millisecond)
}
}()
}
return nil
}
// LoadFromIndexFile 从索引文件中加载数据。在系统重启后,可以通过此方法从持久化的索引文件中恢复数据。
//
// 返回值:
// - int: 加载成功的文档数量。如果加载过程中发生错误,则返回0。
func (w *IndexServiceWorker) LoadFromIndexFile() int {
return w.Indexer.LoadFromIndexFile()
}
// Close 关闭索引服务。如果服务在etcd中注册过,则需要注销服务;否则只需要关闭索引。
//
// 返回值:
// - error: 如果在注销服务或关闭索引过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Close() error {
// 检查是否需要注销服务
if w.hub != nil {
// 注销服务
err := w.hub.UnregisterService(IndexService, w.selfAddr)
if err != nil {
utils.Log.Printf("注销服务失败,服务地址: %v, 错误: %v", w.selfAddr, err)
return err
}
utils.Log.Printf("注销服务成功,服务地址: %v", w.selfAddr)
}
// 关闭索引
return w.Indexer.Close()
}
// DeleteDoc 从索引中删除文档。根据提供的文档ID删除对应的文档。
//
// 参数:
// - ctx: 上下文,用于处理请求的生命周期和取消操作。
// - docId: 包含要删除的文档ID。
//
// 返回值:
// - *AffectedCount: 删除操作影响的文档数量。
// - error: 如果删除操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) DeleteDoc(ctx context.Context, docId *DocId) (*AffectedCount, error) {
// 调用Indexer的DeleteDoc方法删除文档,并返回影响的文档数量
return &AffectedCount{
Count: int32(w.Indexer.DeleteDoc(docId.DocId)),
}, nil
}
// AddDoc 向索引中添加文档。如果文档已经存在,会先删除旧文档再添加新文档。
//
// 参数:
// - ctx: 上下文,用于处理请求的生命周期和取消操作。
// - doc: 要添加的文档对象。
//
// 返回值:
// - *AffectedCount: 添加操作影响的文档数量。
// - error: 如果添加操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) AddDoc(ctx context.Context, doc *types.Document) (*AffectedCount, error) {
// 调用Indexer的AddDoc方法添加文档,并返回影响的文档数量
n, err := w.Indexer.AddDoc(*doc)
return &AffectedCount{
Count: int32(n),
}, err
}
// Search 执行检索操作,返回符合查询条件的文档列表。
//
// 参数:
// - ctx: 上下文,用于处理请求的生命周期和取消操作。
// - request: 包含检索查询的请求对象。
//
// 返回值:
// - *SearchResult: 包含检索结果的文档列表。
// - error: 如果检索操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Search(ctx context.Context, request *SearchRequest) (*SearchResult, error) {
// 调用Indexer的Search方法进行检索,并返回检索结果
result := w.Indexer.Search(request.Query, request.OnFlag, request.OffFlag, request.OrFlags)
return &SearchResult{
Results: result,
}, nil
}
// Count 返回索引中当前文档的数量。
//
// 参数:
// - ctx: 上下文,用于处理请求的生命周期和取消操作。
// - request: 包含计数请求的对象。
//
// 返回值:
// - *AffectedCount: 当前索引中的文档数量。
// - error: 如果计数操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Count(ctx context.Context, request *CountRequest) (*AffectedCount, error) {
// 调用Indexer的Count方法获取文档数量,并返回结果
return &AffectedCount{
Count: int32(w.Indexer.Count()),
}, nil
}
- 分布式搜索引擎中,每台私人服务器只存储部分数据。
- 多个index worker之间需要互相通信,协同合作。
- 通信方式包括网络接口,采用gRPC方式进行服务接口的提供。
- index service包含三个接口:删除document、添加document和搜索。
- 删除接口接收document ID,返回受影响的条数。
- 添加接口接收document,返回添加成功的条数。
- 搜索接口接收搜索请求,返回搜索结果。
- index.proto引用了doc.proto和term_query.proto,跨文件和跨目录引用。
- 使用-I参数指定proto文件的搜索路径。
- 转成go代码后,import路径需要修改为正确的路径。