【tRPC-go】message、context相关源码设计思路

发布于:2025-04-14 ⋅ 阅读:(30) ⋅ 点赞:(0)

codec 包基础设计

package:trpc-go/codec

  1. Msg接口的定义: 后续实际的消息msg结构体实现该接口,可以方便地获取一些值(如Context),也可以方便地进行一些值地设置(如设置SerializationType)
// message.go 文件下 

// Msg defines core message data for multi protocol, business protocol
// should set this message when packing and unpacking data.
type Msg interface {
	// Context returns rpc context
	Context() context.Context

	// WithRemoteAddr sets upstream address for server,
	// or downstream address for client.
	WithRemoteAddr(addr net.Addr)

	// WithLocalAddr sets server local address.
	WithLocalAddr(addr net.Addr)

	// RemoteAddr returns upstream address for server,
	// or downstream address for client.
	RemoteAddr() net.Addr

	// LocalAddr returns server local address.
	LocalAddr() net.Addr

	// WithNamespace sets server namespace.
	WithNamespace(string)

	// Namespace returns server namespace.
	Namespace() string

	// WithEnvName sets server environment.
	WithEnvName(string)
		// SerializationType returns serialization type.
	SerializationType() int

	// WithCompressType sets compress type.
	WithCompressType(int)
	....
  1. msg 整个rpc的上下文,先看接口体的设计,以及实现上面Msg接口定义的方法
// message_impl.go 文件

// msg is the context of rpc.
type msg struct {
	context             context.Context
	frameHead           interface{}
	requestTimeout      time.Duration
	serializationType   int
	compressType        int
	streamID            uint32
	dyeing              bool
	dyeingKey           string
	serverRPCName       string
	clientRPCName       string
	serverMetaData      MetaData
	clientMetaData      MetaData
	callerServiceName   string
	calleeServiceName   string
	calleeContainerName string
	serverRspErr        error
	clientRspErr        error
	serverReqHead       interface{}
	serverRspHead       interface{}
	clientReqHead       interface{}
	clientRspHead       interface{}
	localAddr           net.Addr
	remoteAddr          net.Addr
	logger              interface{}
	callerApp           string
	callerServer        string
	callerService       string
	callerMethod        string
	calleeApp           string
	calleeServer        string
	calleeService       string
	calleeMethod        string
	namespace           string
	setName             string
	envName             string
	envTransfer         string
	requestID           uint32
	calleeSetName       string
	streamFrame         interface{}
	commonMeta          CommonMeta
	callType            RequestType
}

// 实现了 Msg接口所有方法
// Context restores old context when create new msg.
func (m *msg) Context() context.Context {
	return m.context
}

// WithNamespace set server's namespace.
func (m *msg) WithNamespace(namespace string) {
	m.namespace = namespace
}
....

msg在后续:会被写入trpc协议的帧头中,进行网络传输,以达到下游
在这里插入图片描述

客户端调用 使用message:GreeterClientProxyImpl

  1. 客户端调用具体rpc方法桩代码:
func (c *GreeterClientProxyImpl) Hello(ctx context.Context, req *HelloRequest, opts ...client.Option) (*HelloReply, error) {
	// 派生出新的消息,以实现整个调用链路的context消息传递
	ctx, msg := codec.WithCloneMessage(ctx)
	
	defer codec.PutBackMessage(msg)
	msg.WithClientRPCName("/trpc.helloworld.Greeter/Hello")
	msg.WithCalleeServiceName(GreeterServer_ServiceDesc.ServiceName)
	msg.WithCalleeApp("")
	msg.WithCalleeServer("")
	msg.WithCalleeService("Greeter")
	msg.WithCalleeMethod("Hello")
	msg.WithSerializationType(codec.SerializationTypePB)
	callopts := make([]client.Option, 0, len(c.opts)+len(opts))
	callopts = append(callopts, c.opts...)
	callopts = append(callopts, opts...)
	rsp := &HelloReply{}
	if err := c.client.Invoke(ctx, req, rsp, callopts...); err != nil {
		return nil, err
	}
	return rsp, nil
}

先看WithCloneMessage方法:主要实现一组新的 ctx 和 msg 的派生

  • 首先可以看到第二个返回值Msg就是之前的接口类型
  • val.(*msg)是因为context中存储的不是实际结构体,而是结构体的指针
  • 所以上面桩代码中:msg.WithClientRPCName(“/trpc.helloworld.Greeter/Hello”),WithClientRPCName这些方法都是指针类型的实现:func (m *msg) xxx(namespace string) {
// message_impl.go 文件

// WithCloneMessage copy a new message and put into context, each rpc call should
// create a new message, this method will be called by client stub.
func WithCloneMessage(ctx context.Context) (context.Context, Msg) {
	newMsg := msgPool.Get().(*msg)
	val := ctx.Value(ContextKeyMessage)
	m, ok := val.(*msg)
	if !ok {
		ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
		newMsg.context = ctx
		return ctx, newMsg
	}
	ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
	newMsg.context = ctx
	copyCommonMessage(m, newMsg)
	copyServerToClientMessage(m, newMsg)
	return ctx, newMsg
}

到这里,一个派生的消息就从一个现有消息中拷贝了框架定义的一些上下文信息了,并且也设置了主调信息为自身服务,但是,被调信息呢?你回到开头看 tRPC 的桩代码就会知道了,桩代码里会负责进一步重写被调信息。

参考文献:

https://cloud.tencent.com/developer/article/2417507