真的够可以的,基于Netty实现了RPC框架( 二 )

至此API 模块就定义完成了,非常简单的两个接口 。接下来,我们要确定传输规则,也就是传输协议,协议内容当然要自定义,才能体现出Netty 的优势 。
设计一个InvokerMessage类,里面包含了服务名称、调用方法、参数列表、参数值,这就是我们自定义协议的协议包:
@Datapublic class InvokerMessage implements Serializable {private String className; // 服务名称private String methodName; // 调用哪个方法private Class<?>[] params; // 参数列表private Object[] values; // 参数值}通过定义这样的协议类,就能知道我们需要调用哪个服务,服务中的哪个方法,方法需要传递的参数列表(参数类型+参数值),这些信息正确传递过去了才能拿到正确的调用返回值 。
接下来创建这两个服务的具体实现类,IRpcHello的实现类如下:
public class RpcHelloProvider implements IRpcHello {public String hello(String name) {return "Hello, " + name + "!";}}IRpcCalc的实现类如下:
public class RpcCalcProvider implements IRpcCalc {@Overridepublic int add(int a, int b) {return a + b;}@Overridepublic int sub(int a, int b) {return a - b;}@Overridepublic int mul(int a, int b) {return a * b;}@Overridepublic int div(int a, int b) {return a / b;}}Registry 注册中心主要功能就是负责将所有Provider的服务名称和服务引用地址注册到一个容器中(这里为了方便直接使用接口类名作为服务名称,前提是假定我们每个服务只有一个实现类),并对外发布 。Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口 。先启动一个Netty服务,创建RpcRegistry 类,RpcRegistry.java的具体代码如下:
public class RpcRegistry {private final int port;public RpcRegistry(int port){this.port = port;}public void start(){NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workGroup).channel(NIOServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();// 处理拆包、粘包的编解码器pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));// 处理序列化的编解码器pipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));// 自己的业务逻辑pipeline.addLast(new MyRegistryHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); // 设置长连接ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();System.out.println("RPC Registry start listen at " + this.port);channelFuture.channel().closeFuture().sync();} catch (Exception e){e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {new RpcRegistry(8080).start();}}接下来只需要实现我们自己的Handler即可,创建MyRegistryHandler.java,内容如下:
public class MyRegistryHandler extends ChannelInboundHandlerAdapter {// 在注册中心注册服务需要有容器存放public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();// 类名的缓存位置private static final List<String> classCache = new ArrayList<>();// 约定,只要是写在provider下所有的类都认为是一个可以对完提供服务的实现类// edu.xpu.rpc.providerpublic MyRegistryHandler(){scanClass("edu.xpu.rpc.provider");doRegister();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Object result = new Object();// 客户端传过来的调用信息InvokerMessage request = (InvokerMessage)msg;// 先判断有没有这个服务String serverClassName = request.getClassName();if(registryMap.containsKey(serverClassName)){// 获取服务对象Object clazz = registryMap.get(serverClassName);Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());result = method.invoke(clazz, request.getValues());System.out.println("request=" + request);System.out.println("result=" + result);}ctx.writeAndFlush(result);ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}// 实现简易IOC容器// 扫描出包里面所有的Classprivate void scanClass(String packageName){ClassLoader classLoader = this.getClass().getClassLoader();URL url = classLoader.getResource(packageName.replaceAll("\.", "/"));File dir = new File(url.getFile());File[] files = dir.listFiles();for (File file: files){if(file.isDirectory()){scanClass(packageName + "." + file.getName());}else{// 拿出类名String className = packageName + "." + file.getName().replace(".class", "").trim();classCache.add(className);}}}// 把扫描到的Class实例化,放到Map中// 注册的服务名称就叫做接口的名字 [约定优于配置]private void doRegister(){if(classCache.size() == 0) return;for (String className: classCache){try {Class<?> clazz = Class.forName(className);// 服务名称Class<?> anInterface = clazz.getInterfaces()[0];registryMap.put(anInterface.getName(), clazz.newInstance());} catch (Exception e) {e.printStackTrace();}}}}


推荐阅读