1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| package edu.algorithm.loadbalance;
import edu.algorithm.entity.NamedThreadFactory; import edu.algorithm.entity.RpcStatus; import edu.algorithm.entity.Server;
import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Load balancer that picks the server with the smallest estimated response time.
 *
 * <p>For every candidate the estimate is the average elapsed time of the calls that
 * succeeded inside the current slide window, multiplied by {@code active + 1}
 * (see {@link SlideWindowData#getEstimateResponse()}). When several servers tie on the
 * smallest estimate, one of them is drawn at random, weighted by {@code Server.getWeight()}
 * unless all tied weights are equal or the weight total is not positive.
 *
 * <p>Once the window is older than {@code slidePeriod} milliseconds, a reset of every
 * tracked window is handed to {@code executorService}; at most one reset is in flight at
 * a time, guarded by {@code onResetSlideWindow}.
 */
public class ShortestResponseLoadBalance {

    public static final String NAME = "shortestresponse";

    /** Age in milliseconds the current window must exceed before a reset is scheduled. */
    private final int slidePeriod = 30000;

    /** Per-server window state, keyed by {@code Server.getIp()}. */
    private final ConcurrentMap<String, SlideWindowData> methodMap = new ConcurrentHashMap<>();

    /** {@code true} while a reset has been claimed and has not finished yet. */
    private final AtomicBoolean onResetSlideWindow = new AtomicBoolean(false);

    /** Wall-clock time (ms) of construction or of the last completed reset. */
    private volatile long lastUpdateTime = System.currentTimeMillis();

    // NOTE(review): the second NamedThreadFactory argument is presumably the daemon flag - confirm.
    private final ExecutorService executorService = Executors
            .newCachedThreadPool(new NamedThreadFactory("Dubbo-framework-shared-handler", true));

    /**
     * Windowed view over one server's {@link RpcStatus}.
     *
     * <p>The two offsets hold the values of {@code getSucceeded()} and
     * {@code getSucceededElapsed()} captured at the last {@link #reset()}; subtracting them
     * from the current values yields the figures for the current window only.
     * Presumably both RpcStatus values are monotonically growing totals - TODO confirm.
     *
     * <p>NOTE(review): the offsets are plain (non-volatile) fields, yet {@link #reset()} is
     * invoked from the executor thread while selecting threads read them - confirm that the
     * resulting staleness is acceptable.
     */
    public static class SlideWindowData {

        private long succeededOffset;
        private long succeededElapsedOffset;
        public RpcStatus rpcStatus;

        /**
         * Creates a window whose offsets start at zero.
         *
         * @param rpcStatus the status object this window reads from
         */
        public SlideWindowData(RpcStatus rpcStatus) {
            this.rpcStatus = rpcStatus;
            this.succeededOffset = 0L;
            this.succeededElapsedOffset = 0L;
        }

        /** Starts a new window by capturing the current succeeded count and elapsed total. */
        public void reset() {
            this.succeededOffset = this.rpcStatus.getSucceeded();
            this.succeededElapsedOffset = this.rpcStatus.getSucceededElapsed();
        }

        /**
         * Returns the average elapsed value per succeeded call inside the current window.
         *
         * @return the integer average, or {@code 0} when the succeeded count has not moved
         *         since the last reset
         */
        private long getSucceededAverageElapsed() {
            long succeeded = this.rpcStatus.getSucceeded() - this.succeededOffset;
            if (succeeded == 0L) {
                return 0L;
            }
            return (this.rpcStatus.getSucceededElapsed() - this.succeededElapsedOffset) / succeeded;
        }

        /**
         * Estimates the response time of the next call.
         *
         * @return the windowed average multiplied by {@code rpcStatus.getActive() + 1}
         */
        public long getEstimateResponse() {
            int active = this.rpcStatus.getActive() + 1;
            return this.getSucceededAverageElapsed() * (long) active;
        }

        @Override
        public String toString() {
            return "SlideWindowData{"
                    + "succeededOffset:" + this.succeededOffset + " "
                    + "succeededElapsedOffset:" + this.succeededElapsedOffset + " "
                    + "rpcStatus:" + this.rpcStatus.toString() + "}";
        }
    }

    public ShortestResponseLoadBalance() {
    }

    /**
     * Selects the server with the smallest estimated response time.
     *
     * @param serverList candidate servers; must be non-empty
     * @return the chosen server
     * @throws IllegalArgumentException if {@code serverList} is empty
     * @throws NullPointerException if {@code serverList} is {@code null}
     */
    protected Server select(List<Server> serverList) {
        int length = serverList.size();
        if (length == 0) {
            // Previously this surfaced as "bound must be positive" from nextInt(0).
            throw new IllegalArgumentException("serverList must contain at least one server");
        }

        long shortestResponse = Long.MAX_VALUE;
        int shortestCount = 0;                    // number of servers tied on shortestResponse
        int[] shortestIndexes = new int[length];  // indexes of those servers
        int[] weights = new int[length];
        int totalWeight = 0;                      // weight sum over the tied servers
        int firstWeight = 0;                      // weight of the first tied server
        boolean sameWeight = true;                // whether all tied servers share one weight

        for (int i = 0; i < length; i++) {
            Server server = serverList.get(i);
            RpcStatus rpcStatus = RpcStatus.getStatus(server.getIp());
            SlideWindowData slideWindowData =
                    this.methodMap.computeIfAbsent(server.getIp(), key -> new SlideWindowData(rpcStatus));
            long estimateResponse = slideWindowData.getEstimateResponse();
            int weight = server.getWeight();
            weights[i] = weight;

            if (estimateResponse < shortestResponse) {
                // Strictly better candidate: restart the tie bookkeeping from this server.
                shortestResponse = estimateResponse;
                shortestCount = 1;
                shortestIndexes[0] = i;
                totalWeight = weight;
                firstWeight = weight;
                sameWeight = true;
            } else if (estimateResponse == shortestResponse) {
                shortestIndexes[shortestCount++] = i;
                totalWeight += weight;
                if (sameWeight && i > 0 && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }

        resetSlideWindowIfDue();

        if (shortestCount == 1) {
            return serverList.get(shortestIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // Weighted draw: walk the tied servers until the random offset is used up.
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < shortestCount; i++) {
                int shortestIndex = shortestIndexes[i];
                offsetWeight -= weights[shortestIndex];
                if (offsetWeight < 0) {
                    return serverList.get(shortestIndex);
                }
            }
        }
        // Equal weights, a non-positive total, or a draw that matched nothing: pick uniformly.
        return serverList.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
    }

    /**
     * Schedules an asynchronous reset of all windows when the current one is older than
     * {@code slidePeriod} and no other reset is in flight.
     *
     * <p>The in-flight flag is always released, even when a reset throws or the executor
     * rejects the task, so one failure cannot disable all later resets.
     */
    private void resetSlideWindowIfDue() {
        if (System.currentTimeMillis() - this.lastUpdateTime <= (long) this.slidePeriod
                || !this.onResetSlideWindow.compareAndSet(false, true)) {
            return;
        }
        try {
            this.executorService.execute(() -> {
                try {
                    this.methodMap.values().forEach(SlideWindowData::reset);
                    this.lastUpdateTime = System.currentTimeMillis();
                } finally {
                    this.onResetSlideWindow.set(false);
                }
            });
        } catch (RuntimeException e) {
            // The task never started, so nobody else will clear the flag; still propagate as before.
            this.onResetSlideWindow.set(false);
            throw e;
        }
    }

    /**
     * Demo entry point: builds seven servers, performs ten selections while recording each
     * call through {@code RpcStatus.beginCount}/{@code endCount}, then prints the tracked windows.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Server> serverList = new ArrayList<>();
        serverList.add(new Server(1, "服务器1", "8080", "127.0.0.1", 90, 60));
        serverList.add(new Server(2, "服务器2", "8080", "127.0.0.2", 80, 80));
        serverList.add(new Server(3, "服务器3", "8080", "127.0.0.3", 70, 90));
        serverList.add(new Server(4, "服务器4", "8080", "127.0.0.4", 60, 60));
        serverList.add(new Server(5, "服务器5", "8080", "127.0.0.5", 50, 80));
        serverList.add(new Server(6, "服务器6", "8080", "127.0.0.6", 40, 60));
        serverList.add(new Server(7, "服务器7", "8080", "127.0.0.7", 30, 80));

        ShortestResponseLoadBalance loadBalance = new ShortestResponseLoadBalance();
        for (int i = 0; i < 10; i++) {
            Server server = loadBalance.select(serverList);
            RpcStatus.beginCount(server.getIp());
            RpcStatus.endCount(RpcStatus.getStatus(server.getIp()), server.getElapsed(), true);
            System.out.println("被选中的服务器:" + server.toString());
        }

        for (Map.Entry<String, SlideWindowData> entry : loadBalance.methodMap.entrySet()) {
            System.out.println("key" + entry.getKey() + ",value :" + entry.getValue().toString());
        }
    }
}
|