From 6d9c3c206fc6d4a99cee97d2d5cf3cdd0b2eb552 Mon Sep 17 00:00:00 2001 From: icanci Date: Mon, 6 Feb 2023 20:38:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=AB=AF=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...er.java => RasClientRepositoryHolder.java} | 12 +- .../loadbalancing/LoadBalancingSupport.java | 2 +- .../ras/client/facade/RpcCallFacade.java | 6 +- .../server/NamedNettyServerHandler.java | 30 ++-- .../ras/client/server/RegisterServer.java | 4 +- .../ras/common/socket/SocketMessage.java | 7 + .../ras/common/socket/UriConstant.java | 28 +++- .../ras/common}/utils/RandomAddressUtils.java | 2 +- .../cache/RasServerRepositoryHolder.java | 139 +++++++++++++++++ .../cn/icanci/loopstack/ras/server/readme.txt | 26 ++++ .../server/NamedNettyServerHandler.java | 143 ++++++++++++++++++ .../ras/server/server/NamedServer.java | 114 ++++++++++++++ .../ras/server/server/RegisterServer.java | 137 +++++++++++++++++ 13 files changed, 614 insertions(+), 36 deletions(-) rename client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/{RasRepositoryHolder.java => RasClientRepositoryHolder.java} (95%) rename {client/src/main/java/cn/icanci/loopstack/ras/client => common/src/main/java/cn/icanci/loopstack/ras/common}/utils/RandomAddressUtils.java (93%) create mode 100644 server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java create mode 100644 server/src/main/java/cn/icanci/loopstack/ras/server/readme.txt create mode 100644 server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedNettyServerHandler.java create mode 100644 server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedServer.java create mode 100644 server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasRepositoryHolder.java b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java similarity index 95% rename from client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasRepositoryHolder.java rename to client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java index 3d17351..8775e24 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasRepositoryHolder.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/holder/RasClientRepositoryHolder.java @@ -13,7 +13,6 @@ import cn.icanci.loopstack.ras.client.cache.model.ClientApplicationValue; import cn.icanci.loopstack.ras.client.exception.ServerApplicationWrongfulException; import cn.icanci.loopstack.ras.client.exception.ServiceNotFoundException; import cn.icanci.loopstack.ras.client.server.NamedNettyServerHandler; -import cn.icanci.loopstack.ras.client.utils.RandomAddressUtils; import cn.icanci.loopstack.ras.common.enums.LoadBalanceTypeEnum; import cn.icanci.loopstack.ras.common.exception.ServerOfflineException; import cn.icanci.loopstack.ras.common.model.Application; @@ -21,6 +20,7 @@ import cn.icanci.loopstack.ras.common.model.Instance; import cn.icanci.loopstack.ras.common.socket.RasLoadRequestDTO; import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; import cn.icanci.loopstack.ras.common.socket.UriConstant; +import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; import io.netty.util.internal.ThrowableUtil; import java.util.List; @@ -53,14 +53,14 @@ import com.google.common.collect.Sets; */ @Service @Order(100) -public class RasRepositoryHolder extends MetaCacheHolder implements InitializingBean, ApplicationContextAware { +public class RasClientRepositoryHolder extends MetaCacheHolder implements InitializingBean, ApplicationContextAware { - private static final Logger logger = LoggerFactory.getLogger(RasRepositoryHolder.class); + private static final Logger logger = LoggerFactory.getLogger(RasClientRepositoryHolder.class); /** * http://{address}:port+UriConstant.ToClient.LOAD */ - private static final String LOAD_REQUEST_FORMAT = "http://%s:%s" + UriConstant.ToServer.LOAD; + private static final String LOAD_REQUEST_FORMAT = "http://%s:%s" + UriConstant.ClientToServer.LOAD; /** * 请求地址 */ @@ -121,11 +121,11 @@ public class RasRepositoryHolder extends MetaCacheHolder implements Initializing R call = CLIENT.call(rpcRequest, R.class); - logger.info("[RasRepositoryHolder][call] Load result:{}", JSONUtil.toJsonStr(call)); + logger.info("[RasClientRepositoryHolder][call] Load result:{}", JSONUtil.toJsonStr(call)); return JSONUtil.toBean(call.getData().get("response").toString(), RasRefreshDTO.class); } catch (Exception ex) { - logger.error("[RasRepositoryHolder][call] Ex error message:{}", ThrowableUtil.stackTraceToString(ex)); + logger.error("[RasClientRepositoryHolder][call] Ex error message:{}", ThrowableUtil.stackTraceToString(ex)); } } throw new ServerOfflineException("All Server IP are Requested, and All failed!!! Please Check your Config, The Server IPs are: " + serverIps); diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/LoadBalancingSupport.java b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/LoadBalancingSupport.java index 8bb3ad3..58c66fb 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/LoadBalancingSupport.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/cache/loadbalancing/LoadBalancingSupport.java @@ -104,6 +104,7 @@ public final class LoadBalancingSupport extends MetaCacheHolder { if (clientApplicationValue == null) { return; } + // TODO 这里多创建了一个对象 // 解析 ip 和 port String ip = parseIp(url); int port = parsePort(url); @@ -121,7 +122,6 @@ public final class LoadBalancingSupport extends MetaCacheHolder { } /** - * TODO 这里多创建了一个对象 * 获取端口号 * * @param href 网址 diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/facade/RpcCallFacade.java b/client/src/main/java/cn/icanci/loopstack/ras/client/facade/RpcCallFacade.java index e95abc7..a7d0ee9 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/facade/RpcCallFacade.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/facade/RpcCallFacade.java @@ -3,7 +3,7 @@ package cn.icanci.loopstack.ras.client.facade; import cn.hutool.http.Method; import cn.icanci.loopstack.api.client.Client; import cn.icanci.loopstack.api.client.http.HttpClientImpl; -import cn.icanci.loopstack.ras.client.cache.holder.RasRepositoryHolder; +import cn.icanci.loopstack.ras.client.cache.holder.RasClientRepositoryHolder; import cn.icanci.loopstack.ras.client.exception.RpcCallException; import java.util.Map; @@ -41,7 +41,7 @@ public class RpcCallFacade { private static final Client CLIENT = HttpClientImpl.getInstance(); @Resource - private RasRepositoryHolder rasRepositoryHolder; + private RasClientRepositoryHolder rasClientRepositoryHolder; /** * Get请求调用 @@ -156,7 +156,7 @@ public class RpcCallFacade { throw new RpcCallException("The relativePath is null! Please check your config!"); } - String prefixUrl = rasRepositoryHolder.routingRequestAddress(appId); + String prefixUrl = rasClientRepositoryHolder.routingRequestAddress(appId); return prefixUrl + relativePath; } diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedNettyServerHandler.java b/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedNettyServerHandler.java index c0dae89..dd09b40 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedNettyServerHandler.java +++ b/client/src/main/java/cn/icanci/loopstack/ras/client/server/NamedNettyServerHandler.java @@ -1,7 +1,7 @@ package cn.icanci.loopstack.ras.client.server; import cn.hutool.json.JSONUtil; -import cn.icanci.loopstack.ras.client.cache.holder.RasRepositoryHolder; +import cn.icanci.loopstack.ras.client.cache.holder.RasClientRepositoryHolder; import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; import cn.icanci.loopstack.ras.common.socket.SocketMessage; import cn.icanci.loopstack.ras.common.socket.UriConstant; @@ -28,18 +28,18 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("all") public class NamedNettyServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NamedNettyServerHandler.class); + private static final Logger logger = LoggerFactory.getLogger(NamedNettyServerHandler.class); - private static RasRepositoryHolder rasRepositoryHolder; + private static RasClientRepositoryHolder rasClientRepositoryHolder; - private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); - private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(CORE_SIZE, // - CORE_SIZE << 1, // - 60L, // - TimeUnit.SECONDS, // - new LinkedBlockingQueue<>(2000), // - runnable -> new Thread(runnable, "RasServerThread Pool-" + runnable.hashCode()), // + private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(CORE_SIZE, // + CORE_SIZE << 1, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "RasServerThread Pool-" + runnable.hashCode()), // (r, executor) -> { throw new RuntimeException("RasServerThread Pool is EXHAUSTED!"); });; @@ -47,8 +47,8 @@ public class NamedNettyServerHandler extends SimpleChannelInboundHandler { return message; } + public static SocketMessage success(Object content) { + SocketMessage message = new SocketMessage(); + message.setSuccess(true); + message.setContent(content); + return message; + } + public boolean isSuccess() { return success; } diff --git a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/UriConstant.java b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/UriConstant.java index b8b9900..7a75e66 100644 --- a/common/src/main/java/cn/icanci/loopstack/ras/common/socket/UriConstant.java +++ b/common/src/main/java/cn/icanci/loopstack/ras/common/socket/UriConstant.java @@ -1,16 +1,14 @@ package cn.icanci.loopstack.ras.common.socket; /** - * TODO 发布的处理 - * * @author icanci * @since 1.0 Created in 2023/01/06 22:41 */ public interface UriConstant { /** - * ToClient + * ServerToClient */ - interface ToClient { + interface ServerToClient { /** 心跳 */ String HEARTBEAT = "/ras/register/heartbeat"; /** 刷新 */ @@ -18,13 +16,27 @@ public interface UriConstant { } /** - * ToServer + * ClientToServer */ - interface ToServer { + interface ClientToServer { /** 注册 */ - String REGISTER = "/ras/register/doRegister"; + String REGISTER = "/ras/register/doRegister"; /** 加载客户端和注册中心的信息 */ - String LOAD = "/ras/register/load"; + String LOAD = "/ras/register/load"; + } + + /** + * ServerToServer + */ + interface ServerToServer { + /** 服务和服务之间心跳 */ + String HEARTBEAT = "/ras/register/doHeartbeat"; + /** 服务端注册 */ + String REGISTER = "/ras/register/doServerRegister"; + /** 加载客户端和注册中心的信息 */ + String LOAD = "/ras/register/doLoad"; + /** 刷新客户端和注册中心的信息 */ + String REFRESH = "/ras/register/doRefresh"; /** 刷新服务的状态信息:上线、下线等 */ String TO_REFRESH_DELETE_STATUS = "/ras/register/refreshDeleteStatus"; } diff --git a/client/src/main/java/cn/icanci/loopstack/ras/client/utils/RandomAddressUtils.java b/common/src/main/java/cn/icanci/loopstack/ras/common/utils/RandomAddressUtils.java similarity index 93% rename from client/src/main/java/cn/icanci/loopstack/ras/client/utils/RandomAddressUtils.java rename to common/src/main/java/cn/icanci/loopstack/ras/common/utils/RandomAddressUtils.java index db0ceb5..c40680c 100644 --- a/client/src/main/java/cn/icanci/loopstack/ras/client/utils/RandomAddressUtils.java +++ b/common/src/main/java/cn/icanci/loopstack/ras/common/utils/RandomAddressUtils.java @@ -1,4 +1,4 @@ -package cn.icanci.loopstack.ras.client.utils; +package cn.icanci.loopstack.ras.common.utils; /** * @author icanci diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java b/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java new file mode 100644 index 0000000..7eec165 --- /dev/null +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/cache/RasServerRepositoryHolder.java @@ -0,0 +1,139 @@ +package cn.icanci.loopstack.ras.server.cache; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +import cn.icanci.loopstack.ras.common.enums.LoadBalanceTypeEnum; +import cn.icanci.loopstack.ras.common.model.Application; +import cn.icanci.loopstack.ras.common.model.Instance; +import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; +import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +import cn.icanci.loopstack.ras.server.server.NamedNettyServerHandler; + +/** + * @author icanci + * @since 1.0 Created in 2023/02/05 12:40 + */ +@Service +public class RasServerRepositoryHolder implements InitializingBean { + + private static final Logger logger = LoggerFactory.getLogger(RasServerRepositoryHolder.class); + + /** + * 服务的本地缓存 + */ + private static final Set SERVER_APPLICATION_INSTANCE = new HashSet<>(); + /** + * 客户端的本地缓存 + */ + private static final Map CLIENT_APPLICATION_MAP = new ConcurrentHashMap<>(); + + @Override + public void afterPropertiesSet() throws Exception { + // 注入对象 + NamedNettyServerHandler.setRasRepositoryHolder(this); + } + + /** + * 向本地注册中心注册自己 + * + * @param hostIpAddress hostIpAddress + * @param clientPort clientPort + * @param appId appId + */ + public void fixServerRegister(String hostIpAddress, int clientPort, String appId) { + SERVER_APPLICATION_INSTANCE.add(buildInstance(hostIpAddress, clientPort, appId)); + } + + /** + * 构建Application + * + * @param appId appId + * @return 返回Application + */ + private Application buildApplication(String appId) { + Application application = new Application(); + application.setAppId(appId); + // 服务端不涉及负载均衡,此处赋值默认即可 + application.setLoadBalanceType(LoadBalanceTypeEnum.FIRST.getCode()); + application.setInstances(SERVER_APPLICATION_INSTANCE); + return application; + } + + /** + * 构建实例 + * + * @param hostIpAddress hostIpAddress + * @param clientPort clientPort + * @param appId appId + * @return 返回实例 + */ + private Instance buildInstance(String hostIpAddress, int clientPort, String appId) { + Instance instance = new Instance(); + instance.setAppId(appId); + instance.setAddress(hostIpAddress); + instance.setPort(clientPort); + instance.setCreateTime(new Date()); + instance.setUpdateTime(new Date()); + instance.setIsDelete(0); + instance.setOnline(0); + return instance; + } + + /** + * 刷新需要传递的数据信息 + * + * @param rasRefresh rasRefresh + */ + public void refresh(RasRefreshDTO rasRefresh) { + Application serverApplication = rasRefresh.getServerApplication(); + refreshServerApplication(serverApplication); + List clientApplications = rasRefresh.getClientApplications(); + refreshClientApplications(clientApplications); + } + + /** + * 刷新服务器信息 + * + * @param serverApplication serverApplication + */ + private void refreshServerApplication(Application serverApplication) { + if (serverApplication == null || CollectionUtils.isEmpty(serverApplication.getInstances())) { + return; + } + Set instances = serverApplication.getInstances(); + SERVER_APPLICATION_INSTANCE.addAll(instances); + } + + /** + * 刷新客户端信息 + * + * @param clientApplications clientApplications + */ + private void refreshClientApplications(List clientApplications) { + if (CollectionUtils.isEmpty(clientApplications)) { + return; + } + + } + + /** + * 刷新注册地址信息和加载数据 + * + * @param instance instance + * @return 返回当前服务器的所有信息 + */ + public RasRefreshDTO refreshAndLoad(Instance instance) { + return null; + } + + public RasRefreshDTO clientRegister(RegisterDTO toBean) { + return null; + } +} diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/readme.txt b/server/src/main/java/cn/icanci/loopstack/ras/server/readme.txt new file mode 100644 index 0000000..d813384 --- /dev/null +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/readme.txt @@ -0,0 +1,26 @@ +- 服务端业务流程 + - 服务端启动,然后向目标ip地址和端口注册自己 + - 如果向所有的目标服务器注册失败,则向自己注册自己,此时获取自身的ip地址和自己的服务端口,存储在缓存中 + - 如果想目标服务器注册成功,那么根据注册的返回结果,刷新当前注册服务器的数据信息,包括服务信息和客户端信息 + - 我们这里暂时不考虑因为配置产生的服务孤岛、也暂不考虑服务端注册过程的数据不一致问题 + - 当其他服务端向当前服务端进行注册的时候,先检查数据是否是重复注册,如果重复注册,则进行数据更新;否则则刷新本地缓存,然后请求各个服务端和客户端 + - 刷新数据,然后将数据刷新之后的最新数据,同步给请求调用方,此处需要并行执行,超时时间为2秒,重试次数为1。 + - 上面的数据交互不太合理,暂定先不修正 + - 对于服务端和客户端的探活,本版本的设计,会每个服务端都会去探活,但是探活会有一个周期,在活动limit时间之内,不会再次触发探活,这样可以节省部分性能 +- API + - 服务端向服务端注册自己:注册返回的数据,需要刷新到内存 + - 客户端向服务端注册自己:注册成功之后,需要同步到其他服务端 + - 服务端向服务端发起探活:探活成功,刷新时间,否则暂时设置为离线状态,并且通知所有的服务端和客户端,如果超过指定的离线阈值,则丢弃 + 如果探活过程中发现服务端或者客户端离线,依旧执行上述流程,直到最终成功为止 + - 服务端向客户端发起探活:探活成功,刷新时间,如果在线状态变更,则需要进行通知所有的服务端和客户端,如果超过指定的离线阈值,则丢弃 + 如果探活过程中发现服务端或者客户端离线,依旧执行上述流程,直到最终成功为止 + - Admin向服务端发起变更:如果属于状态变更,则需要进行通知所有的服务端和客户端,如果超过指定的离线阈值,则丢弃 + 如果探活过程中发现服务端或者客户端离线,依旧执行上述流程,直到最终成功为止 + - 客户端向服务端注册请求:客户端向服务端注册请求,注册成功之后,返回对应的服务端和客户端信息,并且加载到缓存,并开始构建负载均衡数据 + 如果注册失败,则启动失败,需要抛出异常 + - 服务端向客户端发起变更:当服务端的数据发生变化的时候,需要向客户端发送数据变更数据,如果通知失败,则进行正向反馈,直到数据一致才终止 +- 小结 + - 从总体上来言,这种设计,在有限的时间和变更范围之内,是可以保证分布式数据的一致性的 + - 在数据的设计上,在进行发布请求的时候,需要Copy出一份数据进行请求,这是为了发生分布式服务死锁。然后拿到结果再正反馈给原始数据 + +- 可能存在的问题:集群场景下,数据的交互可能一直发生变更,为了数据的伪分布式一致性可能会导致集群整体性能下降,甚至是分布式应用场景下的分布式服务死锁 \ No newline at end of file diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedNettyServerHandler.java b/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedNettyServerHandler.java new file mode 100644 index 0000000..42b7edb --- /dev/null +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedNettyServerHandler.java @@ -0,0 +1,143 @@ +package cn.icanci.loopstack.ras.server.server; + +import cn.hutool.json.JSONUtil; +import cn.icanci.loopstack.ras.common.model.Instance; +import cn.icanci.loopstack.ras.common.socket.RasRefreshDTO; +import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +import cn.icanci.loopstack.ras.common.socket.SocketMessage; +import cn.icanci.loopstack.ras.common.socket.UriConstant; +import cn.icanci.loopstack.ras.server.cache.RasServerRepositoryHolder; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import io.netty.util.internal.ThrowableUtil; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:49 + */ +@SuppressWarnings("all") +public class NamedNettyServerHandler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(NamedNettyServerHandler.class); + + private static RasServerRepositoryHolder rasServerRepositoryHolder; + + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); + + private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(CORE_SIZE, // + CORE_SIZE << 1, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "RasServerThread Pool-" + runnable.hashCode()), // + (r, executor) -> { + throw new RuntimeException("RasServerThread Pool is EXHAUSTED!"); + });; + + public static void setRasRepositoryHolder(RasServerRepositoryHolder rasServerRepositoryHolder) { + NamedNettyServerHandler.rasServerRepositoryHolder = rasServerRepositoryHolder; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + String requestData = msg.content().toString(CharsetUtil.UTF_8); + String uri = msg.uri(); + HttpMethod httpMethod = msg.method(); + boolean keepAlive = HttpUtil.isKeepAlive(msg); + // 对于这种配置数据,会有很频繁的变更 + POOL.execute(() -> { + Object responseObj = process(httpMethod, uri, requestData); + + String responseJson = JSONUtil.toJsonStr(responseObj); + + writeResponse(ctx, keepAlive, responseJson); + }); + } + + /** + * 后置处理 + * + * @param httpMethod httpMethod + * @param uri uri + * @param requestData requestData + * @return Object + */ + private Object process(HttpMethod httpMethod, String uri, String requestData) { + if (HttpMethod.POST != httpMethod) { + return SocketMessage.fail("Only post requests are supported"); + } + if (StringUtils.isBlank(uri)) { + return SocketMessage.fail("Request uri is null"); + } + try { + switch (uri) { + // 客户端向服务器注册自己 + case UriConstant.ClientToServer.REGISTER: + RasRefreshDTO respForClientRegister = rasServerRepositoryHolder.clientRegister(JSONUtil.toBean(requestData, RegisterDTO.class)); + logger.info("[{}][NamedNettyServerHandler][process] {} was refreshed!", Thread.currentThread().getName(), requestData); + return SocketMessage.success(respForClientRegister); + // 服务端向服务端心跳 + case UriConstant.ServerToServer.HEARTBEAT: + logger.info("[{}][NamedNettyServerHandler][process] heartbeat", Thread.currentThread().getName()); + return SocketMessage.success(); + // 服务端信息刷新,主要是针对客户端的信息刷新 + case UriConstant.ServerToServer.REFRESH: + rasServerRepositoryHolder.refresh(JSONUtil.toBean(requestData, RasRefreshDTO.class)); + logger.info("[{}][NamedNettyServerHandler][process] {} was refreshed!", Thread.currentThread().getName(), requestData); + return SocketMessage.success(); + // 服务端向服务端注册自己 + case UriConstant.ServerToServer.LOAD: + RasRefreshDTO respForServerLoad = rasServerRepositoryHolder.refreshAndLoad(JSONUtil.toBean(requestData, Instance.class)); + logger.info("[{}][NamedNettyServerHandler][process] {} was refreshed!", Thread.currentThread().getName(), requestData); + return SocketMessage.success(respForServerLoad); + default: + return SocketMessage.fail("Invalid request, uri-mapping(" + uri + ") not found"); + } + } catch (Throwable e) { + logger.error("[{}][NamedNettyServerHandler][process] ex,error msg:{}", e, Thread.currentThread().getName(), ThrowableUtil.stackTraceToString(e)); + return SocketMessage.fail(ThrowableUtil.stackTraceToString(e)); + } + } + + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + ctx.writeAndFlush(response); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error(ThrowableUtil.stackTraceToString(cause)); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } +} diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedServer.java b/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedServer.java new file mode 100644 index 0000000..0e1aa3c --- /dev/null +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/server/NamedServer.java @@ -0,0 +1,114 @@ +package cn.icanci.loopstack.ras.server.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 注册中心的Server有向自己注册的功能 + * - 如果配置的注册中心的地址就是自己,则先进行ip进行请求 + * - 如果请求失败,则将配置的ip地址和port进行缓存,此时注册中心就只有自己 + * - 这种设计有一种弊端:如果每个服务都是注册自己,那么服务端就数据孤岛了。服务端集群则不存在 + * + * @author icanci + * @since 1.0 Created in 2023/01/01 09:55 + */ +@SuppressWarnings("all") +public class NamedServer { + + private static final Logger logger = LoggerFactory.getLogger(NamedServer.class); + + private static RegisterServer registerServer; + + public static void setRegisterService(RegisterServer registerServer) { + NamedServer.registerServer = registerServer; + } + + public static void startClient(String serverIps, int serverPort, int clientPort) { + // 启动时候注册 + startClient0(serverIps, serverPort, clientPort); + // 自动进行注册 + // Tips: 项目启动了,但是没有配置项目信息,此时注册失败,如果不自动注册,则需要进行重启才能注册。因此开启自助注册 + autoRegister(serverIps, serverPort, clientPort); + } + + private static void startClient0(String serverAddress, int serverPort, int clientPort) { + Thread rasThread = new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + ServerBootstrap bootstrap = new ServerBootstrap(); + + bootstrap.group(bossGroup, workerGroup) // + .channel(NioServerSocketChannel.class) // + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS)); + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(5 * 1024 * 1024)); + pipeline.addLast(new NamedNettyServerHandler()); + } + }).childOption(ChannelOption.SO_KEEPALIVE, true); + + try { + ChannelFuture future = bootstrap.bind(clientPort).sync(); + + doRegistry(serverAddress, serverPort, clientPort); + + future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + logger.info("RAS remoting server interruptedException", e); + } catch (Exception e) { + logger.info("RAS remoting server error", e); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + + }); + rasThread.setDaemon(true); + rasThread.start(); + } + + /** + * 将SDK所在服务注册到注册中心 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param clientPort 客户端端口 + */ + private static void doRegistry(String serverAddress, int serverPort, int clientPort) { + registerServer.register(serverAddress, serverPort, clientPort); + } + + /** + * 自动注册 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param clientPort 客户端端口 + */ + private static void autoRegister(String serverAddress, int serverPort, int clientPort) { + Thread autoRegisterThread = new Thread(() -> { + // 每120秒刷新注册一次 + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120)); + doRegistry(serverAddress, serverPort, clientPort); + }); + autoRegisterThread.setDaemon(true); + autoRegisterThread.start(); + } +} diff --git a/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java b/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java new file mode 100644 index 0000000..085be74 --- /dev/null +++ b/server/src/main/java/cn/icanci/loopstack/ras/server/server/RegisterServer.java @@ -0,0 +1,137 @@ +package cn.icanci.loopstack.ras.server.server; + +import cn.hutool.http.Method; +import cn.hutool.json.JSONUtil; +import cn.icanci.loopstack.api.client.Client; +import cn.icanci.loopstack.api.client.http.HttpClientImpl; +import cn.icanci.loopstack.lsi.common.result.R; +import cn.icanci.loopstack.ras.common.socket.RegisterDTO; +import cn.icanci.loopstack.ras.common.socket.UriConstant; +import cn.icanci.loopstack.ras.common.utils.RandomAddressUtils; +import cn.icanci.loopstack.ras.server.cache.RasServerRepositoryHolder; +import cn.icanci.loopstack.ras.server.properties.RasServerProperties; +import cn.icanci.loopstack.utils.IPUtils; + +import java.util.concurrent.*; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Maps; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:30 + */ +@Service +@SuppressWarnings("all") +public class RegisterServer implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(RegisterServer.class); + /** http实例 */ + private static final Client CLIENT = HttpClientImpl.getInstance(); + @Resource + private RasServerProperties rasServerProperties; + @Resource + private RasServerRepositoryHolder rasServerRepositoryHolder; + + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); + + private static final ThreadPoolExecutor REGISTER_POOL = new ThreadPoolExecutor(CORE_SIZE, // + CORE_SIZE << 1, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "RegisterServer Biz Pool-" + runnable.hashCode()), // + (r, executor) -> { + throw new RuntimeException("RegisterServer Biz Pool is EXHAUSTED!"); + }); + + /** 注册请求地址 */ + private static final String REQ_URL_FORMAT = "http://%s:%s%s"; + + /** + * 注册到注册中心 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param clientPort 客户端端口 + */ + public void register(String serverAddress, int serverPort, int clientPort) { + String appId = rasServerProperties.getAppId(); + String[] addresses = serverAddress.split(","); + // 注册地址打散,防止压力在同一个机器上 + RandomAddressUtils.randomAddress(addresses); + // 执行注册 + for (String address : addresses) { + try { + FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort)); + REGISTER_POOL.execute(task); + R r = task.get(10, TimeUnit.SECONDS); + if (r.isOk()) { + return; + } else { + logger.error("Task Register Exception:{}", r.getMessage()); + } + } catch (ExecutionException | InterruptedException | TimeoutException e) { + logger.warn("Register Exception:{}", e.getMessage()); + } + } + // 如果全部失败了,则会走到这里,此时自身就是服务器 + fixServerRegister(clientPort); + // + } + + /** + * 注册自己 + * + * @param clientPort clientPort + */ + private void fixServerRegister(int clientPort) { + String hostIpAddress = IPUtils.getHostIpAddress(); + rasServerRepositoryHolder.fixServerRegister(hostIpAddress, clientPort, rasServerProperties.getAppId()); + } + + @Override + public void afterPropertiesSet() throws Exception { + + // 注入注册bean + NamedServer.setRegisterService(this); + + // 启动服务器 + NamedServer.startClient(rasServerProperties.getServerIps(), rasServerProperties.getServerPort(), rasServerProperties.getClientPort()); + } + + /** 注册器 */ + private static class RegisterCallable implements Callable { + + private final String address; + private final int clientPort; + private final String appId; + private final int serverPort; + + public RegisterCallable(String address, int clientPort, String appId, int serverPort) { + this.address = address; + this.clientPort = clientPort; + this.appId = appId; + this.serverPort = serverPort; + } + + @Override + public R call() throws Exception { + RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId); + String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.ServerToServer.REGISTER); + + Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, registerDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); + + R call = CLIENT.call(rpcRequest, R.class); + + logger.info("[RegisterCallable][call] Register result:{}", JSONUtil.toJsonStr(call)); + + return call; + } + } +} -- Gitee