From a834ea03b76956ad2a95202eac46b53f18fc3839 Mon Sep 17 00:00:00 2001 From: icanci Date: Tue, 7 Feb 2023 20:26:16 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../holder/RasClientRepositoryHolder.java | 7 +- .../properties/RasClientProperties.java | 19 +- .../ras/client/server/NamedServer.java | 29 +- .../ras/client/server/RegisterServer.java | 39 +-- .../ras/common/socket/RasLoadRequestDTO.java | 15 +- .../ras/common/socket/RegisterDTO.java | 14 +- .../cache/RasServerRepositoryHolder.java | 28 +- .../ras/server/server/RegisterServer.java | 274 +++++++++--------- 8 files changed, 241 insertions(+), 184 deletions(-) diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java index 8775e24..15864cb 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java @@ -106,7 +106,10 @@ public class RasClientRepositoryHolder extends MetaCacheHolder implements Initia int serverPort = rasProperties.getServerPort(); int clientPort = rasProperties.getClientPort(); String appId = rasProperties.getAppId(); - + String loadBalance = rasProperties.getLoadBalance(); + if (LoadBalanceTypeEnum.getByCode(loadBalance) == null) { + loadBalance = LoadBalanceTypeEnum.FASTEST_CALL_SPEED.getCode(); + } String[] serverAddress = serverIps.split(","); RandomAddressUtils.randomAddress(serverAddress); @@ -115,7 +118,7 @@ public class RasClientRepositoryHolder extends MetaCacheHolder implements Initia try { String reqUrl = String.format(LOAD_REQUEST_FORMAT, address, serverPort); - RasLoadRequestDTO requestDTO = new RasLoadRequestDTO(appId, address, clientPort); + RasLoadRequestDTO requestDTO = new RasLoadRequestDTO(appId, address, clientPort, loadBalance); Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, requestDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/properties/RasClientProperties.java b/client/src/main/java/cn/icanci/loopstack/ras/client/properties/RasClientProperties.java index 3fde0ff..a10720b 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/properties/RasClientProperties.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/properties/RasClientProperties.java @@ -18,15 +18,20 @@ public class RasClientProperties { /** * 客户端注册的port */ - private int clientPort = 11000; + private int clientPort = 11000; /** * 服务端ip,以,分隔 */ - private String serverIps = "127.0.0.1"; + private String serverIps = "127.0.0.1"; /** * 服务端port */ - private int serverPort = 9995; + private int serverPort = 9995; + /** + * 负载均衡算法 + * 默认算法为调用速度最快的算法 + */ + private String loadBalance = "FASTEST_CALL_SPEED"; public String getAppId() { return appId; @@ -59,4 +64,12 @@ public class RasClientProperties { public void setServerPort(int serverPort) { this.serverPort = serverPort; } + + public String getLoadBalance() { + return loadBalance; + } + + public void setLoadBalance(String loadBalance) { + this.loadBalance = loadBalance; + } } diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedServer.java b/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedServer.java index d42e6d8..81094d4 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedServer.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedServer.java @@ -1,5 +1,11 @@ package cn.icanci.loopstack.ras.client.server; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -9,12 +15,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.timeout.IdleStateHandler; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author icanci * @since 1.0 Created in 2023/01/01 09:55 @@ -30,15 +30,15 @@ public class NamedServer { NamedServer.registerServer = registerServer; } - public static void startClient(String serverIps, int serverPort, int clientPort) { + public static void startClient(String serverIps, int serverPort, int clientPort, String loadBalance) { // 启动时候注册 - startClient0(serverIps, serverPort, clientPort); + startClient0(serverIps, serverPort, clientPort, loadBalance); // 自动进行注册 // Tips: 项目启动了,但是没有配置项目信息,此时注册失败,如果不自动注册,则需要进行重启才能注册。因此开启自助注册 - autoRegister(serverIps, serverPort, clientPort); + autoRegister(serverIps, serverPort, clientPort, loadBalance); } - private static void startClient0(String serverAddress, int serverPort, int clientPort) { + private static void startClient0(String serverAddress, int serverPort, int clientPort, String loadBalance) { Thread rasThread = new Thread(() -> { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -85,9 +85,10 @@ public class NamedServer { * @param serverAddress 服务端ip地址 * @param serverPort 服务端端口 * @param clientPort 客户端端口 + * @param loadBalance 复杂均衡算法 */ - private static void doRegistry(String serverAddress, int serverPort, int clientPort) { - registerServer.register(serverAddress, serverPort, clientPort); + private static void doRegistry(String serverAddress, int serverPort, int clientPort, String loadBalance) { + registerServer.register(serverAddress, serverPort, clientPort, loadBalance); } /** @@ -97,11 +98,11 @@ public class NamedServer { * @param serverPort 服务端端口 * @param clientPort 客户端端口 */ - private static void autoRegister(String serverAddress, int serverPort, int clientPort) { + private static void autoRegister(String serverAddress, int serverPort, int clientPort, String loadBalance) { Thread autoRegisterThread = new Thread(() -> { // 每120秒刷新注册一次 LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120)); - doRegistry(serverAddress, serverPort, clientPort); + doRegistry(serverAddress, serverPort, clientPort, loadBalance); }); autoRegisterThread.setDaemon(true); autoRegisterThread.start(); diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/server/RegisterServer.java b/client/src/main/java/cn/icanci/loopstack/ras/client/server/RegisterServer.java index 992e565..71c5715 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/server/RegisterServer.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/server/RegisterServer.java @@ -1,16 +1,5 @@ package cn.icanci.loopstack.ras.client.server; -import cn.hutool.http.Method; -import cn.hutool.json.JSONUtil; -import cn.icanci.loopstack.api.client.Client; -import cn.icanci.loopstack.api.client.http.HttpClientImpl; -import cn.icanci.loopstack.lsi.common.result.R; -import cn.icanci.loopstack.ras.client.properties.RasClientProperties; -import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; -import cn.icanci.loopstack.ras.common.socket.RegisterDTO; -import cn.icanci.loopstack.ras.common.socket.UriConstant; -import cn.icanci.loopstack.utils.IPUtils; - import java.util.concurrent.*; import javax.annotation.Resource; @@ -22,6 +11,17 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Maps; +import cn.hutool.http.Method; +import cn.hutool.json.JSONUtil; +import cn.icanci.loopstack.api.client.Client; +import cn.icanci.loopstack.api.client.http.HttpClientImpl; +import cn.icanci.loopstack.lsi.common.result.R; +import cn.icanci.loopstack.ras.client.properties.RasClientProperties; +import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +import cn.icanci.loopstack.ras.common.socket.UriConstant; +import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; +import cn.icanci.loopstack.utils.IPUtils; + /** * @author icanci * @since 1.0 Created in 2023/01/06 22:30 @@ -31,9 +31,9 @@ import com.google.common.collect.Maps; public class RegisterServer implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(RegisterServer.class); /** http实例 */ - private static final Client CLIENT = HttpClientImpl.getInstance(); + private static final Client CLIENT = HttpClientImpl.getInstance(); @Resource - private RasClientProperties rasProperties; + private RasClientProperties rasProperties; private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); @@ -56,8 +56,9 @@ public class RegisterServer implements InitializingBean { * @param serverAddress 服务端ip地址 * @param serverPort 服务端端口 * @param clientPort 客户端端口 + * @param loadBalance 复杂均衡算法 */ - public void register(String serverAddress, int serverPort, int clientPort) { + public void register(String serverAddress, int serverPort, int clientPort, String loadBalance) { String appId = rasProperties.getAppId(); String[] addresses = serverAddress.split(","); // 注册地址打散,防止压力在同一个机器上 @@ -65,7 +66,7 @@ public class RegisterServer implements InitializingBean { // 执行注册 for (String address : addresses) { try { - FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort)); + FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort, loadBalance)); REGISTER_POOL.execute(task); R r = task.get(10, TimeUnit.SECONDS); if (r.isOk()) { @@ -86,7 +87,7 @@ public class RegisterServer implements InitializingBean { NamedServer.setRegisterService(this); // 启动服务器 - NamedServer.startClient(rasProperties.getServerIps(), rasProperties.getServerPort(), rasProperties.getClientPort()); + NamedServer.startClient(rasProperties.getServerIps(), rasProperties.getServerPort(), rasProperties.getClientPort(), rasProperties.getLoadBalance()); } /** 注册器 */ @@ -96,17 +97,19 @@ public class RegisterServer implements InitializingBean { private final int clientPort; private final String appId; private final int serverPort; + private final String loadBalance; - public RegisterCallable(String address, int clientPort, String appId, int serverPort) { + public RegisterCallable(String address, int clientPort, String appId, int serverPort, String loadBalance) { this.address = address; this.clientPort = clientPort; this.appId = appId; this.serverPort = serverPort; + this.loadBalance = loadBalance; } @Override public R call() throws Exception { - RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId); + RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId, loadBalance); String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.ClientToServer.REGISTER); Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, registerDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); diff --git a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RasLoadRequestDTO.java b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RasLoadRequestDTO.java index 3616eda..8ac3876 100644 --- a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RasLoadRequestDTO.java +++ b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RasLoadRequestDTO.java @@ -19,14 +19,19 @@ public class RasLoadRequestDTO implements Serializable { * 当前服务的服务端口 */ private int port; + /** + * 负载均衡算法 + */ + private String loadBalance; public RasLoadRequestDTO() { } - public RasLoadRequestDTO(String appId, String address, int port) { + public RasLoadRequestDTO(String appId, String address, int port, String loadBalance) { this.appId = appId; this.address = address; this.port = port; + this.loadBalance = loadBalance; } public String getAppId() { @@ -52,4 +57,12 @@ public class RasLoadRequestDTO implements Serializable { public void setPort(int port) { this.port = port; } + + public String getLoadBalance() { + return loadBalance; + } + + public void setLoadBalance(String loadBalance) { + this.loadBalance = loadBalance; + } } diff --git a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RegisterDTO.java b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RegisterDTO.java index 63fbf46..926f5bb 100644 --- a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RegisterDTO.java +++ b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/RegisterDTO.java @@ -21,14 +21,19 @@ public class RegisterDTO implements Serializable { * SDK 服务服务ID */ private String appId; + /** + * 负载均衡算法 + */ + private String loadBalance; public RegisterDTO() { } - public RegisterDTO(String clientAddress, int clientPort, String appId) { + public RegisterDTO(String clientAddress, int clientPort, String appId, String loadBalance) { this.clientAddress = clientAddress; this.clientPort = clientPort; this.appId = appId; + this.loadBalance = loadBalance; } public String getClientAddress() { @@ -55,4 +60,11 @@ public class RegisterDTO implements Serializable { this.appId = appId; } + public String getLoadBalance() { + return loadBalance; + } + + public void setLoadBalance(String loadBalance) { + this.loadBalance = loadBalance; + } } diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java b/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java index 7eec165..dd1dae8 100644 --- a/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java @@ -1,5 +1,12 @@ package cn.icanci.loopstack.ras.server.cache; +import cn.icanci.loopstack.ras.common.enums.LoadBalanceTypeEnum; +import cn.icanci.loopstack.ras.common.model.Application; +import cn.icanci.loopstack.ras.common.model.Instance; +import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; +import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +import cn.icanci.loopstack.ras.server.server.NamedNettyServerHandler; + import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -9,13 +16,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; -import cn.icanci.loopstack.ras.common.enums.LoadBalanceTypeEnum; -import cn.icanci.loopstack.ras.common.model.Application; -import cn.icanci.loopstack.ras.common.model.Instance; -import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; -import cn.icanci.loopstack.ras.common.socket.RegisterDTO; -import cn.icanci.loopstack.ras.server.server.NamedNettyServerHandler; - /** * @author icanci * @since 1.0 Created in 2023/02/05 12:40 @@ -107,6 +107,7 @@ public class RasServerRepositoryHolder implements InitializingBean { if (serverApplication == null || CollectionUtils.isEmpty(serverApplication.getInstances())) { return; } + // Instance 重写了 equals 和 hashcode 方法 Set instances = serverApplication.getInstances(); SERVER_APPLICATION_INSTANCE.addAll(instances); } @@ -133,7 +134,18 @@ public class RasServerRepositoryHolder implements InitializingBean { return null; } - public RasRefreshDTO clientRegister(RegisterDTO toBean) { + /** + * 客户端向服务端注册 + * + * @param register register + * @return 返回注册表信息 + */ + public RasRefreshDTO clientRegister(RegisterDTO register) { + // 1.先注册服务 + + // 2.返回注册表信息 + RasRefreshDTO rasRefresh = new RasRefreshDTO(); + return null; } } diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java b/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java index 085be74..a580931 100644 --- a/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java @@ -1,137 +1,137 @@ -package cn.icanci.loopstack.ras.server.server; - -import cn.hutool.http.Method; -import cn.hutool.json.JSONUtil; -import cn.icanci.loopstack.api.client.Client; -import cn.icanci.loopstack.api.client.http.HttpClientImpl; -import cn.icanci.loopstack.lsi.common.result.R; -import cn.icanci.loopstack.ras.common.socket.RegisterDTO; -import cn.icanci.loopstack.ras.common.socket.UriConstant; -import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; -import cn.icanci.loopstack.ras.server.cache.RasServerRepositoryHolder; -import cn.icanci.loopstack.ras.server.properties.RasServerProperties; -import cn.icanci.loopstack.utils.IPUtils; - -import java.util.concurrent.*; - -import javax.annotation.Resource; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.stereotype.Service; - -import com.google.common.collect.Maps; - -/** - * @author icanci - * @since 1.0 Created in 2023/01/06 22:30 - */ -@Service -@SuppressWarnings("all") -public class RegisterServer implements InitializingBean { - private static final Logger logger = LoggerFactory.getLogger(RegisterServer.class); - /** http实例 */ - private static final Client CLIENT = HttpClientImpl.getInstance(); - @Resource - private RasServerProperties rasServerProperties; - @Resource - private RasServerRepositoryHolder rasServerRepositoryHolder; - - private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); - - private static final ThreadPoolExecutor REGISTER_POOL = new ThreadPoolExecutor(CORE_SIZE, // - CORE_SIZE << 1, // - 60L, // - TimeUnit.SECONDS, // - new LinkedBlockingQueue<>(2000), // - runnable -> new Thread(runnable, "RegisterServer Biz Pool-" + runnable.hashCode()), // - (r, executor) -> { - throw new RuntimeException("RegisterServer Biz Pool is EXHAUSTED!"); - }); - - /** 注册请求地址 */ - private static final String REQ_URL_FORMAT = "http://%s:%s%s"; - - /** - * 注册到注册中心 - * - * @param serverAddress 服务端ip地址 - * @param serverPort 服务端端口 - * @param clientPort 客户端端口 - */ - public void register(String serverAddress, int serverPort, int clientPort) { - String appId = rasServerProperties.getAppId(); - String[] addresses = serverAddress.split(","); - // 注册地址打散,防止压力在同一个机器上 - RandomAddressUtils.randomAddress(addresses); - // 执行注册 - for (String address : addresses) { - try { - FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort)); - REGISTER_POOL.execute(task); - R r = task.get(10, TimeUnit.SECONDS); - if (r.isOk()) { - return; - } else { - logger.error("Task Register Exception:{}", r.getMessage()); - } - } catch (ExecutionException | InterruptedException | TimeoutException e) { - logger.warn("Register Exception:{}", e.getMessage()); - } - } - // 如果全部失败了,则会走到这里,此时自身就是服务器 - fixServerRegister(clientPort); - // - } - - /** - * 注册自己 - * - * @param clientPort clientPort - */ - private void fixServerRegister(int clientPort) { - String hostIpAddress = IPUtils.getHostIpAddress(); - rasServerRepositoryHolder.fixServerRegister(hostIpAddress, clientPort, rasServerProperties.getAppId()); - } - - @Override - public void afterPropertiesSet() throws Exception { - - // 注入注册bean - NamedServer.setRegisterService(this); - - // 启动服务器 - NamedServer.startClient(rasServerProperties.getServerIps(), rasServerProperties.getServerPort(), rasServerProperties.getClientPort()); - } - - /** 注册器 */ - private static class RegisterCallable implements Callable { - - private final String address; - private final int clientPort; - private final String appId; - private final int serverPort; - - public RegisterCallable(String address, int clientPort, String appId, int serverPort) { - this.address = address; - this.clientPort = clientPort; - this.appId = appId; - this.serverPort = serverPort; - } - - @Override - public R call() throws Exception { - RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId); - String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.ServerToServer.REGISTER); - - Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, registerDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); - - R call = CLIENT.call(rpcRequest, R.class); - - logger.info("[RegisterCallable][call] Register result:{}", JSONUtil.toJsonStr(call)); - - return call; - } - } -} +//package cn.icanci.loopstack.ras.server.server; +// +//import cn.hutool.http.Method; +//import cn.hutool.json.JSONUtil; +//import cn.icanci.loopstack.api.client.Client; +//import cn.icanci.loopstack.api.client.http.HttpClientImpl; +//import cn.icanci.loopstack.lsi.common.result.R; +//import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +//import cn.icanci.loopstack.ras.common.socket.UriConstant; +//import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; +//import cn.icanci.loopstack.ras.server.cache.RasServerRepositoryHolder; +//import cn.icanci.loopstack.ras.server.properties.RasServerProperties; +//import cn.icanci.loopstack.utils.IPUtils; +// +//import java.util.concurrent.*; +// +//import javax.annotation.Resource; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.beans.factory.InitializingBean; +//import org.springframework.stereotype.Service; +// +//import com.google.common.collect.Maps; +// +///** +// * @author icanci +// * @since 1.0 Created in 2023/01/06 22:30 +// */ +//@Service +//@SuppressWarnings("all") +//public class RegisterServer implements InitializingBean { +// private static final Logger logger = LoggerFactory.getLogger(RegisterServer.class); +// /** http实例 */ +// private static final Client CLIENT = HttpClientImpl.getInstance(); +// @Resource +// private RasServerProperties rasServerProperties; +// @Resource +// private RasServerRepositoryHolder rasServerRepositoryHolder; +// +// private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); +// +// private static final ThreadPoolExecutor REGISTER_POOL = new ThreadPoolExecutor(CORE_SIZE, // +// CORE_SIZE << 1, // +// 60L, // +// TimeUnit.SECONDS, // +// new LinkedBlockingQueue<>(2000), // +// runnable -> new Thread(runnable, "RegisterServer Biz Pool-" + runnable.hashCode()), // +// (r, executor) -> { +// throw new RuntimeException("RegisterServer Biz Pool is EXHAUSTED!"); +// }); +// +// /** 注册请求地址 */ +// private static final String REQ_URL_FORMAT = "http://%s:%s%s"; +// +// /** +// * 注册到注册中心 +// * +// * @param serverAddress 服务端ip地址 +// * @param serverPort 服务端端口 +// * @param clientPort 客户端端口 +// */ +// public void register(String serverAddress, int serverPort, int clientPort) { +// String appId = rasServerProperties.getAppId(); +// String[] addresses = serverAddress.split(","); +// // 注册地址打散,防止压力在同一个机器上 +// RandomAddressUtils.randomAddress(addresses); +// // 执行注册 +// for (String address : addresses) { +// try { +// FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort)); +// REGISTER_POOL.execute(task); +// R r = task.get(10, TimeUnit.SECONDS); +// if (r.isOk()) { +// return; +// } else { +// logger.error("Task Register Exception:{}", r.getMessage()); +// } +// } catch (ExecutionException | InterruptedException | TimeoutException e) { +// logger.warn("Register Exception:{}", e.getMessage()); +// } +// } +// // 如果全部失败了,则会走到这里,此时自身就是服务器 +// fixServerRegister(clientPort); +// // +// } +// +// /** +// * 注册自己 +// * +// * @param clientPort clientPort +// */ +// private void fixServerRegister(int clientPort) { +// String hostIpAddress = IPUtils.getHostIpAddress(); +// rasServerRepositoryHolder.fixServerRegister(hostIpAddress, clientPort, rasServerProperties.getAppId()); +// } +// +// @Override +// public void afterPropertiesSet() throws Exception { +// +// // 注入注册bean +// NamedServer.setRegisterService(this); +// +// // 启动服务器 +// NamedServer.startClient(rasServerProperties.getServerIps(), rasServerProperties.getServerPort(), rasServerProperties.getClientPort()); +// } +// +// /** 注册器 */ +// private static class RegisterCallable implements Callable { +// +// private final String address; +// private final int clientPort; +// private final String appId; +// private final int serverPort; +// +// public RegisterCallable(String address, int clientPort, String appId, int serverPort) { +// this.address = address; +// this.clientPort = clientPort; +// this.appId = appId; +// this.serverPort = serverPort; +// } +// +// @Override +// public R call() throws Exception { +// RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId); +// String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.ServerToServer.REGISTER); +// +// Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, registerDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); +// +// R call = CLIENT.call(rpcRequest, R.class); +// +// logger.info("[RegisterCallable][call] Register result:{}", JSONUtil.toJsonStr(call)); +// +// return call; +// } +// } +//} -- Gitee