上一篇介绍了如何编写 protobuf 的 idl [接口定义语言 (Interface Definition Language
)],并使用 idl 生成了 gRPC 的代码,现在来看看如何编写客户端和服务端的代码。
一、Simple RPC (Unary RPC)
1.1 protobuf
syntax = "proto3";
option go_package = ".;helloworld";
// protoc --go_out=. --go-grpc_out=. helloworld.proto
service SearchService {
rpc Search(SearchRequest) returns (SearchResponse) {}
}
message Book {
string title = 1;
int32 price = 3;
}
message SearchRequest {
string request = 1;
repeated string keywords = 2;
Book book = 3;
}
message SearchResponse {
string response = 1;
}
定义如上的 idl,需要关注几个事项
- 使用
protobuf
最新版本syntax = "proto3";
protoc-gen-go
要求 pb 文件必须指定 go 包的路径。即option go_package = ".;helloworld";
,. (点号) - 生成代码的导入路径 (Import Path),helloworld - 生成代码的Go 包名 (Package Name)。service
定义的method
仅能有一个入参和出参数。如果需要传递多个参数需要定义成message
- 可以使用
import
引用另外一个文件的 pb。
生成 go 和 grpc 的代码
protoc -I . --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative helloworld.proto
helloword_proto
├── helloworld.pb.go
└── helloworld_grpc.pb.go
└── helloworld.proto
1.2 server 实现
1、由 pb 文件生成的 gRPC 代码中包含了 service 的接口定义,它和我们定义的 idl 是吻合的
service SearchService {
rpc Search(SearchRequest) returns (SearchResponse) {}
}
type SearchServiceServer interface {
Search(context.Context, *SearchRequest) (*SearchResponse, error)
mustEmbedUnimplementedSearchServiceServer()
}
2、我们的业务逻辑就是实现这个接口
package main
import (
"context"
helloworld "example.com/grpc/helloworld_proto"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"net"
"time"
)
type HelloWorldServer struct {
helloworld.UnimplementedSearchServiceServer
}
func (server *HelloWorldServer) Search(ctx context.Context, r *helloworld.SearchRequest) (*helloworld.SearchResponse, error) {
fmt.Println("Request received:", r.GetRequest())
fmt.Println("Keywords received:", r.GetKeywords())
fmt.Println("Books received:", r.GetBook())
return &helloworld.SearchResponse{
Response: fmt.Sprintf("hello %s", r.GetRequest()),
}, nil
}
3、在实现完业务逻辑之后,我们可以创建并启动服务
func main() {
// 1. 监听端口
listen, err := net.Listen("tcp", ":8090")
if err != nil {
fmt.Println("监听端口失败", err)
}
// 2. 创建gRPC服务器实例
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(serverUnaryInterceptor),
)
// 3. 注册服务
helloworld.RegisterSearchServiceServer(grpcServer, &HelloWorldServer{})
// 4. 启动服务, grpcServer.Serve(listen) 会阻塞当前 goroutine,直到服务停止
err = grpcServer.Serve(listen)
if err != nil {
fmt.Println("启动服务失败", err)
}
}
服务端代码实现的流程如下
client 实现
1、由 pb 文件生成的 gRPC 代码中包含了 client 的实现,它和我们定义的 idl 也是吻合的。
type searchServiceClient struct {
cc grpc.ClientConnInterface
}
func NewSearchServiceClient(cc grpc.ClientConnInterface) SearchServiceClient {
return &searchServiceClient{cc}
}
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(SearchResponse)
err := c.cc.Invoke(ctx, SearchService_Search_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
2、直接使用 client 来进行 rpc 调用
package main
import (
"context"
helloworld "example.com/grpc/helloworld_proto"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"runtime"
"time"
)
func main() {
conn, err := grpc.NewClient("localhost:8090",
grpc.WithTransportCredentials(insecure.NewCredentials())
)
if err != nil {
panic(err)
}
defer conn.Close()
// 1. 创建客户端实例
client := helloworld.NewSearchServiceClient(conn)
// 2. 调用方法
resp, err := client.Search(context.Background(), &helloworld.SearchRequest{
Request: "Golang",
Keywords: []string{"hello", "world"},
Book: &helloworld.Book{
Title: "Go语言编程",
Price: &wrapperspb.Int64Value{
Value: 100,
},
},
})
if err != nil {
panic(err)
}
fmt.Println(resp)
}
客户端代码实现的流程如下
1.3 总结
✨ 前文提到过protobuf协议
是平台无关的。演示的客户端和服务端都是 golang 的,即使客户端和服务端不同语言也是类似的可以通信的
✨ 对于上面介绍的的这种类似于http1.x
的模式:客户端发送请求,服务端响应请求,一问一答的模式在 gRPC 里叫做Simple RPC
(也称Unary RPC 一元RPC
)。gRPC 同时也支持其他类型的交互方式。
二、Server-Streaming RPC 服务器端流式 RPC
服务器端流式 RPC
,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:
2.1 protobuf 定义
syntax = "proto3";
option go_package = ".;streaming";
service Greeter {
// 服务端流模式:客户端发送一次请求,服务端持续返回数据
rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
}
message StreamRequestData {
string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}
message StreamResponseData {
string data = 1;
}
2.2 server 实现
✨ 注意与Simple RPC
的区别:因为我们的服务端是流式响应的,因此对于服务端来说函数入参多了一个streaming.Greeter_GetStreamServer
参数用来写入多个响应,可以把它看作是客户端的对象
✨ 可以通过调用这个流对象的Send(...)
,来往客户端写入数据
✨ 通过返回nil
或者error
来表示全部数据写完了
func (s *server) GetStream(req *streaming.StreamRequestData, res streaming.Greeter_GetStreamServer) error {
i := 0
for {
i++
_ = res.Send(&streaming.StreamResponseData{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
if i > 10 {
break
}
}
return nil
}
2.3 client 实现
✨ 注意与Simple RPC
的区别:因为我们的服务端是流式响应的,因此 RPC 函数返回值stream
是一个流,可以把它看作是服务端的对象
✨ 使用stream
的Recv
函数来不断从服务端接收数据
✨ 当Recv
返回io.EOF
代表流已经结束
func main() {
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))
if err != nil {
panic(err)
}
defer conn.Close()
// 1. 创建客户端实例
client := streaming.NewGreeterClient(conn)
// 2. 调用方法 (服务端流模式)
res, _ := client.GetStream(context.Background(), &streaming.StreamRequestData{Data: "服务端流模式"})
for {
a, err := res.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(a.Data)
}
}
2.4 总结
三、Client-Streaming RPC 客户端流式 RPC
客户端流式 RPC
,显然也是单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,大致如图:
服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应
3.1 protobuf 定义
syntax = "proto3";
option go_package = ".;streaming";
service Greeter {
// 客户端流模式:客户端持续发送数据,服务端最终返回结果
rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
}
message StreamRequestData {
string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}
message StreamResponseData {
string data = 1;
}
3.2 server 实现
✨ 注意与Simple RPC
的区别:因为我们的客户端是流式请求的,因此请求参数cliStr streaming.Greeter_PutStreamServer
就是流对象
✨ 可以从cliStr streaming.Greeter_PutStreamServer
的Recv
函数读取消息
✨ 当Recv
返回io.EOF
代表流已经结束
✨ 使用cliStr streaming.Greeter_PutStreamServer的
SendAndClose`函数关闭并发送响应
// 在这段程序中,我们对每一个 Recv 都进行了处理
// PutStream 客户端流模式:客户端持续发送数据,服务端最终返回结果
func (s *server) PutStream(cliStr streaming.Greeter_PutStreamServer) error {
for {
if a, err := cliStr.Recv(); err != nil {
fmt.Println(err)
if err == io.EOF {
// Finished reading the order stream
return cliStr.SendAndClose(&streaming.StreamResponseData{Data: "服务端接收完成"})
}
break
} else {
fmt.Println(a.Data)
}
}
return nil
}
3.3 Client 实现
✨ 注意与Simple RPC
的区别:因为我们的客户端是流式响应的,因此 RPC 函数返回值stream
是一个流
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入数据
✨ 使用stream
的CloseAndRecv
函数关闭并发送响应
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))
if err != nil {
panic(err)
}
defer conn.Close()
// 1. 创建客户端实例
client := streaming.NewGreeterClient(conn)
putS, _ := client.PutStream(context.Background())
i := 0
for {
i++
_ = putS.Send(&streaming.StreamRequestData{
Data: fmt.Sprintf("客户端流模式:%d", i),
})
time.Sleep(time.Second)
if i > 10 {
// 发送超过10条消息,关闭流
_, err := putS.CloseAndRecv()
if err != nil {
panic(err)
}
break
}
}
3.4 总结
四、Bidirectional-Streaming RPC 双向流式 RPC
双向流式 RPC
,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求
首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)
假设该双向流是按顺序发送的话,大致如图:
4.1 protobuf 定义
syntax = "proto3";
option go_package = ".;streaming";
service Greeter {
// 双向流模式:客户端持续发送数据,服务端持续返回数据
rpc BidiStream(stream StreamRequestData) returns (stream StreamResponseData);
}
message StreamRequestData {
string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}
message StreamResponseData {
string data = 1;
}
4.2 server 实现
✨ 函数入参allStr streaming.Greeter_BidiStreamServer
是用来写入多个响应和读取多个消息的对象引用
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 可以通过调用这个流对象的Recv(...)
函数读取消息,当Recv
返回io.EOF
代表流已经结束
✨ 通过返回nil
或者error
表示全部数据写完了
// BidiStream 双向流模式:客户端持续发送数据,服务端持续返回数据
func (s *server) BidiStream(allStr streaming.Greeter_BidiStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
// 接收客户端数据
go func() {
defer wg.Done()
for {
if data, err := allStr.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println("收到客户端消息:" + data.Data)
}
}
}()
// 发送数据给客户端
go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStr.Send(&streaming.StreamResponseData{Data: fmt.Sprintf("我是服务端%v", time.Now().Unix())})
time.Sleep(time.Second)
if i > 10 {
break
}
}
}()
wg.Wait()
return nil
}
4.3 Client 实现
✨ 函数返回值allStr streaming.Greeter_BidiStreamServer
是用来获取多个响应和写入多个消息的对象引用
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 可以通过调用这个流对象的Recv(...)
函数读取消息,当Recv
返回io.EOF
代表流已经结束
// 双向流模式
client := streaming.NewGreeterClient(conn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
allStr, _ := client.BidiStream(ctx)
wg := sync.WaitGroup{}
wg.Add(2)
// 接收服务端消息
go func() {
defer wg.Done()
for {
data, _ := allStr.Recv()
fmt.Println("收到服务端消息:" + data.Data)
}
}()
// 发送消息给服务端
go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStr.Send(&streaming.StreamRequestData{Data: fmt.Sprintf("我是客户端:%d", rand.IntN(100))})
time.Sleep(time.Second)
if i > 10 {
break
}
}
}()
wg.Wait()