GRPC(4):拦截器

发布于:2023-01-21 ⋅ 阅读:(216) ⋅ 点赞:(0)

概述

上篇文章记录了 gRPC 四种通信模式及实现,今天学习一个新的知识点——拦截器。通常客户端请求到达服务端的时候不会立即进行业务处理,而是进行一些预处理操作,比如监控数据采集(统计 QPS),链路追踪,身份信息校验,必传参数校验等等。gRPC 为此提供了一个拦截器(Interceptor)功能来实现这一系列的操作。按照通信方式可以分为一元拦截器(Unary Interceptor)和流拦截器(Streaming Interceptor),按照应用角色可以分为客户端拦截器(Client-Side Interceptor)和服务端拦截器(Server-Side Interceptor),具体类型如下

// grpc interceptor.go
grpc.UnaryClientInterceptor
grpc.UnaryServerInterceptor
grpc.StreamClientInterceptor
grpc.StreamServerInterceptor

服务端拦截器(Server-Side Interceptor)

图片

  • 一元拦截器(Unary Interceptor)
type UnaryServerInterceptor func(
  ctx context.Context,     // 请求上下文,可以做一些超时处理
  req interface{},        // gRPC 请求参数
  info *UnaryServerInfo,  // gRPC 服务接口信息
  handler UnaryHandler,   // gRPC 实际调用方法
) (resp interface{}, err error)
  • 流拦截器(Streaming Interceptor)
type StreamServerInterceptor func(
  srv interface{},         // 请求参数
  ss ServerStream,         // gRPC 服务端流信息
  info *StreamServerInfo,  // gRPC 服务接口信息
  handler StreamHandler    // gRPC 实际调用方法
) error

客户端拦截器(Client-Side Interceptor)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZZMeKkaS-1660261217935)(https://mmbiz.qpic.cn/mmbiz_png/SANP3yJnyiaMuaHGY7MzLsd6om3d3iaNicMEf5RfCIXdXnvSIF0noR8Tm9oSyxH8f7VibBbU5prppUBuPdfPzyuBQg/640?wx_fmt=png)]

  • 一元拦截器(Unary Interceptor)
type UnaryClientInterceptor func(
  ctx context.Context,    // 请求上下文,可以做一些超时处理
  method string,          // 请求方法
  req, reply interface{}, // 请求和响应
  cc *ClientConn,         // 连接信息
  invoker UnaryInvoker,   // 调用的 gRPC 方法
  opts ...CallOption      // gRPC 调用预处理接口活后处理接口
) error
  • 流拦截器(Streaming Interceptor)
type StreamClientInterceptor func(
  ctx context.Context,    // 请求上下文 
  desc *StreamDesc,       // 调用 gRPC 方法流信息
  cc *ClientConn,         // 连接信息
  method string,          // 调用方法
  streamer Streamer,      // 流对象,通过 desc 初始化
  opts ...CallOption      // gRPC 调用预处理接口活后处理接口
) (ClientStream, error)

拦截器(interceptor)处理过程

拦截器的处理过程可以分为以下三个阶段:

  1. 预处理阶段(pre-processing)

  2. 调用 RPC 方法(invoking RPC method)

  3. 后处理阶段(post-processing)

图片

  • 客户端一元拦截器(Client-Side Unary Interceptor)
func mySqlUnaryClientInterceptor(
  ctx context.Context,
  method string,
  req, reply interface{},
  cc *grpc.ClientConn,
  invoker grpc.UnaryInvoker,
  opts ...grpc.CallOption) error {

  // Pre-processing logic
  // set requestID
  uuID, _ := uuid.NewRandom()
  ctx = context.WithValue(ctx, "requestID", uuID)

  start := time.Now()
  log.Printf("client unary interceptor pre-processing: requstID [%s]\n",
             ctx.Value("requestID"))

  // Invoking the remote method
  err := invoker(ctx, method, req, reply, cc, opts...)

  // Post processing logic
  end := time.Now()
  log.Printf("client unary interceptor post processing: time [%s]\n",
             end.Sub(start))

  return err
}
  • 服务端一元拦截器(Server-Side Unary Interceptor)
// mySqlUnaryServerInterceptor 服务端一元拦截器
func mySqlUnaryServerInterceptor(
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler) (interface{}, error) {

  // Pre-processing logic
  // set requestID
  uuID, _ := uuid.NewRandom()
  ctx = context.WithValue(ctx, "requestID", uuID)

  start := time.Now()
  log.Printf("server unary interceptor pre-processing: requstID [%s]\n",
             ctx.Value("requestID"))

  // Invoking the handler to complete the normal execution of a unary RPC.
  m, err := handler(ctx, req)

  // Post processing logic
  end := time.Now()
  log.Printf("server unary interceptor post processing: time [%s]\n", 
             end.Sub(start))
  return m, err
}

流拦截器预处理阶段和与一元拦截器类似,调用 RPC 方法和后处理两个阶段则不相同,stream 都是 streamer (客户端:clientStream, 服务端:serverStream)调用 SendMsgRecvMsg 获取的,streamer 又是调用 RPC 方法获取的,因此在流拦截器中我们可以对 Streamer 进行封装,在封装的方法 SendMsg 实现流的预处理和 RecvMsg 实现流的拦截

  • 客户端流拦截器(Client-Side Stream Interceptor)
// wrappedStream  用于包装 grpc.ClientStream 结构体并拦截其对应的方法。
type wrappedStream struct {
  grpc.ClientStream
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
  return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
  log.Printf("Client Stream Interceptor Post Processing RecvMsg (Type: %T) at %v",
    m, time.Now().Format(time.RFC3339))
  return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
  log.Printf("Client Stream Interceptor Post Processing SendMsg (Type: %T) at %v",
    m, time.Now().Format(time.RFC3339))
  return w.ClientStream.SendMsg(m)
}

func mysqlClientStreamInterceptor(
  ctx context.Context, desc *grpc.StreamDesc,
  cc *grpc.ClientConn, method string,
  streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  // Pre-processing
  log.Println("Client Stream Interceptor Pre-processing : ", method)

  s, err := streamer(ctx, desc, cc, method, opts...)
  if err != nil {
    return nil, err
  }

  return newWrappedStream(s), nil
}
  • 服务端流拦截器(Server-Side Stream Interceptor)
type wrappedStream struct {
  grpc.ServerStream
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
  return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
  log.Printf("Server Stream Interceptor Post Processing RecvMsg (Type: %T) at %s",
    m, time.Now().Format(time.RFC3339))
  return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
  log.Printf("Server Stream Interceptor Post Processing SendMsg (Type: %T) at %v",
    m, time.Now().Format(time.RFC3339))
  return w.ServerStream.SendMsg(m)
}

func mysqlServerStreamInterceptor(
  srv interface{}, ss grpc.ServerStream,
  info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  // Pre-processing
  log.Println("Server Stream Interceptor Pre-processing : ", info.FullMethod)

  // Invoking the StreamHandler to complete the execution of RPC invocation
  err := handler(srv, newWrappedStream(ss))
  if err != nil {
    log.Printf("RPC failed with error %v", err)
  }
  return err
}
  • 使用方法

我们可以在调用 RPC 方法之前和之后添加自己的逻辑,比如链路追踪设置,请求耗时数据采集等等。客户端和服务端调用方法如下所示:

// 客户端添加一元拦截器
conn, err := grpc.Dial(address, grpc.WithInsecure(),
    grpc.WithUnaryInterceptor(mySqlUnaryClientInterceptor),
    grpc.WithStreamInterceptor(clientStreamInterceptor))
// 服务端添加一元拦截器
s := grpc.NewServer(
    grpc.UnaryInterceptor(mySqlUnaryServerInterceptor),
    grpc.StreamInterceptor(orderServerStreamInterceptor))

示例代码执行结果

  • 客户端一元拦截器(Client-Side Unary Interceptor)
2022/07/03 23:09:01 client unary interceptor pre-processing: requstID [d01970fb-96a7-463f-ad68-d9ac597e8e34]
2022/07/03 23:09:02 client unary interceptor post processing: time [142.6873ms]
2022/07/03 23:09:02 select record success []
  • 服务端一元拦截器(Server-Side Unary Interceptor)
2022/07/03 23:09:02 server unary interceptor pre-processing: requstID [ea90a302-9fe0-404d-b309-998638df91ca]
2022/07/03 23:09:02 receives a select record request with sql : SELECT * FROM users WHERE id=1;
2022/07/03 23:09:02 server unary interceptor post processing: time [115.369ms]
  • 客户端流拦截器(Client-Side Stream Interceptor)
2022/07/03 23:01:33 Client Stream Interceptor Pre-processing :  /_02_grpc_demo.MysqlService/UpdateRecord
2022/07/03 23:01:33 Client Stream Interceptor Post Processing SendMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 Client Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 received response: code:200 msg:"update records sql [UPDATE users SET role = \"stu+tech\" WHERE name = \"hamming_test\"] succ"
2022/07/03 23:01:33 Client Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 no more responses
  • 服务端流拦截器(Server-Side Stream Interceptor)
2022/07/03 23:01:33 Server Stream Interceptor Pre-processing :  /_02_grpc_demo.MysqlService/UpdateRecord
2022/07/03 23:01:33 Server Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 Server Stream Interceptor Post Processing RecvMsg (Type: *pb.MysqlReq) at 2022-07-03T23:01:33+08:00
2022/07/03 23:01:33 read ends
2022/07/03 23:01:33 receives a update record request with sql : UPDATE users SET role = "stu+tech" WHERE name = "hamming_test"
2022/07/03 23:01:33 Server Stream Interceptor Post Processing SendMsg (Type: *pb.MysqlRes) at 2022-07-03T23:01:33+08:00

总结

  • gRPC 四种类型的拦截器

  • gRPC 一元拦截器的处理过程

  • gRPC 流拦截器的处理过程

参考资料

  • 示例完整代码:https://github.com/unendlichkeiten/grpc_demo

  • gRPC Up & Running by Kasun Indrasiri and Danesh Kuruppu

  • https://www.lixueduan.com/post/grpc/05-Interceptor/

  • http://wzmmmmj.com/2020/11/08/grpc-interceptor/

  • https://github.com/grpc-up-and-running/samples

  • https://github.com/grpc-ecosystem/go-grpc-middleware

关注公众号一起学习

本文含有隐藏内容,请 开通VIP 后查看