本文将基于前文实现的编解码与心跳机制,构建一个简单的 RPC 框架,包括请求封装、响应解析、动态代理调用。为打造微服务通信基础打下基础。
一、什么是 RPC?
RPC(Remote Procedure Call,远程过程调用)允许你像调用本地方法一样调用远程服务。
RPC 框架的核心包括:
-
通信协议(我们用 Netty 实现)
-
服务注册与发现(此处简化为直连)
-
编码/解码机制(TLV、自定义协议)
-
动态代理与调用(Java 反射)
二、定义协议数据结构
请求对象
public class RpcRequest {private String className;private String methodName;private Class<?>[] paramTypes;private Object[] args;
}
响应对象
public class RpcResponse {private Object result;private Throwable error;
}
三、编码器和解码器(简化版)
此处建议使用 Java 内置序列化或 JSON,避免自行实现复杂字节协议
public class RpcEncoder<T> extends MessageToByteEncoder<T> {@Overrideprotected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws Exception {byte[] data = SerializationUtil.serialize(msg); // 自定义序列化工具out.writeInt(data.length);out.writeBytes(data);}
}
public class RpcDecoder<T> extends ByteToMessageDecoder {private final Class<T> clazz;public RpcDecoder(Class<T> clazz) { this.clazz = clazz; }@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) return;in.markReaderIndex();int length = in.readInt();if (in.readableBytes() < length) {in.resetReaderIndex();return;}byte[] bytes = new byte[length];in.readBytes(bytes);out.add(SerializationUtil.deserialize(bytes, clazz));}
}
四、客户端动态代理
public class RpcClientProxy {private final String host;private final int port;public RpcClientProxy(String host, int port) {this.host = host;this.port = port;}@SuppressWarnings("unchecked")public <T> T getProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),new Class<?>[]{serviceClass},(proxy, method, args) -> {RpcRequest request = new RpcRequest();request.setClassName(serviceClass.getName());request.setMethodName(method.getName());request.setParamTypes(method.getParameterTypes());request.setArgs(args);// Netty 同步发送请求,获取响应(略)RpcResponse response = NettyClient.send(request, host, port);return response.getResult();});}
}
五、服务端调用分发
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {RpcResponse response = new RpcResponse();try {Class<?> clazz = Class.forName(request.getClassName());Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());Object result = method.invoke(clazz.getDeclaredConstructor().newInstance(), request.getArgs());response.setResult(result);} catch (Exception e) {response.setError(e);}ctx.writeAndFlush(response);}
}
六、服务端注册
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new RpcDecoder(RpcRequest.class));pipeline.addLast(new RpcEncoder<>(RpcResponse.class));pipeline.addLast(new RpcServerHandler());}});
bootstrap.bind(8080).sync();
七、调用示例
// 定义服务接口
public interface HelloService {String hello(String name);
}// 服务实现类
public class HelloServiceImpl implements HelloService {public String hello(String name) {return "Hello, " + name;}
}// 客户端调用
RpcClientProxy proxy = new RpcClientProxy("localhost", 8080);
HelloService service = proxy.getProxy(HelloService.class);
System.out.println(service.hello("Netty"));
八、总结
通过本篇,你已经实现了:
-
基于 Netty 的 RPC 协议通信
-
编解码框架构建
-
动态代理与服务远程调用
-
基础版 Netty RPC 框架原型
虽然还不完善(无注册中心、无连接池、无异步支持),但已具备通信能力,是 Netty 技术栈走向分布式架构的重要一步。