概述
上篇文章记录了 gRPC
四种通信模式及实现,今天学习一个新的知识点——拦截器。通常客户端请求到达服务端的时候不会立即进行业务处理,而是进行一些预处理操作,比如监控数据采集(统计 QPS),链路追踪,身份信息校验,必传参数校验等等。gRPC
为此提供了一个拦截器(Interceptor)功能来实现这一系列的操作。按照通信方式可以分为一元拦截器(Unary Interceptor)和流拦截器(Streaming Interceptor),按照应用角色可以分为客户端拦截器(Client-Side Interceptor)和服务端拦截器(Server-Side Interceptor),具体类型如下
// grpc interceptor.go
grpc.UnaryClientInterceptor
grpc.UnaryServerInterceptor
grpc.StreamClientInterceptor
grpc.StreamServerInterceptor
服务端拦截器(Server-Side Interceptor)
- 一元拦截器(Unary Interceptor)
type UnaryServerInterceptor func(
ctx context.Context, // 请求上下文,可以做一些超时处理
req interface{}, // gRPC 请求参数
info *UnaryServerInfo, // gRPC 服务接口信息
handler UnaryHandler, // gRPC 实际调用方法
) (resp interface{}, err error)
- 流拦截器(Streaming Interceptor)
type StreamServerInterceptor func(
srv interface{}, // 请求参数
ss ServerStream, // gRPC 服务端流信息
info *StreamServerInfo, // gRPC 服务接口信息
handler StreamHandler // gRPC 实际调用方法
) error
客户端拦截器(Client-Side Interceptor)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZZMeKkaS-1660261217935)(https://mmbiz.qpic.cn/mmbiz_png/SANP3yJnyiaMuaHGY7MzLsd6om3d3iaNicMEf5RfCIXdXnvSIF0noR8Tm9oSyxH8f7VibBbU5prppUBuPdfPzyuBQg/640?wx_fmt=png)]
- 一元拦截器(Unary Interceptor)
type UnaryClientInterceptor func(
ctx context.Context, // 请求上下文,可以做一些超时处理
method string, // 请求方法
req, reply interface{}, // 请求和响应
cc *ClientConn, // 连接信息
invoker UnaryInvoker, // 调用的 gRPC 方法
opts ...CallOption // gRPC 调用预处理接口活后处理接口
) error
- 流拦截器(Streaming Interceptor)
type StreamClientInterceptor func(
ctx context.Context, // 请求上下文
desc *StreamDesc, // 调用 gRPC 方法流信息
cc *ClientConn, // 连接信息
method string, // 调用方法
streamer Streamer, // 流对象,通过 desc 初始化
opts ...CallOption // gRPC 调用预处理接口活后处理接口
) (ClientStream, error)
拦截器(interceptor)处理过程
拦截器的处理过程可以分为以下三个阶段:
预处理阶段(pre-processing)
调用 RPC 方法(invoking RPC method)
后处理阶段(post-processing)
- 客户端一元拦截器(Client-Side Unary Interceptor)
func mySqlUnaryClientInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
// Pre-processing logic
// set requestID
uuID, _ := uuid.NewRandom()
ctx = context.WithValue(ctx, "requestID", uuID)
start := time.Now()
log.Printf("client unary interceptor pre-processing: requstID [%s]\n",
ctx.Value("requestID"))
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
// Post processing logic
end := time.Now()
log.Printf("client unary interceptor post processing: time [%s]\n",
end.Sub(start))
return err
}
- 服务端一元拦截器(Server-Side Unary Interceptor)
// mySqlUnaryServerInterceptor 服务端一元拦截器
func mySqlUnaryServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
// Pre-processing logic
// set requestID
uuID, _ := uuid.NewRandom()
ctx = context.WithValue(ctx, "requestID", uuID)
start := time.Now()
log.Printf("server unary interceptor pre-processing: requstID [%s]\n",
ctx.Value("requestID"))
// Invoking the handler to complete the normal execution of a unary RPC.
m, err := handler(ctx, req)
// Post processing logic
end := time.Now()
log.Printf("server unary interceptor post processing: time [%s]\n",
end.Sub(start))
return m, err
}
流拦截器预处理阶段和与一元拦截器类似,调用 RPC 方法和后处理两个阶段则不相同,stream 都是 streamer (客户端:clientStream, 服务端:serverStream)调用 SendMsg
和 RecvMsg
获取的,streamer 又是调用 RPC 方法获取的,因此在流拦截器中我们可以对 Streamer 进行封装,在封装的方法 SendMsg
实现流的预处理和 RecvMsg
实现流的拦截
- 客户端流拦截器(Client-Side Stream Interceptor)
// wrappedStream 用于包装 grpc.ClientStream 结构体并拦截其对应的方法。
type wrappedStream struct {
grpc.ClientStream
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
return &wrappedStream{s}
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("Client Stream Interceptor Post Processing RecvMsg (Type: %T) at %v",
m, time.Now().Format(time.RFC3339))
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("Client Stream Interceptor Post Processing SendMsg (Type: %T) at %v",
m, time.Now().Format(time.RFC3339))
return w.ClientStream.SendMsg(m)
}
func mysqlClientStreamInterceptor(
ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing
log.Println("Client Stream Interceptor Pre-processing : ", method)
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(s), nil
}
- 服务端流拦截器(Server-Side Stream Interceptor)
type wrappedStream struct {
grpc.ServerStream
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("Server Stream Interceptor Post Processing RecvMsg (Type: %T) at %s",
m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("Server Stream Interceptor Post Processing SendMsg (Type: %T) at %v",
m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
func mysqlServerStreamInterceptor(
srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Pre-processing
log.Println("Server Stream Interceptor Pre-processing : ", info.FullMethod)
// Invoking the StreamHandler to complete the execution of RPC invocation
err := handler(srv, newWrappedStream(ss))
if err != nil {
log.Printf("RPC failed with error %v", err)
}
return err
}
- 使用方法
我们可以在调用 RPC 方法之前和之后添加自己的逻辑,比如链路追踪设置,请求耗时数据采集等等。客户端和服务端调用方法如下所示:
// 客户端添加一元拦截器
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(mySqlUnaryClientInterceptor),
grpc.WithStreamInterceptor(clientStreamInterceptor))
// 服务端添加一元拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(mySqlUnaryServerInterceptor),
grpc.StreamInterceptor(orderServerStreamInterceptor))
示例代码执行结果
- 客户端一元拦截器(Client-Side Unary Interceptor)
2022/07/03 23:09:01 client unary interceptor pre-processing: requstID [d01970fb-96a7-463f-ad68-d9ac597e8e34]
2022/07/03 23:09:02 client unary interceptor post processing: time [142.6873ms]
2022/07/03 23:09:02 select record success []
- 服务端一元拦截器(Server-Side Unary Interceptor)
2022/07/03 23:09:02 server unary interceptor pre-processing: requstID [ea90a302-9fe0-404d-b309-998638df91ca]
2022/07/03 23:09:02 receives a select record request with sql : SELECT * FROM users WHERE id=1;
2022/07/03 23:09:02 server unary interceptor post processing: time [115.369ms]
- 客户端流拦截器(Client-Side Stream Interceptor)
2022/07/03 23:01:33 Client Stream Interceptor Pre-processing : /_02_grpc_demo.MysqlService/UpdateRecord
2022/07/03 23:01:33 Client Stream Interceptor Post Processing SendMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 Client Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 received response: code:200 msg:"update records sql [UPDATE users SET role = \"stu+tech\" WHERE name = \"hamming_test\"] succ"
2022/07/03 23:01:33 Client Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 no more responses
- 服务端流拦截器(Server-Side Stream Interceptor)
2022/07/03 23:01:33 Server Stream Interceptor Pre-processing : /_02_grpc_demo.MysqlService/UpdateRecord
2022/07/03 23:01:33 Server Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 Server Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 read ends
2022/07/03 23:01:33 receives a update record request with sql : UPDATE users SET role = "stu+tech" WHERE name = "hamming_test"
2022/07/03 23:01:33 Server Stream Interceptor Post Processing SendMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00
总结
gRPC 四种类型的拦截器
gRPC 一元拦截器的处理过程
gRPC 流拦截器的处理过程
参考资料
示例完整代码:https://github.com/unendlichkeiten/grpc_demo
gRPC Up & Running by Kasun Indrasiri and Danesh Kuruppu
https://www.lixueduan.com/post/grpc/05-Interceptor/
http://wzmmmmj.com/2020/11/08/grpc-interceptor/
https://github.com/grpc-up-and-running/samples
https://github.com/grpc-ecosystem/go-grpc-middleware