From 955b7f5bb19f071c442844ec2034a0fb62cc8d8f Mon Sep 17 00:00:00 2001 From: icanci Date: Sat, 4 Feb 2023 22:46:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E8=87=B4=E6=80=A7hash=E5=88=86?= =?UTF-8?q?=E7=A6=BB=E8=AF=B7=E6=B1=82=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ConsistencyHashLoadBalancing.java | 38 ++++++- .../ConsistencyHashLoadBalancingHolder.java | 104 ++++++++++++++++++ 2 files changed, 139 insertions(+), 3 deletions(-) create mode 100644 client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/model/ConsistencyHashLoadBalancingHolder.java diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/ConsistencyHashLoadBalancing.java b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/ConsistencyHashLoadBalancing.java index 208a18a..d9a56f3 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/ConsistencyHashLoadBalancing.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/ConsistencyHashLoadBalancing.java @@ -1,12 +1,21 @@ package cn.icanci.loopstack.ras.client.cache.loadbalancing; +import cn.icanci.loopstack.ras.client.cache.loadbalancing.model.ConsistencyHashLoadBalancingHolder; import cn.icanci.loopstack.ras.client.cache.loadbalancing.model.LoadBalancingHolder; +import cn.icanci.loopstack.ras.client.cache.model.ApplicationValue; +import cn.icanci.loopstack.ras.client.cache.model.ClientApplicationValue; import cn.icanci.loopstack.ras.common.enums.LoadBalanceTypeEnum; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Service; /** * 一致性哈希 + * - 参考xxl-job的实现 * * @author icanci * @since 1.0 Created in 2023/01/30 19:34 @@ -15,18 +24,41 @@ import org.springframework.stereotype.Service; @LoadBalancingBean(LoadBalanceTypeEnum.CONSISTENCY_HASH) public class ConsistencyHashLoadBalancing extends LoadBalancingCache implements LoadBalancing { + /** + * 数据缓存结果模型 + */ + private static Map HOLDER = new ConcurrentHashMap<>(); + @Override public void init() { - // + // 1.根据路由表,找到所有的指定路由方式 + if (isLbAppEmpty()) { + return; + } + Set applicationValues = filterLbType(LoadBalanceTypeEnum.CONSISTENCY_HASH); + + if (CollectionUtils.isEmpty(applicationValues)) { + return; + } + + Map tempMap = new ConcurrentHashMap<>(); + // 2.遍历应用表,构建缓存 + for (ClientApplicationValue applicationValue : applicationValues) { + String appId = applicationValue.getAppId(); + Set apps = applicationValue.getApplicationValues(); + tempMap.put(appId, new ConsistencyHashLoadBalancingHolder(apps)); + } + // 3.替换 + HOLDER = tempMap; } @Override public void refresh() { - + init(); } @Override protected LoadBalancingHolder getHolder(String appId) { - return null; + return HOLDER.get(appId); } } diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/model/ConsistencyHashLoadBalancingHolder.java b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/model/ConsistencyHashLoadBalancingHolder.java new file mode 100644 index 0000000..ee6e55a --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/model/ConsistencyHashLoadBalancingHolder.java @@ -0,0 +1,104 @@ +package cn.icanci.loopstack.ras.client.cache.loadbalancing.model; + +import cn.icanci.loopstack.ras.client.cache.model.ApplicationValue; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import org.apache.commons.collections4.CollectionUtils; + +/** + * + * @author icanci + * @since 1.0 Created in 2023/02/04 17:44 + */ +public class ConsistencyHashLoadBalancingHolder extends LoadBalancingHolder { + /** + * 虚拟节点个数 + */ + private static final int VIRTUAL_NODE_NUM = 1000; + /** + * 应用列表 + */ + private final Set applications; + + public ConsistencyHashLoadBalancingHolder(Set applications) { + this.applications = applications; + setNext(); + } + + @Override + public void setNext() { + List applicationValues = applications.stream().filter(app -> app.getOnline() == 0 && app.getIsDelete() == 0).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(applicationValues)) { + this.next = toHash(applicationValues); + } else { + applicationValues = applications.stream().filter(app -> app.getOnline() == 0).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(applicationValues)) { + this.next = toHash(applicationValues); + } else { + applicationValues = applications.stream().filter(app -> app.getOnline() == 1).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(applicationValues)) { + this.next = toHash(applicationValues); + } + } + } + } + + /** + * 执行hash + * + * @param applicationValues applicationValues + * @return 返回hash值 + */ + public ApplicationValue toHash(List applicationValues) { + // ------A1------A2-------A3------ + // -----------J1------------------ + TreeMap addressRing = new TreeMap(); + for (ApplicationValue applicationValue : applicationValues) { + for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { + String address = applicationValue.getAddress() + applicationValue.getPort(); + long addressHash = hash("RAS-" + address + "-NODE-" + i); + addressRing.put(addressHash, applicationValue); + } + } + + // 任务线程id+当前毫秒 进行哈希 + long threadHash = hash(String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis())); + SortedMap lastRing = addressRing.tailMap(threadHash); + if (!lastRing.isEmpty()) { + // 拿到最近的一个 + return lastRing.get(lastRing.firstKey()); + } + return addressRing.firstEntry().getValue(); + } + + /** + * get hash code on 2^32 ring (md5散列的方式计算hash值) + * + * @param key key + * @return hash + */ + private static long hash(String key) { + MessageDigest md5; + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("MD5 not supported", e); + } + md5.reset(); + byte[] keyBytes = null; + keyBytes = key.getBytes(StandardCharsets.UTF_8); + md5.update(keyBytes); + byte[] digest = md5.digest(); + long hashCode = ((long) (digest[3] & 0xFF) << 24) | ((long) (digest[2] & 0xFF) << 16) | ((long) (digest[1] & 0xFF) << 8) | (digest[0] & 0xFF); + return hashCode & 0xffffffffL; + } + +} -- Gitee