一、简介
我们之前其实已经完成了关于grpc的一些基础用法,实际上还有一些比较相对进阶的使用方式。比如:
- 拦截器:包括客户端和服务端的拦截器,进而在每一端都可以划分为流式的拦截器和非流式的拦截器。和以前我们在spring web中的拦截器思路是一样的,都可以在请求来之前做一些统一的处理,进而减少代码量,做一些鉴权 ,数据校验 ,限流等等,和业务解耦。
gRPC的拦截器- 一元请求的 拦截器
客户端 【请求 响应】
服务端 【请求 响应】 - 流式请求的 拦截器 (Stream Tracer)
客户端 【请求 响应】
服务端 【请求 响应】
- 一元请求的 拦截器
- 客户端重试:grpc的客户端还可以发起重试请求,当我们有一些异常并非代码异常的时候,可以通过重试来避免问题。
- NameResolver :当用于微服务的时候,需要注册中心对服务名的解析等等。
- 负载均衡:包括(pick-first , 轮训)等轮训方式。
- 可以在其他微服务框架中整合,比如dubbo中,spring cloud中,用protobuf来序列化数据,用grpc来发起rpc(比如可以替代open fegin)等场合。
下面我们就来从拦截器功能开始学习一下grpc。
二、项目构建
我们为进阶篇搭建一个新的工程。结构还是客户端,服务端,api模块。
其中api模块作为公共内容被其他模块引入做公共的声明使用。
api模块: rpc-grpc-adv-api
服务端模块:rpc-grpc-adv-server
客户端模块: rpc-grpc-adv-client
1、api模块
<dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.51.0</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.51.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.51.0</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency>
</dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact><outputDirectory>${basedir}/src/main/java</outputDirectory><clearOutputDirectory>false</clearOutputDirectory></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins>
</build>
在main目录下创建proto目录,下建立Hello.proto文件,声明内容为。
syntax = "proto3";package com.levi;option java_multiple_files = false;
option java_package = "com.levi";
option java_outer_classname = "HelloProto";message HelloRequest{string name = 1;
}message HelloRespnose{string result = 1;
}service HelloService{// 普通方法rpc hello(HelloRequest) returns (HelloRespnose);// 双端流方法rpc hello1(stream HelloRequest) returns (stream HelloRespnose);
}
然后通过编译器编译生成对应的message类HelloProto.java和service类HelloServiceGrpc.java。
然后该模块将会被其他模块引用,使用这些定义的类。
2、server模块
引入api模块。
<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>
服务端业务代码为:
@Slf4j
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) {String name = request.getName();System.out.println("接收到客户端的参数name = " + name);responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult("this is server result").build());responseObserver.onCompleted();}
}
服务端启动代码为:
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}
3、client模块
引入api模块。
<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>
ok,至此就搭建完了项目结构。
三、拦截器
1、一元拦截器
其中一元拦截器就是在我们以前的一元通信模式使用的。也就是非流式的通信模式下。
而一元拦截器也分为两种:客户端拦截器和服务端拦截器
每一种下面又能分为两种
1.简单模式:只能拦截请求,不能拦截响应。
2.复杂模式:可以拦截请求和响应两种。
下面我们先来研究客户端拦截器。
1.1、客户端拦截器
我们说客户端拦截器又分为简单模式和复杂模式。
1.1.1、简单客户端拦截器
我们来开发客户端的代码,首先我们来编写一个简单拦截器。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/**
* 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor
* 该拦截器在客户端发起请求时被调用,
* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等
*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/return next.newCall(method, callOptions);
}
我们定义好拦截器之后就要整合在客户端调用的构建上。
package com.levi;import com.levi.interceptor.CustomClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;public class GrpcClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000)// .intercept(new CustomClientInterceptor())// 可以传递多个拦截器,按照传递顺序执行拦截器.intercept(List.of(new CustomClientInterceptor())).usePlaintext().build();try {HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel);HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder().setName("levi").build();HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(helloRequest);System.out.println("接收到的服务端响应为: " + helloRespnose.getResult());} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}
启动服务端,启动客户端,日志显示正常输出没毛病。
但是此时我们也看出来这种简单模式存在几个问题。
# 客户端简单拦截器的问题
1. 只能拦截请求,不能拦截响应。我们只能在请求的时候发起拦截,但是接收响应的时候无法拦截,也就是类似spring mvc的时候没有后置的拦截能力。
2. 即使拦截了请求操作,但是就这个请求拦截上,这个业务粒度也是过于宽泛,不精准。无法在请求的各个阶段发起拦截(1. 开始阶段 2. 设置消息数量 3.发送数据阶段 4.半连接阶段。),其实我们上面的代码可以看出来我们的拦截器是在往下传递ClientCall给grpc,也就是这个调用最后是ClientCall完成的。这里的各个阶段拦截其实就是在ClientCall的各个方法里面插入一些拦截操纵,其实就是在发起请求前,在ClientCall构建的各个阶段拦截一下(这个的底层应该是netty那些阶段性的事件感知实现的。)
装饰者模式增强了一下。
1.1.2、复杂客户端拦截器
我们前面操作的简单客户端请求拦截器粒度比较大,无法实现对请求过程的更加细力度的监听和管理。所以我们需要一个更加强大的拦截器。我们说白了就是对原来能正常请求中间加一些增强方法,其实就是装饰者模式,包装一下原始类型。在原始类型的基础上加了一堆方法分别在各个阶段生效,从而来增强原始能力。但是真正的rpc调用实现还是原始类型发起的。
请求拦截
于是我们来写一下代码。
/*这个类型增强原始类型 适用于控制 拦截 请求发送各个环节*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式delegate().start(responseListener, headers);}
}
我们看到这就是一个增强的包装类,他是对原始的简单拦截器的那个ClientCall的包装。我们看到它在后续的动作之前增强了一个检查的实现。
然后你钥匙要继续就一定要delegate().start才会往下走。否则没启动ClientCall会报错。
ok,我们已经完成了增强ClientCall的开发,现在要把原来的拦截器方法里面的简单ClientCall替换为增强ClientCall。
我们来修改拦截器代码。
/*** 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor* 该拦截器在客户端发起请求时被调用,* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/// return next.newCall(method, callOptions);/** 如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装* 那么这个时候,就不能返回原始ClientCall对象,* 应该返回 包装的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}
于是,启动服务端代码,客户端代码观察执行结果。
没有问题。
此外这个增强拦截还有更加细粒度的方法增强,我们来实现一下。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;import javax.annotation.Nullable;/*** 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor* 该拦截器在客户端发起请求时被调用,* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....");/** 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传* 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项* 其实就是拦截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接)* 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用*/// return next.newCall(method, callOptions);/** 如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装* 那么这个时候,就不能返回原始ClientCall对象,* 应该返回 包装的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}/*这个类型增强原始类型 适用于控制 拦截 请求发送各个环节*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式delegate().start(responseListener, headers);}// 真正开始发送消息,netty的发送消息的方法,outBoundBuffer@Overridepublic void sendMessage(ReqT message) {log.info("发送请求数据: {}", message);super.sendMessage(message);}// 指定发送消息的数量,类似批量发送@Overridepublic void request(int numMessages) {log.info("指定发送消息的数量: {}", numMessages);super.request(numMessages);}// 取消请求的时候回调触发@Overridepublic void cancel(@Nullable String message, @Nullable Throwable cause) {log.info("取消请求: {}", message);super.cancel(message, cause);}// 链接半关闭的时候回调触发,请求消息无法发送,但是可以接受响应的消息@Overridepublic void halfClose() {log.info("链接半关闭");super.halfClose();}// 消息发送是否启用压缩@Overridepublic void setMessageCompression(boolean enabled) {log.info("消息发送是否启用压缩: {}", enabled);super.setMessageCompression(enabled);}// 是否可以发送消息,这个在流式里面会调用,一元的不会@Overridepublic boolean isReady() {log.info("是否可以发送消息: {}", super.isReady());return super.isReady();}
}
运行程序结果为:
至此我们看到我们在客户端请求的各个阶段都进行了监听回调。这就是客户端的请求增强了。
响应拦截
前面我们完成的是对于请求的拦截,其实我们可以在客户端这里对服务端响应的拦截。我们可以拦截响应数据,这个能力可以让我们在不同的客户端定制自己的拦截需求。服务端不管你的需求,都返回,你不同的客户端可能有不同的要求,自己去做拦截定制。
我们先来看一下我们之前的那个请求增强。
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");delegate().start(responseListener, headers);}
}
你有请求拦截才有响应拦截,而且响应拦截我们一般都是通过监听器来实现的,因为客户端你也不知道你啥时候响应,所以就需要监听回调的形式来监听。我们看到在checkedStart这个方法这里。他的参数列表里面有一个responseListener,响应监听器。其实就是这个东西,我们需要重新实现他,然后传进去,他就会在checkedStart调用的时候传递给grpc,grpc就根据你的实现来拦截了。现在他是一个responseListener,啥也没有,你要想拦截功能还是要增强包装。所以我们来实现一下。
/*用于监听响应,并对响应进行拦截,其中响应头回来onHeaders被调用是服务端的responseObserver.onNext这个调用触发的。而服务端调用responseObserver.onCompleted()才会回调onMessage这个。对应的其实就是netty的write和flush,responseObserver.onCompleted()才会真的flush,把数据写回来。可以在服务端做修改测试一下。*/
@Slf4j
class CustomCallListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {// 构造器包装原始的Listener,下面的回调实现包装增强。protected CustomCallListener(ClientCall.Listener<RespT> delegate) {super(delegate);}@Overridepublic void onHeaders(Metadata headers) {log.info("响应头信息 回来了......");super.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {log.info("响应的数据 回来了.....{} ", message);super.onMessage(message);}// 这个在流式里面会调用,一元的不会,可以不实现@Overridepublic void onReady() {super.onReady();}//这个在流式里面会调用,一元的不会,可以不实现@Overridepublic void onClose(Status status, Metadata trailers) {super.onClose(status, trailers);}
}
通过构造函数执行包装,然后再包装里面增强。此时我们只需要替代delegate().start(responseListener, headers);中的参数responseListener为我们自己定义的就好了。
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否则就发起请求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("发送请求数据之前的检查.....");//真正的去发起grpc的请求// 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到// delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式// delegate().start(responseListener, headers);delegate().start(new CustomCallListener<>(responseListener), headers);// 传入增强响应拦截}...... 省略其余代码
}
执行没有问题。
1.2、服务端拦截器
1.2.1、服务端简单拦截器
对应客户端那边的拦截器,服务端这里其实是对应的,该有的都有。我们来看下服务端的简单拦截器。
/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");// 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);return reqTListener;}
}
这里的概念我们都可以在客户端那里找到对应的。我们看到这个方法返回了一个ServerCall.Listener 类型的,而且泛型是req,可见是对于请求的监听器。作为服务端,他是被动连接的,所以她的拦截方式就是监听,什么时候来我什么时候拦截,他不知道你啥时候来,就只能监听着。而next.startCall(call, headers);返回的就是一个具有原始能力的拦截器,没有封装增强的。我们把这个拦截器整合到服务端发布代码中使其生效。
package com.levi;import com.levi.interceptor.CustomServerInterceptor;
import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());// 注册自定义拦截器serverBuilder.intercept(new CustomServerInterceptor());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}
我们把自定义的拦截器注册进去之后启动服务端和客户端看一下。没有问题。
同样的服务端的简单拦截器也存在像客户端那边的问题,
拦截请求发送过来的数据,无法处理响应的数据。
拦截力度过于宽泛所以我么需要复杂拦截器,增强原始的拦截器,达到更加细力度的控制拦截。一切都和当初我们在客户端做的一样,重新定义一个增强的。
1.2.2、服务端复杂拦截器
拦截请求,拦截的是客户端过来的消息
/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");// 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器// ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);// 包装器设计模式,封装原始的监听器,增强原始监听器的功能,实际的核心调用还是原始的在做// 只是加了一些额外的增强的方法return new CustomServerCallListener<>(next.startCall(call, headers));}
}/*** 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器* 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作,* 本质是对于原始监听器的一个包装增强,包装器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//准备接受请求数据public void onReady() {log.debug("onRead Method Invoke,准备好接收客户端数据....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客户端请求提交的数据,客户端的请求数据是: {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("监听到了 半连接触发这个操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服务端 调用onCompleted()触发...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作");super.onCancel();}
}
在经历了客户端的开发之后,我们这里其实就很好理解了。调用没有问题。
拦截响应,拦截的是服务端发给客户端的响应
我们能拦截请求,自然也就能拦截响应。我们先来看一下什么是服务端的响应,其实就是服务端回写给客户端的操作。也就是服务端调用客户端的操作,我们上面拦截请求其实是客户端发给服务端,也即是服务端的监听ServerCall.Listener,服务端的监听器做包装增强。
现在你要增强服务端对客户端的调用其实就是ServerCall(这里对应我们客户端那里的ClientCall)。所以我们要对ServerCall做包装。就是你谁干啥就增强啥就实现啥就行。
/*** 目的:通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {// 这里包装的是ServerCallprotected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定发送消息的数量 【响应消息】public void request(int numMessages) {log.debug("response 指定消息的数量 【request】");super.request(numMessages);}@Override//设置响应头public void sendHeaders(Metadata headers) {log.debug("response 设置响应头 【sendHeaders】");super.sendHeaders(headers);}@Override//响应数据public void sendMessage(RespT message) {log.debug("response 响应数据 【send Message 】 {} ", message);super.sendMessage(message);}@Override//关闭连接public void close(Status status, Metadata trailers) {log.debug("response 关闭连接 【close】");super.close(status, trailers);}
}
然后我们再把这个包装增强整合到拦截器里面,交给grpc的体系中才能生效,在interceptCall中进行整合,我们不需要改动服务端发布那里的代码,那里可以直接通过CustomServerInterceptor来处理我们这里整合到的两个拦截器,以下为完整代码。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/*** 自定义服务端拦截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");//包装ServerCall 处理服务端响应拦截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包装Listener 处理服务端请求拦截CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));return reqTCustomServerCallListener;}
}/*** 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器* 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作,* 本质是对于原始监听器的一个包装增强,包装器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//准备接受请求数据public void onReady() {log.debug("onRead Method Invoke,准备好接收客户端数据....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客户端请求提交的数据,客户端的请求数据是: {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("监听到了 半连接触发这个操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服务端 调用onCompleted()触发...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作");super.onCancel();}
}/*** 通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {protected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定发送消息的数量 【响应消息】public void request(int numMessages) {log.debug("response 指定消息的数量 【request】");super.request(numMessages);}@Override//设置响应头public void sendHeaders(Metadata headers) {log.debug("response 设置响应头 【sendHeaders】");super.sendHeaders(headers);}@Override//响应数据public void sendMessage(RespT message) {log.debug("response 响应数据 【send Message 】 {} ", message);super.sendMessage(message);}@Override//关闭连接public void close(Status status, Metadata trailers) {log.debug("response 关闭连接 【close】");super.close(status, trailers);}
}
而且我们看到增强类都是要实现构造的,因为要传进去原始类,进行封装,调用核心方法还是走super,走那个原始的操作。你的增强的操作可以加在这些新的方法里面。这些增强方法,你可以酌情看你要的业务方法,需要的就实现,不需要就可以不覆盖实现。其实他就是服务端的响应的各个阶段不同的触发。
我们运行代码没有问题,各个阶段都被触发了。
对于你要是想只拦截响应,不拦截请求可以这么做。
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服务器端 拦截请求操作的功能 写在这个方法中log.debug("服务器端拦截器生效.....");//包装ServerCall 处理服务端响应拦截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包装Listener 处理服务端请求拦截CustomServerCallListener<ReqT> reqTCustomServerCallListener =new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));// return reqTCustomServerCallListener;/*** 只拦截响应,我们就不需要包装Listener,也就是返回原始的Listener即可。原始的Listener我们是通过* next.startCall(reqTRespTCustomServerCall, headers)获取到的。所以继续用next.startCall不操作包装的* Listener即可,但是我们要包装响应也就是serverCall,所以返回reqTRespTCustomServerCall。包在原始Listener中* 你要是包装请求,那就是需要包装的Listener,不需要就直接next.startCall返回startCall即可。*/return next.startCall(reqTRespTCustomServerCall, headers);}
明白请求是包装的listener,响应是servercall,需要哪个就加强哪个就行,不需要增强拦截就用原始的就行。
四、总结
这就是grpc中比较常见的一元拦截器的使用,他是对于一元rpc的拦截。在各个拦截方法中我们可以定义一些自己的业务方法。进而灵活使用拦截器。而且你要是在某个点拦截之后不想继续往下走,那你就不要调用每个拦截方法的super,不要做后续的调用,直接断开链路即可。而且至于拦截请求还是响应就看你包装啥就完了,他不是耦合在一起的。
后面我们再来分析监听流也就是流式和双向调用的拦截器。