1 RPC概述 当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本, 通常会采用单一应用架构。之后出现了ORM 框架,主要用于简化增删改查工作流,数据访问框架ORM 是关键。
随着用户量增加,当访问量逐渐增大,单一应用增加机器,带来的加速度越来越小,需要将应用拆分成互不干扰的几个应用,以提升效率,于是就出现了垂直应用架构。MVC 架构就是一种非常经典的用于加速前端页面开发的架构。
当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速地响应多变的市场需求,就出现了分布式服务架构。分布式架构下服务数量逐渐增加,为了提高管理效率,RPC 框架应运而生。RPC 用于提高业务复用及整合,分布式服务框架下RPC 是关键。
下一代框架,将会是流动计算架构占据主流。当服务越来越多,容量的评估,小服务的资源浪费等问题,逐渐明显。此时,需要增加一个调度中心,基于访问压力实时管理集群容量,提高集群利用率。SOA架构就是用于提高机器利用率的,资源调度和治理中心SOA是关键。
Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信。
2 环境预设

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
</dependency>
主要模块包括:
api:主要用来定义对外开放的功能与服务接口。
protocol:主要定义自定义传输协议的内容。
registry:主要负责保存所有可用的服务名称和服务地址。
provider:实现对外提供的所有服务的具体功能。
consumer:客户端调用。
monitor:完成调用链监控。
3 代码 3.1 API模块 1 2 3 4 public interface IRpcHelloService { String hello (String name) ; }
/**
 * Integer arithmetic service contract exposed to remote callers.
 */
public interface IRpcService {

    /**
     * @param a first operand
     * @param b second operand
     * @return the sum of {@code a} and {@code b}
     */
    int add(int a, int b);

    /**
     * @param a first operand
     * @param b second operand
     * @return {@code a} minus {@code b}
     */
    int sub(int a, int b);

    /**
     * @param a first operand
     * @param b second operand
     * @return the product of {@code a} and {@code b}
     */
    int mult(int a, int b);

    /**
     * @param a the dividend
     * @param b the divisor
     * @return the integer quotient of {@code a} divided by {@code b}
     */
    int div(int a, int b);
}
3.2 创建自定义协议 1 2 3 4 5 6 7 @Data public class InvokerProtocol implements Serializable { private String className; private String methodName; private Class<?>[] params; private Object[] values; }
3.3 实现Provider服务端业务逻辑 1 2 3 4 5 public class RpcHelloServicceImpl implements IRpcHelloService { public String hello (String name) { return "Hello " + name + "!" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class RpcServiceImpl implements IRpcService { public int add (int a, int b) { return a + b; } public int sub (int a, int b) { return a - b; } public int mult (int a, int b) { return a * b; } public int div (int a, int b) { return a / b; } }
3.4 Registry服务注册 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class RpcRegistry { private int port; public RpcRegistry (int port) { this .port = port; } public void start () { EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup workerGroup = new NioEventLoopGroup (); try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>(){ @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LenthFieldBasedFrameDecoder (Integer.MAX_VALUE, 0 , 4 , 0 , 4 )); pipeline.addLast(new LenthFieldPrepender (4 )); pipeline.addLast("encoder" , new ObjectEncoder ()); pipeline.addLast("decoder" , new ObjectDecoder (Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null ))); pipeline.addLast(new RegistryHandler ()); } }) .option(ChannelOption.SO_BACKLOG, 128 ) .childOption(ChannelOption.SO_KEEPALIVE, true ); ChannelFuture future = b.bind(port).sync(); System.out.println("RPC Registry start listen at " + port); future.channel().closeFuture().sync(); } catch (Exception e){ bossGroup.shutdownGraceFully(); workerGroup.shutdownGraceFully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class RegistryHandler extends ChannelInboundHandlerAdapter { public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap <>(); private List<String> classNames = new ArrayList <>(); public RegistryHandler () { scannerClass("com.test.netty.rpc.provider" ); doRegister(); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { Object result = new Object (); InvokerProrocol request = (InvokerProrocol) msg; if (registryMap.containsKey(request.getClassName())){ Object clazz = registryMap.get(request.getClassName()); Method method = clazz.getClass().getMethod(request.getMethodName(), requet.getParams()); result = method.invoke(clazz, request.getValues()); } ctx.write(result); ctx.flush(); ctx.close(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private void scannereClass (String packageName) { URL url = this .getClass().getClassLoadere().getResource(packageName.replaceAll("\\." . "/" )); File dir = new File (url.getFile()); for (File file : dir.listFiles()){ if (file.isDirectory()){ scannerClass(packageName + "." + file.getName); } else { classNames.add(packageName + "." + file.getName().replace(".class" , "" ).trim()); } } } private void doRegistry () { if (classNames.size() == 0 ) return ; for (String className : classNames){ try { Class<?> clazz = Class.forName(className); Class<?> i = clazz.getInterfaces()[0 ]; registryMap.put(i.getName(), clazz.newInstance()); } catch (Exception e){ e.printStackTrace(); } } } }
3.5 Consumer远程调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class RpcProxy { public static <T> T create (Class<?> clazz) { MethodProxy proxy = new MethodProxy (clazz); Class<?>[] interfaces = clazz.isInterface() ? new Class []{clazz} : clazz.getInterfaces(); T result = (T) Proxy.newProxyInnstance(clazz.getClassLoader, interfaces, proxy); return result; } private static class MethodProxy implements InvocationHandler { private Class<?> clazz; public MethodProxy (Class<?> clazz) { this .clazz = clazz; } public Object invoke (Object proxy, Method mehtod, Object[] args) throws Throwable{ if (Object.class.equals(method.getDeclaringClass())){ try { return method.invoke(this , args); } catch (Throwable t){ t.printStackTrace(); } } else { return rpcInvoke(proxy, method, args); } return null ; } public Object rpcInvoke (Object proxy, Method method, Object[] args) { InvokerProtocol msg = new InvokerProtocol (); msg.setClassName(this .clazz.getName()); msg.setMethodName(method.getName()); msg.setValues(args); msg.setParams(method.getParameterTypes()); final RpcProxyHandler consumerHandler = new RpcProxyHandler (); EventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap b = new Bootstrap (); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true ) .handler(new ChannelInitializer <SocketChannel>(){ @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipelien.addLast("frameDecoder" , new LengthFieldBasedFrameDecoder (Interger.MAX_VALUE, 0 , 4 , 0 , 4 )); pipeline.addLast("frameEncoder" , new LengthFieldPrepender (4 )); pipeline.addLast("encoder" , new ObjectEncoder ()); pipeline.addLast("decoder" , new ObjectDecoder (Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null ))); pipeline.addLast("handler" , consumerHandler); } 
}); ChannelFuture future = b.connect("localhost" , 8080 ).sync(); future.channel().writeAndFlush(msg).sync(); future.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { group.shutdownGracefully(); } return consumerHandler.getResponse(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class RpcProxyHandler extends ChannelInboundHandlerAdapter { private Object response; public object getResponse () { return response; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception{ response = msg; } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is genneral" ); } }
完成客户端调用代码:
1 2 3 4 5 6 7 8 9 public class RpcConsumer { IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); System.out.println(rpcHello.hello("test" )); IRpcService service = RpcProxy.create(IRpcService.class); System.out.println("8 + 2 = " + service.add(8 , 2 )); System.out.println("8 - 2 = " + service.sub(8 , 2 )); System.out.println("8 * 2 = " + service.mult(8 , 2 )); System.out.println("8 / 2 = " + service.div(8 , 2 )); }