现在网上大部分都是 grpc 相关的介绍,真正涉及到 grpc 的配置使用的文章还是比较少的
所以本系列着重介绍 grpc 开发时可以能会用到的一些配置
拦截器在作用于每一个 RPC 调用,通常用来做日志,认证,metric 等等
interfactor 分为两种
- unary interceptor 拦截 unary(一元) RPC 调用
- stream interceptor 处理 stream RPC
client 和 server 端需要单独设置他们的 interceptor
客户端
UnaryClientInterceptor
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
可以通过参数获取 context
, method 名称
,发送的请求
, CallOption
用户可以根据这些信息,来改变调用或者做一些其他处理
interceptor 实现可以分为三步,预处理
,调用(invoker)RPC 方法
,调用后处理
预处理完成后,通过 invoker 来调用 RPC 方法
err := invoker(ctx, method, req, reply, cc, opts...)
调用RPC后,用户同样也可以处理返回的响应和错误
StreamClientInterceptor
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
stream interceptor 是拦截用户在 stream 上的操作,返回给用户自定义的 ClientStream
通过调用streamer
可以获得 ClientStream
, 包装ClientStream
并重载他的 RecvMsg
和 ·SendMsg
方法,即可做一些拦截处理了
最后将包装好的 ClientStream
返回给客户
type wrappedStream struct{
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{})error{
log.Printf("Receive a message (Type: %T)", m)
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream)SendMsg(m interface{})error{
log.Printf("Send a message (Type: %T)", m)
return w.ClientStream.SendMsg(m)
}
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error){
// do soming
// return ClientStream
s , err := streamer(ctx, desc, cc, method, opts...)
if err != nil{
return nil, err
}
return &wrappedStream{s}, nil
}
配置拦截器
创建 ClientConn
时,使用 WithUnaryInterceptor 和 WithStreamInterceptor 来设置 interceptor
conn, err := grpc.Dial(
grpc.WithUnaryInterceptor(unaryInterepotr),
grpc.WithStreamInterceptor(streamInterceptor)
)
链式拦截器
创建 ClientConn
时,使用WithChainUnaryInterceptor 和 WithChainStreamInterceptor 可以设置链式的 interceptor
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
第一个 interceptor
是最外层的,最后一个为最内层
使用WithUnaryInterceptor
, WithChainStreamInterceptor·
添加的interceptor,总是最先执行
服务端
UnaryServerInterceptor
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
处理和作用于 client 类似
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(interface{}, error){
// do somthing
m, err := handler(ctx, req)
if err != nil{
log.Printf("RPC failed: %v", err)
}
return m, err
StreamServerInteceptor
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
与客户端处理类似, 封装 ServerStream
,并覆盖 RecvMsg
,SendMsg
方法
调用 handler 需要传递自定义的 ServerStream
type StreamHandler func(srv interface{}, stream ServerStream) error
配置拦截器
在调用 grpc.NewServer
时通过 UnaryInterceptor, StreamInterceptor 来配置 interceptor
s := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor)
)
链式拦截器
调用 grpc.NewServer
时,使用ChainUnaryInterceptor 和 ChainStreamInterceptor 可以设置链式的 interceptor
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
第一个 interceptor
是最外层的,最后一个为最内层
使用UnaryInterceptor
, StreamInterceptor·
添加的interceptor,总是最先执行
example: Interceptor