Netty重构RPC

1 RPC概述

当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本, 通常会采用单一应用架构。之后出现了ORM 框架,主要用于简化增删改查工作流的,数据访问框架ORM 是关键。

随着用户量增加,当访问量逐渐增大,单一应用增加机器,带来的加速度越来越小,需要将应用拆分成互不干扰的几个应用,以提升效率,于是就出现了垂直应用架构。MVC 架构就是一种非常经典的用于加速前端页面开发的架构。

当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服逐渐形成稳定的服务中心,使前端应用能更快速的响应,多变的市场需求,就出现了分布式服务架构。分布式架构下服务数量逐渐增加,为了提高管理效率,RPC 框架应运而生。RPC 用于提高业务复用及整合的,分布式服务框架下RPC 是关键。

下一代框架,将会是流动计算架构占据主流。当服务越来越多,容量的评估,小服务的资源浪费等问题,逐渐明显。此时,需要增加一个调度中心,基于访问压力实时管理集群容量,提高集群利用率。SOA架构就是用于提高及其利用率的,资源调度和治理中心SOA是关键。

Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信。

2 环境预设

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io,netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>

主要模块包括:

​ api:主要用来定义对外开放的功能与服务接口。

​ protocol:主要定义自定义传输协议的内容。

​ registry:主要负责保存所有可用的服务名称和服务地址。

​ provider:实现对外提供的所有服务的具体功能。

​ consumer:客户端调用。

​ monitor:完成调用链监控。

3 代码

3.1 API模块

1
2
3
4
//用来确认服务是否可用
public interface IRpcHelloService {
String hello(String name);
}
1
2
3
4
5
6
7
//模拟业务加、减、乘、除运算
public interface IRpcService {
public int add(int a, int b);
public int sub(int a, int b);
public int mult(int a, int b);
public int div(int a, int b);
}

3.2 创建自定义协议

1
2
3
4
5
6
7
@Data
public class InvokerProtocol implements Serializable {
private String className; //类名
private String methodName; //函数名
private Class<?>[] params; //参数类型
private Object[] values; //参数列表
}

3.3 实现Provider服务端业务逻辑

1
2
3
4
5
public class RpcHelloServicceImpl implements IRpcHelloService {
public String hello(String name) {
return "Hello " + name + "!";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RpcServiceImpl implements IRpcService {
public int add(int a, int b){
return a + b;
}
public int sub(int a, int b){
return a - b;
}
public int mult(int a, int b){
return a * b;
}
public int div(int a, int b){
return a / b;
}
}

3.4 Registry服务注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RpcRegistry {
private int port;
public RpcRegistry(int port){
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个
maxFrameLength: 框架的最大长度,如果帧的长度大于此值,则抛出TooLongFrameException
lengthFieldOffset: 长度字段的偏移量,即对应的长度字段在整个消息数据中的位置
lengthFieldLength: 长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long就是8)
lengthAdjustment: 要添加到长度字段值的补偿值
initialBytesToStrip: 从解码帧中去除的第一个字节数
*/
pipeline.addLast(new LenthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(new LenthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new RegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync();
System.out.println("RPC Registry start listen at " + port);
future.channel().closeFuture().sync();
} catch(Exception e){
bossGroup.shutdownGraceFully();
workerGroup.shutdownGraceFully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class RegistryHandler extends ChannelInboundHandlerAdapter {
//用于保存所有可用的服务
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();
//保存所有相关的服务类
private List<String> classNames = new ArrayList<>();

public RegistryHandler(){
//完成递归扫描
scannerClass("com.test.netty.rpc.provider");
doRegister();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
InvokerProrocol request = (InvokerProrocol) msg;
//当客户端建立连接时,需要从自定义协议中获取消息,拿到具体的服务和实参
//使用反射调用
if(registryMap.containsKey(request.getClassName())){
Object clazz = registryMap.get(request.getClassName());
Method method = clazz.getClass().getMethod(request.getMethodName(), requet.getParams());
result = method.invoke(clazz, request.getValues());
}
ctx.write(result);
ctx.flush();
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

//递归扫描
private void scannereClass(String packageName){
URL url = this.getClass().getClassLoadere().getResource(packageName.replaceAll("\\.". "/"));
File dir = new File(url.getFile());
for(File file : dir.listFiles()){
//如果是一个文件夹,继续递归
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName);
} else {
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
//完成注册
private void doRegistry(){
if(classNames.size() == 0) return;
for(String className : classNames){
try {
Class<?> clazz = Class.forName(className);
Class<?> i = clazz.getInterfaces()[0];
registryMap.put(i.getName(), clazz.newInstance());
} catch (Exception e){
e.printStackTrace();
}
}
}
}

3.5 Consumer远程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class RpcProxy{
public static <T> T create(Class<?> clazz){
//clazz传进来本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
Class<?>[] interfaces = clazz.isInterface() ? new Class[]{clazz} : clazz.getInterfaces();
T result = (T) Proxy.newProxyInnstance(clazz.getClassLoader, interfaces, proxy);
return result;
}

private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz){
this.clazz = clazz;
}

public Object invoke(Object proxy, Method mehtod, Object[] args) throws Throwable{
//如果传进来是一个已实现的具体类,忽略
if(Object.class.equals(method.getDeclaringClass())){
try {
return method.invoke(this, args);
} catch (Throwable t){
t.printStackTrace();
}
} else { //传进来的是一个接口(核心)
return rpcInvoke(proxy, method, args);
}
return null;
}

//实现接口的核心方法
public Object rpcInvoke(Object proxy, Method method, Object[] args){
//传输协议封装
InvokerProtocol msg = new InvokerProtocol();
msg.setClassName(this.clazz.getName());
msg.setMethodName(method.getName());
msg.setValues(args);
msg.setParams(method.getParameterTypes());

final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipelien.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Interger.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler", consumerHandler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();
future.channel().closeFuture().sync();
} catch(Exception e){
e.printStackTrace();
} finally{
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public object getResponse(){
return response;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
response = msg;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is genneral");
}
}

完成客户端调用代码:

1
2
3
4
5
6
7
8
9
public class RpcConsumer{
IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);
System.out.println(rpcHello.hello("test"));
IRpcService service = RpcProxy.create(IRpcService.class);
System.out.println("8 + 2 = " + service.add(8, 2));
System.out.println("8 - 2 = " + service.sub(8, 2));
System.out.println("8 * 2 = " + service.mult(8, 2));
System.out.println("8 / 2 = " + service.div(8, 2));
}

Netty重构RPC
http://www.zivjie.cn/2023/07/16/网络通信/netty/Netty重构RPC/
作者
Francis
发布于
2023年7月16日
许可协议