拦截器


在应用开发过程中会有这样的需求,就是在请求执行前后做一些通用的处理逻辑,比如记录日志、tracing、身份认证等,在web框架中一般是使用middleware来实现的,gRPC 在客户端和服务端都支持了拦截器功能,用来处理这种业务需求。

下面基于 PingPong 服务做一些扩展来演示拦截器的使用方法。

源码目录:

|—- src/
	|-- interceptor/
		|—— client.go // 客户端
		|—— server.go // 服务端
	|—- protos/ping/
		|—— ping.proto   // protobuf描述文件
		|—— ping.pb.go   // protoc编译生成
    	|-- ping_grpc.pb.go // protoc编译生成

普通拦截器

在客户端和服务端分别实现一个记录请求日志的拦截器,打印请求前后的信息。

服务端

// src/interceptor/server.go

...

// 服务端拦截器 - 记录请求和响应日志
func serverUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	// 前置逻辑
	log.Printf("[Server Interceptor] accept request: %s", info.FullMethod)

	// 处理请求
	response, err := handler(ctx, req)

	// 后置逻辑
	log.Printf("[Server Interceptor] response: %s", response)

	return response, err
}

func main() {
	// 以option的方式添加拦截器
	srv := grpc.NewServer(grpc.UnaryInterceptor(serverUnaryInterceptor))

...

客户端

// src/inteceptor/client.go

...

// 客户端拦截器 - 记录请求和响应日志
func clientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// 前置逻辑
	log.Printf("[Client Interceptor] send request: %s", method)

	// 发起请求
	err := invoker(ctx, method, req, reply, cc, opts...)

	// 后置逻辑
	log.Printf("[Client Interceptor] response: %s", reply)

	return err
}

...

func Ping() {
	// 以option方式添加拦截器
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), grpc.WithUnaryInterceptor(clientUnaryInterceptor))
	if err != nil {
		log.Fatal(err)
	}

...

这里分别定义了 serverUnaryInterceptorclientUnaryInterceptor 拦截器,函数的签名定义在 google.golang.org/grpc 包中,分别为 UnaryServerInterceptorUnaryClientInterceptor, 在前置逻辑位置可以对请求信息做处理,在后置逻辑位置可以对响应信息做处理。在初始化服务端和客户端连接时以option的形式配置就好了,同时也支持配置多个拦截器。

运行结果:

// server
2022/09/27 00:00:00 [Server Interceptor] accept request: /protos.PingPong/Ping
2022/09/27 00:00:00 [Server Interceptor] response: value:"pong"

// client
2022/09/27 00:00:00 [Client Interceptor] send request: /protos.PingPong/Ping
2022/09/27 00:00:00 [Client Interceptor] response: value:"pong"

流拦截器

同样实现一个打印请求和响应日志的拦截器,只是函数签名变成了 grpc.StreamServerInterceptorgrpc.StreamClientInterceptor

服务端

// src/interceptor/server.go

...

// 服务端拦截器 - 记录stream请求和响应日志
func serverStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前置逻辑
	log.Printf("[Server Stream Interceptor] accept request: %s", info.FullMethod)

	// 处理请求
	err := handler(srv, ss)
	return err
}

...

func main() {
	// 以option的方式添加拦截器
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(serverUnaryInterceptor),
		grpc.StreamInterceptor(serverStreamInterceptor),
	}
	srv := grpc.NewServer(opts...)

...

以上实现其实和普通拦截器的使用方式没太大区别,但是流的特性在于请求和响应不是一次性处理完成的,而是多次发送和接收数据,所以我们可能需要在发送和接收数据的过程中处理一些公共逻辑,这才是流拦截器特别的地方。我们注意到 handler 方法调用的第二个参数是一个 grpc.ServerStream 接口类型,这个接口类型包含了 SendMsgRecvMsg 方法,所以我们可以使用一个自定义类型实现这个接口,对原对象进行包装重写这两个方法,这样就能达到我们的目的了。

// 服务端拦截器 - 记录stream请求和响应日志
func serverStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前置逻辑
	log.Printf("[Server Stream Interceptor] accept request: %s", info.FullMethod)

	// 处理请求,使用自定义类型包装 ServerStream
	err := handler(srv, &customServerStream{ss})
	return err
}

type customServerStream struct {
	grpc.ServerStream
}

func (s *customServerStream) SendMsg(m interface{}) error {
	log.Printf("[Server Stream Interceptor] send: %T", m)
	return s.ServerStream.SendMsg(m)
}

func (s *customServerStream) RecvMsg(m interface{}) error {
	log.Printf("[Server Stream Interceptor] recv: %T", m)
	return s.ServerStream.RecvMsg(m)
}

客户端

客户端的使用方式和服务端类似,只是对应的数据处理接口类型变成了 grpc.ClientStream

// src/interceptor/client.go

···

	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithStreamInterceptor(clientStreamInterceptor),
	}
	conn, err := grpc.Dial("localhost:1234", opts...)

···

func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	// 前置逻辑
	log.Printf("[Client Stream Interceptor] send request: %s", method)

	// 请求
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}

	// 自定义类型包装 ClientStream
	return &customClientStream{s}, nil
}

type customClientStream struct {
	grpc.ClientStream
}

func (s *customClientStream) SendMsg(m interface{}) error {
	log.Printf("[Client Stream Interceptor] send: %T", m)
	return s.ClientStream.SendMsg(m)
}

func (s *customClientStream) RecvMsg(m interface{}) error {
	log.Printf("[Client Stream Interceptor] recv: %T", m)
	return s.ClientStream.RecvMsg(m)
}

运行结果,以 MultiPingPong 方法为例,最后一次输出的recv是结束消息,err == io.EOF

// server
2022/10/02 10:53:11 [Server Stream Interceptor] accept request: /protos.PingPong/MultiPingPong
2022/10/02 10:53:11 [Server Stream Interceptor] recv: *ping.PingRequest
2022/10/02 10:53:11 [Server Stream Interceptor] recv: *ping.PingRequest
2022/10/02 10:53:11 [Server Stream Interceptor] send: *ping.PongResponse
2022/10/02 10:53:11 [Server Stream Interceptor] recv: *ping.PingRequest

// client
2022/10/02 10:53:11 [Client Stream Interceptor] send request: /protos.PingPong/MultiPingPong
2022/10/02 10:53:11 [Client Stream Interceptor] send: *ping.PingRequest
2022/10/02 10:53:11 send:ping
2022/10/02 10:53:11 [Client Stream Interceptor] send: *ping.PingRequest
2022/10/02 10:53:11 send:ping
2022/10/02 10:53:12 [Client Stream Interceptor] recv: *ping.PongResponse
2022/10/02 10:53:12 recv:pong
2022/10/02 10:53:13 [Client Stream Interceptor] recv: *ping.PongResponse

注意:在自定义的 RecvMsg 方法中,前置位置只能读取消息的类型,无法读取实际数据,因为这个时候接收到的消息还没有解析处理,如果要获取接收消息的实际数据,需要把自定义的处理逻辑放在后置位置,例如:

func (s *customClientStream) RecvMsg(m interface{}) error {
  err := s.ClientStream.RecvMsg(m)
  log.Printf("[Client Stream Interceptor] recv: %v", m)
  return err
}