gRPC 微服务—Context信息的跨服务传递

科技   2024-09-13 08:51   北京  

导语

和在普通HTTP请求中一样,gRPC提供了在每一次RPC中携带上下文的结构:metadata在Go语言中,它与context.Context紧密结合,帮助我们实现服务端与客户端之间互相传递信息

什么是 metadata

gRPC 的 metadata 简单理解,就是 HTTP Header  中的 key-value 对

  • metadata 是以 key-value 的形式存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型

  • metadata 使得 client 和 server 能够为对方提供关于本次调用的一些信息,就像一次HTTP请求的Request HeaderResponse Header一样

  • HTTP Header 的生命周期是一次 HTTP 请求,那么 metadata 的生命周期就是一次 RPC 调用

metadata 创建

🌲 使用New():

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

🌲 使用Pairs():

要注意如果有相同的 key 会自动合并

md := metadata.Pairs(
    "key1""value1",
    "key1""value1.2"// "key1" will have map value []string{"value1", "value1.2"}
    "key2""value2",
)

🌲 合并多个metadata

md1 :=  metadata.Pairs("k1""v1""k2""v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

md := metadata.Join(md1, md2)

🌲 存储二进制数据

在 metadata 中,key 永远是 string 类型,但是 value 可以是 string 也可以是二进制数据。为了在 metadata 中存储二进制数据,我们仅仅需要在 key 的后面加上一个 - bin 后缀。具有 - bin 后缀的 key 所对应的 value 在创建 metadata 时会被编码(base64),收到的时候会被解码:

md := metadata.Pairs(
    "key""string value",
    "key-bin"string([]byte{96102}),
)

metadata 结构本身也有一些操作方法,参考文档非常容易理解。这里不再赘述:https://pkg.go.dev/google.golang.org/grpc@v1.44.0/metadata

metadata 发送与接收

让我们再次回顾下pb文件和生成出来的client与server端的接口

service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
 GetOrder(ctx context.Context, 
           in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {
 GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
 mustEmbedUnimplementedOrderManagementServer()
}

可以看到相比pb中的接口定义,生成出来的Go代码除了增加了error返回值,还多了context.Context

和错误处理类似,gRPC中的context.Context 也符合Go语言的使用习惯:通常情况下我们在函数首个参数放置context.Context用来传递一次RPC中有关的上下文,借助context.WithValue()ctx.Value()context添加变量或读取变量

metadata就是gRPC中可以传递的上下文信息之一,所以metadata的使用方式就是:metadata记录到context,从context读取metadata

Clinet发送Server接收

client发送metadata,那就是把metadata存储到contex.Context

server接收metadata,就是从contex.Context中读取Metadata

Clinet 发送 Metadata

Metadata放到contex.Context,有几种方式

🌲 使用NewOutgoingContext

将新创建的metadata添加到context中,这样会 覆盖 掉原来已有的metadata

// 将metadata添加到context中,获取新的context
md := metadata.Pairs("k1""v1""k1""v2""k2""v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// unary RPC
response, err := client.SomeRPC(ctx, someRequest)

// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)

🌲 使用AppendToOutgoingContext

可以直接将 key-value 对添加到已有的context

  • 如果context中没有metadata,那么就会 创建 一个

  • 如果已有metadata,那么就将数据 添加 到原来的metadata

// 如果对应的 context 没有 metadata,那么就会创建一个
ctx := metadata.AppendToOutgoingContext(ctx, "k1""v1""k1""v2""k2""v3")

// 如果已有 metadata 了,那么就将数据添加到原来的 metadata  (例如在拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3""v4")

// 普通RPC(unary RPC)
response, err := client.SomeRPC(ctx, someRequest)

// 流式RPC(streaming RPC)
stream, err := client.SomeStreamingRPC(ctx)

Server 接收 Metedata

普通RPC与流式RPC的区别不大,都是从contex.Context中读取metadata

🌲 使用FromIncomingContext

普通RPC(unary RPC)

//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

流式RPC(streaming RPC)

//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
    md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
    // do something with metadata
}

Server发送Clinet接收

服务端发送的metadata被分成了headertrailer两者,因而客户端也可以读取两者

Server 发送 Metadata

对于普通RPC(unary RPC)server可以使用grpc包中提供的函数向client发送 headertrailer

  • grpc.SendHeader()
  • grpc.SetHeader()
  • grpc.SetTrailer()

对于**流式RPC(streaming RPC)server可以使用ServerStream[1]接口中定义的函数向client发送headertrailer

  • ServerStream.SendHeader()
  • ServerStream.SetHeader()
  • ServerStream.SetTrailer()

🌲 普通RPC(unary RPC)

使用 grpc.SendHeader()  和 grpc.SetTrailer() 方法 ,这两个函数将context.Context作为第一个参数

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创建并发送header
  header := metadata.Pairs("header-key""val")
  grpc.SendHeader(ctx, header)
  
  // 创建并发送trailer
  trailer := metadata.Pairs("trailer-key""val")
  grpc.SetTrailer(ctx, trailer)
}

如果不想立即发送header,也可以使用grpc.SetHeader()grpc.SetHeader()可以被多次调用,在如下时机会把多个metadata合并发送出去

  • 调用grpc.SendHeader()
  • 第一个响应被发送时
  • RPC结束时(包含成功或失败)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创建header,在适当时机会被发送
  header := metadata.Pairs("header-key1""val1")
  grpc.SetHeader(ctx, header)
    
  // 创建header,在适当时机会被发送
  header := metadata.Pairs("header-key2""val2")
  grpc.SetHeader(ctx, header)
  
  // 创建并发送trailer
  trailer := metadata.Pairs("trailer-key""val")
  grpc.SetTrailer(ctx, trailer)
}

🌲 流式RPC(streaming RPC)

使用 ServerStream.SendHeader()ServerStream.SetTrailer() 方法

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key""val")
  stream.SendHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key""val")
  stream.SetTrailer(trailer)
}

如果不想立即发送header,也可以使用ServerStream.SetHeader()ServerStream.SetHeader()可以被多次调用,在如下时机会把多个metadata合并发送出去

  • 调用ServerStream.SendHeader()
  • 第一个响应被发送时
  • RPC结束时(包含成功或失败)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key""val")
  stream.SetHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key""val")
  stream.SetTrailer(trailer)
}

Client 接收 Metadata

🌲 普通RPC(unary RPC)

普通RPC(unary RPC)使用grpc.Header()grpc.Trailer()方法来接收 Metadata

// RPC using the context with new metadata.
var header, trailer metadata.MD

// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS""Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
  panic(err)
}

🌲 流式RPC(streaming RPC)

流式RPC(streaming RPC)通过调用返回的 ClientStream接口的Header() Trailer()方法接收 metadata

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

stream.CloseAndRecv()

// retrieve trailer
trailer := stream.Trailer()

HeaderTrailer区别

根本区别:发送的时机不同!

headers会在下面三种场景下被发送

  • SendHeader() 被调用时(包含grpc.SendHeaderstream.SendHeader)
  • 第一个响应被发送时
  • RPC结束时(包含成功或失败)

trailer会在rpc返回的时候,即这个请求结束的时候被发送

差异在流式RPC(streaming RPC)中比较明显:

因为trailer是在服务端发送完请求之后才发送的,所以client获取trailer的时候需要在stream.CloseAndRecv或者stream.Recv 返回非nil错误 (包含 io.EOF)之后

如果stream.CloseAndRecv之前调用stream.Trailer()获取的是空

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

// retrieve trailer 
// `trailer`会在rpc返回的时候,即这个请求结束的时候被发送
// 因此此时调用`stream.Trailer()`获取的是空
trailer := stream.Trailer()

stream.CloseAndRecv()

// retrieve trailer 
// `trailer`会在rpc返回的时候,即这个请求结束的时候被发送
// 因此此时调用`stream.Trailer()`才可以获取到值
trailer := stream.Trailer()

使用场景

既然我们把metadata类比成HTTP Header,那么metadata的使用场景也可以借鉴HTTPHeader。如传递用户token进行用户认证,传递trace进行链路追踪等

拦截器中的metadata

在拦截器中,我们不但可以获取或修改接收到的metadata,甚至还可以截取并修改要发送出去的metadata

还记得拦截器如何实现么?如果已经忘了快快回顾一下吧:

🌰 举个例子:

我们在客户端拦截器中从要发送给服务端的metadata中读取一个时间戳字段,如果没有则补充这个时间戳字段

注意这里用到了一个上文没有提到的FromOutgoingContext(ctx)函数

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption)
 error
 {

 var s string

 // 获取要发送给服务端的`metadata`
 md, ok := metadata.FromOutgoingContext(ctx)
 if ok && len(md.Get("time")) > 0 {
  s = md.Get("time")[0]
 } else {
        // 如果没有则补充这个时间戳字段
  s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
  ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
 }

 log.Printf("call timestamp: %s", s)

 // Invoking the remote method
 err := invoker(ctx, method, req, reply, cc, opts...)

 return err
}

func main() {
 conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithChainUnaryInterceptor(
   orderUnaryClientInterceptor,
  ),
 )
 if err != nil {
  panic(err)
 }
    
    c := pb.NewOrderManagementClient(conn)

 ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
  "raw"+strconv.FormatInt(time.Now().UnixNano(), 10))

 // RPC using the context with new metadata.
 var header, trailer metadata.MD

 // Add Order
 order := pb.Order{
  Id:          "101",
  Items:       []string{"iPhone XS""Mac Book Pro"},
  Destination: "San Jose, CA",
  Price:       2300.00,
 }
 res, err := c.AddOrder(ctx, &order)
 if err != nil {
  panic(err)
 }
}

以上的思路在server同样适用。基于以上原理我们可以实现链路追踪、用户认证等功能



最后真心推荐一下我前段时间写的付费专栏,该专栏主要用一些实际的案例讲解项目业务需求分析、技术实现分析、模块划分和分层的方法论,讲明白这些底层逻辑后再教大家怎么用 UML 等工具把它们图形化表达出来。详细内容请可扫码观看,或访问:独家原创--程序员的全能画图课

另外现在我在做的一个Go实战项目的专栏,整个项目也采用了这里讲的知识来实际地一步步做分析和开发的,大家可以先把这些方法论学会,后面跟着新专栏学习,看项目和代码的实践能明显提高自己做项目的架构水平。


网管叨bi叨
分享软件开发和系统架构设计基础、Go 语言和Kubernetes。
 最新文章