P2C算法

概念:基于公平性考虑的P2C算法, 对于每次调用,从可用的服务列表中选择两个节点,然后选择一个“当前正在处理的连接数”较少的那个节点。

使用场景:服务器机器配置比较均匀下,高并发下快速选择负载较轻的服务器。

优点:公平性、均衡性

缺点:随机性,简单的P2C无法考虑服务器的实时负载情况和性能指标

实现思路:

1.获取可用的服务提供列表

2.选择两个随机的服务提供者

3.通过负载指标选择负载较轻的服务提供者

4.将请求路由到选中的服务提供者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public static void main(String[] args) {
List<Server> serverList = new ArrayList<>();
serverList.add(new Server(1,"服务器1","8080","127.0.0.1",90,0,0,"",2000));
serverList.add(new Server(2,"服务器2","8090","127.0.0.2",80,0,0,"",3000));
serverList.add(new Server(3,"服务器3","8088","127.0.0.3",70,0,0,"",3000));
serverList.add(new Server(4,"服务器4","8099","127.0.0.4",70,0,0,"",3000));
serverList.add(new Server(5,"服务器5","8070","127.0.0.5",80,0,0,"",3000));
serverList.add(new Server(6,"服务器6","8060","127.0.0.6",70,0,0,"",4000));
serverList.add(new Server(7,"服务器7","8050","127.0.0.7",80,0,0,"",5000));

AdaptiveLoadBalance adaptiveLoadBalance = new AdaptiveLoadBalance();

Invocation invocation = new Invocation();
for (int i = 0; i < 1000; i++) {
// 负载均衡策略的执行,即是在所有的Provider中选出一个,作为当前Consumer的远程调用对象
System.out.println(adaptiveLoadBalance.doSelect(serverList,invocation).toString());
}
Iterator iterator = adaptiveLoadBalance.adaptiveMetrics.metricsStatistics.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, AdaptiveMetrics> entry = (Map.Entry<String, AdaptiveMetrics>) iterator.next();
System.out.println("Key: "+entry.getKey()+" consumerReq: "+entry.getValue().consumerReq );
}
}

private Server selectByP2C(List<Server> invokers, Invocation invocation){
int length = invokers.size();
if(length == 1) {
return invokers.get(0);
}

if(length == 2) {
return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation);
}
// 随机选择两个服务器
int pos1 = ThreadLocalRandom.current().nextInt(length);
int pos2 = ThreadLocalRandom.current().nextInt(length - 1);
if (pos2 >= pos1) {
pos2 = pos2 + 1;
}
// 动态计算两个服务器的负载情况,选择负载较轻的服务器
return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation);
}

private Server chooseLowLoadInvoker(Server invoker1,Server invoker2,Invocation invocation){
int weight1 = invoker1.getWeight();
int weight2 = invoker2.getWeight();
int timeout1 = getTimeout(invoker1, invocation);
int timeout2 = getTimeout(invoker2, invocation);
long load1 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker1,invocation),weight1,timeout1 ));
long load2 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker2,invocation),weight2,timeout2 ));

// 负载相同的情况下
if (load1 == load2) {
// The sum of weights
int totalWeight = weight1 + weight2;
if (totalWeight > 0) {
// 根据权重随机选择
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
if (offset < weight1) {
return invoker1;
}
return invoker2;
}
// 默认权重为0时 随机选择
return ThreadLocalRandom.current().nextInt(2) == 0 ? invoker1 : invoker2;
}
// 根据负载情况选择服务器,负载低的 被选择。
return load1 > load2 ? invoker2 : invoker1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class AdaptiveMetrics {

public final ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>();
//
public long currentProviderTime = 0;
// 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 cpu load 依赖其他组件计算出来的 CPU负载
public double providerCPULoad = 0;
// 是在 setProviderMetrics 方法里面维护的,其中 lastLatency 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 rt 值
// /'leɪtənsɪ/
public long lastLatency = 0;
// 当前时间
public long currentTime = 0;

//Allow some time disorder
public long pickTime = System.currentTimeMillis();

public double beta = 0.5;
// 是总的调用次数
public final AtomicLong consumerReq = new AtomicLong();
// 是在每次调用成功后在 AdaptiveLoadBalanceFilter 的 onResponse 方法中维护的值。
public final AtomicLong consumerSuccess = new AtomicLong();
// 每次出现调用异常时,维护的值
public final AtomicLong errorReq = new AtomicLong();
// 这是一个公式算出来的值 Vt = β * Vt-1 + (1 - β ) * θt
// 指数加权移动平均值的控制图 (exponentially weighted moving average) , EWMA主要用于对网络的状态参数进行估计和平滑 , 负责得到 当前服务器的“平滑负载指标”
// 有两个特点,1.不需要保存过去所有数值,2.计算量显著减少
public double ewma = 0;

public double getLoad(String idKey, int weight, int timeout) {
AdaptiveMetrics metrics = getStatus(idKey);

//If the time more than 2 times, mandatory selected 如果超时时间超过 2次,则强制选择
if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
return 0;
}

if (metrics.currentTime > 0) {
long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1;
if (multiple > 0) {
if (metrics.currentProviderTime == metrics.currentTime) {
//penalty value
metrics.lastLatency = timeout * 2L;
} else {
metrics.lastLatency = metrics.lastLatency >> multiple;
}
metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency;
metrics.currentTime = System.currentTimeMillis();
}
}

long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get();
// Vt = β * Vt-1 + (1 - β ) * θt
// 服务器的CPU负载 乘以 ( (响应时间) * 当前正在处理的请求数 + 1) / ( 请求成功次数 ) / (( 请求次数 +1)*权重 +1)
return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double) metrics.consumerSuccess.get() / (double) (metrics.consumerReq.get() + 1)) * weight) + 1);
}

public AdaptiveMetrics getStatus(String idKey) {
return ConcurrentHashMapUtils.computeIfAbsent(metricsStatistics, idKey, k -> new AdaptiveMetrics());
}

public void addConsumerReq(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.consumerReq.incrementAndGet();
}

public void addConsumerSuccess(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.consumerSuccess.incrementAndGet();
}

public void addErrorReq(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.errorReq.incrementAndGet();
}

public void setPickTime(String idKey, long time) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.pickTime = time;
}


public void setProviderMetrics(String idKey, Map<String, String> metricsMap) {

AdaptiveMetrics metrics = getStatus(idKey);

long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0"));
//If server time is less than the current time, discard
if (metrics.currentProviderTime > serviceTime) {
return;
}

metrics.currentProviderTime = serviceTime;
metrics.currentTime = serviceTime;
metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v, true)).orElse("0"));
metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0")));

metrics.beta = 0.5;
metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency;
}
}

P2C算法
http://www.zivjie.cn/2023/12/09/算法/负载均衡算法/P2C算法/
作者
Francis
发布于
2023年12月9日
许可协议