grpc 开发进阶 - 使用拦截器 interceptor

Posted by icebergu on 07-07,2020

现在网上大部分都是 grpc 相关的介绍,真正涉及到 grpc 的配置使用的文章还是比较少的
所以本系列着重介绍 grpc 开发时可以能会用到的一些配置

拦截器在作用于每一个 RPC 调用,通常用来做日志,认证,metric 等等

interfactor 分为两种

  • unary interceptor 拦截 unary(一元) RPC 调用
  • stream interceptor 处理 stream RPC

client 和 server 端需要单独设置他们的 interceptor

客户端

UnaryClientInterceptor

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

可以通过参数获取 context, method 名称,发送的请求, CallOption
用户可以根据这些信息,来改变调用或者做一些其他处理

interceptor 实现可以分为三步,预处理调用(invoker)RPC 方法调用后处理
预处理完成后,通过 invoker 来调用 RPC 方法

err := invoker(ctx, method, req, reply, cc, opts...)

调用RPC后,用户同样也可以处理返回的响应和错误

StreamClientInterceptor

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

stream interceptor 是拦截用户在 stream 上的操作,返回给用户自定义的 ClientStream

通过调用streamer 可以获得 ClientStream, 包装ClientStream 并重载他的 RecvMsg 和 ·SendMsg 方法,即可做一些拦截处理了
最后将包装好的 ClientStream 返回给客户

type wrappedStream struct{
	grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{})error{
	log.Printf("Receive a message (Type: %T)", m)
	return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream)SendMsg(m interface{})error{
	log.Printf("Send a message (Type: %T)", m)
	return w.ClientStream.SendMsg(m)
}

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error){
	// do soming

	// return ClientStream
	s , err := streamer(ctx, desc, cc, method, opts...)
	if err != nil{
		return nil, err
		}
	return &wrappedStream{s}, nil
}

配置拦截器

创建 ClientConn时,使用 WithUnaryInterceptorWithStreamInterceptor 来设置 interceptor

conn, err := grpc.Dial(
    grpc.WithUnaryInterceptor(unaryInterepotr),
    grpc.WithStreamInterceptor(streamInterceptor)
)

链式拦截器

创建 ClientConn时,使用WithChainUnaryInterceptorWithChainStreamInterceptor 可以设置链式的 interceptor

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

第一个 interceptor 是最外层的,最后一个为最内层

使用WithUnaryInterceptor, WithChainStreamInterceptor·添加的interceptor,总是最先执行

服务端

UnaryServerInterceptor

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

处理和作用于 client 类似

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(interface{}, error){
	// do somthing
	m, err := handler(ctx, req)
	if err != nil{
		log.Printf("RPC failed: %v", err)
	}
	return m, err

StreamServerInteceptor

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

与客户端处理类似, 封装 ServerStream,并覆盖 RecvMsg,SendMsg 方法
调用 handler 需要传递自定义的 ServerStream

type StreamHandler func(srv interface{}, stream ServerStream) error

配置拦截器

在调用 grpc.NewServer 时通过 UnaryInterceptor, StreamInterceptor 来配置 interceptor

s := grpc.NewServer(
    grpc.UnaryInterceptor(unaryInterceptor),
    grpc.StreamInterceptor(streamInterceptor)
)

链式拦截器

调用 grpc.NewServer 时,使用ChainUnaryInterceptorChainStreamInterceptor 可以设置链式的 interceptor

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

第一个 interceptor 是最外层的,最后一个为最内层

使用UnaryInterceptor, StreamInterceptor·添加的interceptor,总是最先执行

example: Interceptor