diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..ed086eeaa4f1a80fbbd2dc7363fb347a53367b9c --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.idea +*.iml +/target +/store +/store2 +/zbus-dist/store +/zbus-dist/nohup.out +/mq diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000000000000000000000000000000000000..6a5ffb073acd0069320e4056375ebb10d3e3ccd6 --- /dev/null +++ b/ChangeLog @@ -0,0 +1,17 @@ +zbus-6.2.0 发布,整个项目模块化重构,改进内容: +1. MQ默认直接采用持久化。MQ速度在SSD磁盘上可达200M+/s,数十亿级消息堆积测试(100G+磁盘) +2. RPC支持分为直接RPC与基于MQ的RPC,直接RPC性能优于基于MQ的RPC。 +3. Broker独立模型重构,支持HA高可用,不只面向MQ工作,可以方便支持直接RPC的负载均衡。 +4. 添加Thrfit协议接入(RPC),整体上支持HTTP短链接,Extend HTTP的TCP长连接,Thrift客户端接入。 +5. 消息通讯基础zbus.NET保持小、模块化(~50K),方便个性化协议扩展 +6. 添加支持普通TCP代理,DMZ网络安全结构的的TCP代理 +7. 提供大量示例,包括MQ,PubSub,RPC,Proxy,Thrift,Simple HTTP等。 + + +zbus-6.3.0 changelog +1. fix net.Client close, disable disconnected handler when close actively. +2. "content-encoding" in HTTP changed to "encoding" (extend HTTP) for + compatibility to standard HTTP when using browser. +3. 增加支持不指定随机端口启动Server +4. 修复Rpc客户端运行中失败引起Sevice响应实例清除的bug +5. zbus重启,service快速注册返回,InvokingClient增加Sync清除 \ No newline at end of file diff --git a/Readme.md b/README.md similarity index 94% rename from Readme.md rename to README.md index 3c3054a8ddd9e82917c799e6910db6a1449009b2..9bfa560eec228451994bbcaecadbfd65787f0d14 100644 --- a/Readme.md +++ b/README.md @@ -4,13 +4,13 @@ ##ZBUS = MQ + RPC + PROXY -* **支持消息队列, 发布订阅, RPC, 代理(TCP/HTTP/DMZ)** +* **支持消息队列, 发布订阅, RPC, 代理(TCP/DMZ)** * **亿级消息堆积能力、支持HA高可用** * **单个Jar包无依赖 ~300K** * **服务代理 -- 适配改造已有业务系统,使之具备跨平台与语言** * **丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入** -## QQ群: 467741880 +## QQ群: 467741880, 如果喜欢该项目,请右上角star,让更多人参与改进 @@ -70,7 +70,7 @@ ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项 org.zbus zbus - 6.2.0 + 6.2.6 ### 生产者 diff --git a/ToDo b/ToDo new file mode 100644 index 0000000000000000000000000000000000000000..4dafb2cfa29a24fa577ce78330ffb58e0a96af47 --- /dev/null +++ b/ToDo @@ -0,0 +1,37 @@ +支持消息队列复制(不同业务维度消费同一个消息) + +Index与Block分离 +MyMQ.idx +readNum +readPos +readCount + +MyMQ.idx +MyMQ.fork + + + +sub-index-id,二级指针 + +支持zbus内部事件消息通知PubSub + +增加直接返回网络测试 : +http://localhost:15555/test + +增加RPC请求的简单URL解释支持: +http://localhost:15555/MyRpc/echo?hong + +增加监控单个服务的详细信息 +http://localhost:15555/MyRpc + +/MyRpc/plus?1,2 +/MyRpc/plus?params=1,2&&module=xxx +/mq=MyRpc&&method=plus&¶ms=1,2 + + +/ +/?cmd=admin&&sub_cmd=data +/?cmd=admin&&sub_cmd=jquery +/?cmd=test + + diff --git a/doc/IoAdaptor.png b/doc/IoAdaptor.png new file mode 100644 index 0000000000000000000000000000000000000000..229374cb6c6e5ee85a068c3803359e8c54375656 Binary files /dev/null and b/doc/IoAdaptor.png differ diff --git a/doc/tcp_proxy_example.png b/doc/tcp_proxy_example.png new file mode 100644 index 0000000000000000000000000000000000000000..7ee821e5472a074e4eca104d65f209a9ab60ced2 Binary files /dev/null and b/doc/tcp_proxy_example.png differ diff --git "a/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc" "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc" new file mode 100644 index 0000000000000000000000000000000000000000..bd8fe00cdd3cbf4775c032360cd1403391c0cb53 Binary files /dev/null and "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc" differ diff --git "a/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc" "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc" new file mode 100644 index 0000000000000000000000000000000000000000..975a3790d45ef0a19bcdc75be5883f0149e88b10 Binary files /dev/null and "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc" differ diff --git a/license.txt b/license.txt deleted file mode 100644 index b047275612d06b86b318eb4c63cce981e7fd8e68..0000000000000000000000000000000000000000 --- a/license.txt +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2009-2015 HONG LEIMING - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 53bffe193d67301ca60112ca4c3377bb4368cf1d..eda596928e7a96383e14c433475e4f312c3b41f9 100644 --- a/pom.xml +++ b/pom.xml @@ -1,177 +1,223 @@ - - 4.0.0 - jar - org.zbus - zbus - 6.2.0 - zbus - lightweight MQ, RPC, PROXY - https://zbus.org - - - - - MIT License - http://www.opensource.org/licenses/mit-license.php - repo - - - - - - rushmore - 44194462@qq.com - - - - - scm:git@git.oschina.net:rushmore/zbus.git - scm:git@git.oschina.net:rushmore/zbus.git - git@git.oschina.net:rushmore/zbus.git - - - - UTF-8 - - - - - oss - https://oss.sonatype.org/content/repositories/snapshots/ - - - - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + 4.0.0 + com.jingcai.apps + zbus + 1.1-RELEASE + jar + zbus + lightweight MQ, RPC, PROXY + https://zbus.org + + + + + MIT License + http://www.opensource.org/licenses/mit-license.php + repo + + + + + scm:git:git@192.168.0.20:java/zbus.git + scm:git:git@192.168.0.20:java/zbus.git + http://192.168.0.20/java/zbus + + + + UTF-8 + + + + + + + + org.apache.commons + commons-pool2 + 2.2 + provided + + + + com.alibaba + fastjson + 1.2.3 + provided + + - org.apache.commons - commons-pool2 - 2.2 + org.slf4j + slf4j-api + 1.7.5 provided + - com.alibaba - fastjson - 1.2.3 + org.slf4j + slf4j-log4j12 + 1.7.5 provided - + log4j log4j - 1.2.17 + 1.2.16 provided - - org.springframework - spring - 2.5.6 - true - test - - - org.apache.httpcomponents - httpclient - 4.4 - test - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - 1.6 - 1.6 - UTF8 - - - - - - - - - release - - - - src/main/resources - - *.properties - *.xml - - false - - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.2.1 - - - package - - jar-no-fork - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.9.1 - - - package - - jar - - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.5 - - - verify - - sign - - - - - - - - - oss - https://oss.sonatype.org/content/repositories/snapshots/ - - - oss - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - - + + + org.springframework + spring-context + 3.2.8.RELEASE + true + test + + + org.apache.httpcomponents + httpclient + 4.4 + test + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.6 + 1.6 + UTF-8 + + + + + + + + + release + + + + src/main/resources + + *.properties + *.xml + + false + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + package + + jar-no-fork + + + + + + + + + + + dajiang_snapshots + http://djip:8081/content/repositories/dajiang_snapshots/ + + + dajiang_release + http://djip:8081/content/repositories/dajiang_release/ + + + + diff --git a/src/main/java/org/zbus/broker/BrokerConfig.java b/src/main/java/org/zbus/broker/BrokerConfig.java index 77b9334ea25ed862fa29f6cb7c5bec8602f4f1e6..4c9947baaa1d59d5210bb5faf6bbbdb722f0d3b4 100644 --- a/src/main/java/org/zbus/broker/BrokerConfig.java +++ b/src/main/java/org/zbus/broker/BrokerConfig.java @@ -50,8 +50,8 @@ import org.zbus.net.core.Dispatcher; public class BrokerConfig extends PoolConfig{ private String trackServerList = "127.0.0.1:16666"; //只在HA模式下才有效 private String serverAddress = "127.0.0.1:15555"; - private int selectorCount = 1; - private int executorCount = 64; + private int selectorCount = 0; //0代表使用默认值 + private int executorCount = 0; //0代表使用默认值 /** * 可选项 * 如果配置不给出,Dispatcher内部生成,并自己管理关闭 diff --git a/src/main/java/org/zbus/broker/SingleBroker.java b/src/main/java/org/zbus/broker/SingleBroker.java index 50bf12ab9cc4803d438725d8f1b224abea39abc9..52a23b1e51887a1882bacb86de6b526b94d79102 100644 --- a/src/main/java/org/zbus/broker/SingleBroker.java +++ b/src/main/java/org/zbus/broker/SingleBroker.java @@ -49,6 +49,8 @@ public class SingleBroker implements Broker { if(config.getDispatcher() == null){ this.ownDispatcher = true; this.dispatcher = new Dispatcher(); + this.dispatcher.selectorCount(config.getSelectorCount()); + this.dispatcher.executorCount(config.getExecutorCount()); this.config.setDispatcher(dispatcher); } else { this.dispatcher = config.getDispatcher(); diff --git a/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java b/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java index c306f066a2189ee14b2c8879f4a3d87c36b4610a..c50f7308b82455d63ff06fe53f8159b78963ac4c 100644 --- a/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java +++ b/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java @@ -166,9 +166,13 @@ public class DefaultBrokerSelector implements BrokerSelector{ if(se.consumerCount > 0) activeCount++; } if(activeCount == 0){ - activeCount = allBrokers.size(); + activeCount = serverList.consumerFirstList.size(); } - int idx = localIpHashCode%activeCount; + if(activeCount == 0){ + return null; + } + + int idx = localIpHashCode%activeCount; ServerEntry se = serverList.consumerFirstList.get(idx); if(se != null){ broker = getBroker(se.serverAddr); diff --git a/src/main/java/org/zbus/broker/ha/ServerEntryTable.java b/src/main/java/org/zbus/broker/ha/ServerEntryTable.java index 40710a6db9b305382bed945a026e8a5f9438232a..2b3a3620b5f7ff4fd58e0d2d2ae634ea3243fe32 100644 --- a/src/main/java/org/zbus/broker/ha/ServerEntryTable.java +++ b/src/main/java/org/zbus/broker/ha/ServerEntryTable.java @@ -207,10 +207,20 @@ public class ServerEntryTable implements Closeable{ public void setVerbose(boolean verbose) { this.verbose = verbose; + } + + public Map getEntry2ServerList() { + return entry2ServerList; + } + + public Map> getServer2EntryList() { + return server2EntryList; } + + public static class ServerList implements Iterable{ public final String entryId; public Map serverTable = new ConcurrentHashMap(); diff --git a/src/main/java/org/zbus/broker/ha/TrackSub.java b/src/main/java/org/zbus/broker/ha/TrackSub.java index c0b0a0426782ccf4bb930a3a3bd73c8be6693fcf..0970422bcdccb97770100c184fa3b5f0454b931b 100644 --- a/src/main/java/org/zbus/broker/ha/TrackSub.java +++ b/src/main/java/org/zbus/broker/ha/TrackSub.java @@ -193,7 +193,7 @@ public class TrackSub implements Closeable{ @Override public void close() throws IOException { - for(MessageClient client : allTrackers){ + for(MessageClient client : allTrackers){ client.close(); } allTrackers.clear(); diff --git a/src/main/java/org/zbus/kit/NetKit.java b/src/main/java/org/zbus/kit/NetKit.java index 2fb4f578a2e98637090affdda152e5acb023500c..6979e61dcdd9a505554aafb81a4ae86ec238073f 100644 --- a/src/main/java/org/zbus/kit/NetKit.java +++ b/src/main/java/org/zbus/kit/NetKit.java @@ -41,9 +41,7 @@ public class NetKit { int port = 80; if(blocks.length > 1){ port = Integer.valueOf(blocks[1]); - } else { - address += ":"+port; //use default 80 - } + } String serverAddr = String.format("%s:%d", host, port); return serverAddr; } @@ -62,30 +60,70 @@ public class NetKit { return address; } - public static String getLocalIp() { + private static int matchedIndex(String ip, String[] prefix){ + for(int i=0; i ]+"); try { - Pattern pattern = Pattern.compile("(192|172|10)\\.[0-9]+\\.[0-9]+\\.[0-9]+"); + Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+"); Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); - + String matchedIp = null; + int matchedIdx = -1; while (interfaces.hasMoreElements()) { NetworkInterface ni = interfaces.nextElement(); - Enumeration en = ni.getInetAddresses(); + Enumeration en = ni.getInetAddresses(); while (en.hasMoreElements()) { InetAddress addr = en.nextElement(); - String ip = addr.getHostAddress(); + String ip = addr.getHostAddress(); Matcher matcher = pattern.matcher(ip); - if (matcher.matches()) { - return ip; - } - } + if (matcher.matches()) { + int idx = matchedIndex(ip, prefix); + if(idx == -1) continue; + if(matchedIdx == -1){ + matchedIdx = idx; + matchedIp = ip; + } else { + if(matchedIdx>idx){ + matchedIdx = idx; + matchedIp = ip; + } + } + } + } } - return "0.0.0.0"; - } catch (Throwable e) { - e.printStackTrace(); - return "0.0.0.0"; + if(matchedIp != null) return matchedIp; + return "127.0.0.1"; + } catch (Throwable e) { + return "127.0.0.1"; } } - + + public static String getLocalIp() { + return getLocalIp("*>10>172>192>127"); + } + public static String remoteAddress(SocketChannel channel){ SocketAddress addr = channel.socket().getRemoteSocketAddress(); String res = String.format("%s", addr); @@ -97,4 +135,10 @@ public class NetKit { String res = String.format("%s", addr); return addr==null? res: res.substring(1); } + + public static void main(String[] args){ + String ip = getLocalIp("*>192>10"); + System.out.println(ip); + System.out.println(getLocalIp()); + } } diff --git a/src/main/java/org/zbus/kit/log/Logger.java b/src/main/java/org/zbus/kit/log/Logger.java index 7d04baad351a874347951864631a21baba3685f1..63271c9d55526fe727299b00be1003f8d6d214c5 100644 --- a/src/main/java/org/zbus/kit/log/Logger.java +++ b/src/main/java/org/zbus/kit/log/Logger.java @@ -49,13 +49,28 @@ public abstract class Logger { if (factory != null){ return ; } - String defaultFactory = String.format("%s.impl.Log4jLoggerFactory", Logger.class.getPackage().getName()); + try { //default to Log4j Class.forName("org.apache.log4j.Logger"); + String defaultFactory = String.format("%s.impl.Log4jLoggerFactory", Logger.class.getPackage().getName()); Class factoryClass = Class.forName(defaultFactory); factory = (LoggerFactory)factoryClass.newInstance(); + return; + } catch (Exception e) { + } + + try { + //try slf4j + Class.forName("org.slf4j.Logger"); + String defaultFactory = String.format("%s.impl.Sl4jLoggerFactory", Logger.class.getPackage().getName()); + Class factoryClass = Class.forName(defaultFactory); + factory = (LoggerFactory)factoryClass.newInstance(); + return; } catch (Exception e) { + } + + if(factory == null){ factory = new JdkLoggerFactory(); } } diff --git a/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java b/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..4751585c0d8ce7cf168ca9255a2c9cd486eebfec --- /dev/null +++ b/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java @@ -0,0 +1,137 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2009-2015 HONG LEIMING + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.zbus.kit.log.impl; + + +import org.slf4j.spi.LocationAwareLogger; +import org.zbus.kit.log.Logger; +import org.zbus.kit.log.LoggerFactory; + +public class Sl4jLoggerFactory implements LoggerFactory { + + public Logger getLogger(Class clazz) { + return new Sl4jLogger(clazz); + } + + public Logger getLogger(String name) { + return new Sl4jLogger(name); + } + + public static void main(String[] args){ + Logger log = new Sl4jLogger(Sl4jLoggerFactory.class); + log.info("test"); + } +} + +class Sl4jLogger extends Logger { + private org.slf4j.Logger log; + private final String FQCN = Sl4jLogger.class.getName(); + + Sl4jLogger(Class clazz) { + log = org.slf4j.LoggerFactory.getLogger(clazz); + } + + Sl4jLogger(String name) { + log = org.slf4j.LoggerFactory.getLogger(name); + } + + public void info(String message) { + info(message, (Throwable)null); + } + + public void info(String message, Throwable t) { + if (log instanceof LocationAwareLogger) { + ((LocationAwareLogger) log).log(null, FQCN, + LocationAwareLogger.INFO_INT, message, null, t); + } else { + log.info(message, t); + } + } + + public void debug(String message) { + debug(message, (Throwable)null); + } + + public void debug(String message, Throwable t) { + if (log instanceof LocationAwareLogger) { + ((LocationAwareLogger) log).log(null, FQCN, + LocationAwareLogger.DEBUG_INT, message, null, t); + } else { + log.debug(message, t); + } + } + + public void warn(String message) { + warn(message, (Throwable)null); + } + + public void warn(String message, Throwable t) { + if (log instanceof LocationAwareLogger) { + ((LocationAwareLogger) log).log(null, FQCN, + LocationAwareLogger.WARN_INT, message, null, t); + } else { + log.warn(message); + } + } + + public void error(String message) { + error(message, (Throwable)null); + } + + public void error(String message, Throwable t) { + if (log instanceof LocationAwareLogger) { + ((LocationAwareLogger) log).log(null, FQCN, + LocationAwareLogger.ERROR_INT, message, null, t); + } else { + log.error(message); + } + } + + public void fatal(String message) { + error(message); + } + + public void fatal(String message, Throwable t) { + error(message, t); + } + + public boolean isDebugEnabled() { + return log.isDebugEnabled(); + } + + public boolean isInfoEnabled() { + return log.isInfoEnabled(); + } + + public boolean isWarnEnabled() { + return log.isWarnEnabled(); + } + + public boolean isErrorEnabled() { + return log.isErrorEnabled(); + } + + public boolean isFatalEnabled() { + return log.isErrorEnabled(); + } +} \ No newline at end of file diff --git a/src/main/java/org/zbus/kit/pool/PoolConfig.java b/src/main/java/org/zbus/kit/pool/PoolConfig.java index 0a3c5548564011bfe1823a2f73439403aee20d0b..3d13f79904bed0375740be85de402dd75243eaf1 100644 --- a/src/main/java/org/zbus/kit/pool/PoolConfig.java +++ b/src/main/java/org/zbus/kit/pool/PoolConfig.java @@ -23,9 +23,9 @@ package org.zbus.kit.pool; public class PoolConfig implements Cloneable{ - private int maxTotal = 8; - private int maxIdle = 8; - private int minIdle = 0; + private int maxTotal = 64; + private int maxIdle = 64; + private int minIdle = 64; private long minEvictableIdleTimeMillis = 1000L * 60L * 30L; private Object support; diff --git a/src/main/java/org/zbus/mq/Consumer.java b/src/main/java/org/zbus/mq/Consumer.java index 8221a60908556512de93ed7cf2a2a64827d2b69f..8c8439b558f017ac3359a2cc918fe110df38949e 100644 --- a/src/main/java/org/zbus/mq/Consumer.java +++ b/src/main/java/org/zbus/mq/Consumer.java @@ -38,7 +38,7 @@ public class Consumer extends MqAdmin implements Closeable { private MessageClient client; // 消费者拥有一个物理链接 private String topic = null; // 为发布订阅者的主题,当Consumer的模式为发布订阅时候起作用 - private int consumeTimeout = 10000; //ms + private int consumeTimeout = 300000; //5 minutes public Consumer(Broker broker, String mq, MqMode... mode) { super(broker, mq, mode); } @@ -52,7 +52,7 @@ public class Consumer extends MqAdmin implements Closeable { if(this.client == null){ synchronized (this) { if(this.client == null){ - this.client = broker.getClient(brokerHint()); + this.client = broker.getClient(brokerHint()); } } } @@ -64,6 +64,7 @@ public class Consumer extends MqAdmin implements Closeable { Message req = new Message(); req.setCmd(Protocol.Consume); req.setMq(mq); + req.setHead("token", accessToken); if (MqMode.isEnabled(this.mode, MqMode.PubSub)) { if (this.topic != null) { req.setTopic(this.topic); @@ -73,16 +74,18 @@ public class Consumer extends MqAdmin implements Closeable { Message res = null; try { res = client.invokeSync(req, timeout); - if (res != null && res.isStatus404()) { + if(res == null) return res; + res.setId(res.getRawId()); + res.removeHead(Message.RAWID); + if(res.isStatus200()) return res; + + if(res.isStatus404()) { if (!this.createMQ()) { - throw new IllegalStateException("register error"); + throw new MqException(res.getBodyString()); } return recv(timeout); } - if(res != null){ - res.setId(res.getRawId()); - res.removeHead(Message.RAWID); - } + throw new MqException(res.getBodyString()); } catch (ClosedByInterruptException e) { throw new InterruptedException(e.getMessage()); } catch (IOException e) { @@ -105,7 +108,7 @@ public class Consumer extends MqAdmin implements Closeable { } @Override - protected Message invokeCreateMQ(Message req) throws IOException, + protected Message invokeSync(Message req) throws IOException, InterruptedException { ensureClient(); return client.invokeSync(req); diff --git a/src/main/java/org/zbus/mq/MqAdmin.java b/src/main/java/org/zbus/mq/MqAdmin.java index 6cb81a147f38d596407de3a33a5db3dac618154c..bfbbb0df97460305f9fd09f46d8f87e5ac9c263e 100644 --- a/src/main/java/org/zbus/mq/MqAdmin.java +++ b/src/main/java/org/zbus/mq/MqAdmin.java @@ -34,6 +34,8 @@ public class MqAdmin{ protected final Broker broker; protected String mq; //队列唯一性标识 protected final int mode; + protected String accessToken = ""; + protected String registerToken = ""; public MqAdmin(Broker broker, String mq, MqMode... mode){ this.broker = broker; @@ -49,6 +51,8 @@ public class MqAdmin{ this.broker = config.getBroker(); this.mq = config.getMq(); this.mode = config.getMode(); + this.accessToken = config.getAccessToken(); + this.registerToken = config.getRegisterToken(); } protected BrokerHint brokerHint(){ @@ -63,17 +67,29 @@ public class MqAdmin{ * @return * @throws IOException */ - protected Message invokeCreateMQ(Message req) throws IOException, InterruptedException{ + protected Message invokeSync(Message req) throws IOException, InterruptedException{ return broker.invokeSync(req, 2500); } + + public Message queryMQ() throws IOException, InterruptedException{ + Message req = new Message(); + req.setCmd(Protocol.Query); + req.setMq(this.mq); + req.setHead("register_token", registerToken); + req.setHead("access_token", accessToken); + + return invokeSync(req); + } public boolean createMQ() throws IOException, InterruptedException{ Message req = new Message(); req.setCmd(Protocol.CreateMQ); req.setHead("mq_name", mq); - req.setHead("mq_mode", "" + mode); + req.setHead("mq_mode", "" + mode); + req.setHead("register_token", registerToken); + req.setHead("access_token", accessToken); - Message res = invokeCreateMQ(req); + Message res = invokeSync(req); if(res == null) return false; return res.isStatus200(); } diff --git a/src/main/java/org/zbus/mq/MqConfig.java b/src/main/java/org/zbus/mq/MqConfig.java index c6affd53f568691e53865213960b6cd9dde59b22..b4d1289f16d0aa25f184bd0fc48001d6e7c12e58 100644 --- a/src/main/java/org/zbus/mq/MqConfig.java +++ b/src/main/java/org/zbus/mq/MqConfig.java @@ -30,7 +30,10 @@ public class MqConfig implements Cloneable { protected String mq; //MQ标识,必须设置 protected int mode = MqMode.MQ.intValue(); //创建消息队列时采用到 protected String topic = null; //发布订阅模式下使用 - private boolean verbose = false; + protected boolean verbose = false; + protected String registerToken = ""; + protected String accessToken = ""; + public Broker getBroker() { return broker; @@ -76,6 +79,24 @@ public class MqConfig implements Cloneable { this.verbose = verbose; } + + + public String getRegisterToken() { + return registerToken; + } + + public void setRegisterToken(String registerToken) { + this.registerToken = registerToken; + } + + public String getAccessToken() { + return accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + @Override public MqConfig clone() { try { diff --git a/src/main/java/org/zbus/mq/Producer.java b/src/main/java/org/zbus/mq/Producer.java index 242ea8ed4e59e761634595c87a8d31f0854d8a73..4c23ccf83436793d6fc063ded8558d7574796fb8 100644 --- a/src/main/java/org/zbus/mq/Producer.java +++ b/src/main/java/org/zbus/mq/Producer.java @@ -1,68 +1,82 @@ -/** - * The MIT License (MIT) - * Copyright (c) 2009-2015 HONG LEIMING - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package org.zbus.mq; - -import java.io.IOException; - -import org.zbus.broker.Broker; -import org.zbus.mq.Protocol.MqMode; -import org.zbus.net.Sync.ResultCallback; -import org.zbus.net.http.Message; - -public class Producer extends MqAdmin{ - - public Producer(Broker broker, String mq, MqMode... mode) { - super(broker, mq, mode); - } - - public Producer(MqConfig config){ - super(config); - } - - public void sendAsync(Message msg, final ResultCallback callback) - throws IOException { - msg.setCmd(Protocol.Produce); - msg.setMq(this.mq); - msg.setAck(true); - - broker.invokeAsync(msg, callback); - } - - public void sendAsync(Message msg) throws IOException { - sendAsync(msg, null); - } - - - public Message sendSync(Message msg, int timeout) throws IOException, InterruptedException{ - msg.setCmd(Protocol.Produce); - msg.setMq(this.mq); - msg.setAck(true); - - return broker.invokeSync(msg, timeout); - } - - public Message sendSync(Message msg) throws IOException, InterruptedException{ - return sendSync(msg, 10000); - } - -} +/** + * The MIT License (MIT) + * Copyright (c) 2009-2015 HONG LEIMING + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.zbus.mq; + +import java.io.IOException; + +import org.zbus.broker.Broker; +import org.zbus.mq.Protocol.MqMode; +import org.zbus.net.Sync.ResultCallback; +import org.zbus.net.http.Message; +import org.zbus.net.http.Message.MessageInvoker; + +public class Producer extends MqAdmin implements MessageInvoker{ + + public Producer(Broker broker, String mq, MqMode... mode) { + super(broker, mq, mode); + } + + public Producer(MqConfig config){ + super(config); + } + + public void sendAsync(Message msg, final ResultCallback callback) + throws IOException { + invokeAsync(msg, callback); + } + + public void sendAsync(Message msg) throws IOException { + sendAsync(msg, null); + } + + + public Message sendSync(Message msg, int timeout) throws IOException, InterruptedException{ + return invokeSync(msg, timeout); + } + + public Message sendSync(Message msg) throws IOException, InterruptedException{ + return sendSync(msg, 10000); + } + + + @Override + public Message invokeSync(Message msg, int timeout) throws IOException, + InterruptedException { + msg.setCmd(Protocol.Produce); + msg.setMq(this.mq); + msg.setAck(true); + + return broker.invokeSync(msg, timeout); + } + + @Override + public void invokeAsync(Message msg, ResultCallback callback) + throws IOException { + msg.setCmd(Protocol.Produce); + msg.setMq(this.mq); + msg.setAck(true); + + broker.invokeAsync(msg, callback); + } + +} diff --git a/src/main/java/org/zbus/mq/Protocol.java b/src/main/java/org/zbus/mq/Protocol.java index 5e7b67f492ad11015a89f5ed8d955a1ac7b3034b..5de0f944da43743607b66e6068ed5ac42a03e11e 100644 --- a/src/main/java/org/zbus/mq/Protocol.java +++ b/src/main/java/org/zbus/mq/Protocol.java @@ -34,14 +34,20 @@ public class Protocol { public static final String Consume = "consume"; //消费消息 public static final String Route = "route"; //请求等待应答消息 public static final String CreateMQ = "create_mq"; //创建队列 - public static final String Auth = "auth"; + public static final String Auth = "auth"; + public static final String Query = "query"; + public static final String Test = "test"; + + + public static final String Data = "data"; + public static final String Jquery = "jquery"; - public static final String Admin = "admin"; //管理类消息 public static enum MqMode { MQ, //消息队列 PubSub, //发布订阅 - Memory; //是否临时 + Memory, //是否临时 + RPC; private MqMode(){ mask = (1 << ordinal()); diff --git a/src/main/java/org/zbus/mq/server/AbstractMQ.java b/src/main/java/org/zbus/mq/server/AbstractMQ.java index 4f6a9cd36f121f9d1dc5022afbc8bca8942a20be..417b0b73732d308c9065ffbd5b97fd3275daedf3 100644 --- a/src/main/java/org/zbus/mq/server/AbstractMQ.java +++ b/src/main/java/org/zbus/mq/server/AbstractMQ.java @@ -32,9 +32,13 @@ import org.zbus.net.http.Message; public abstract class AbstractMQ{ protected final String name; protected String creator; + protected int mode; protected long lastUpdateTime = System.currentTimeMillis(); protected final AbstractQueue msgQ; + protected String accessToken = ""; + + protected Auth auth; public AbstractMQ(String name, AbstractQueue msgQ) { this.msgQ = msgQ; @@ -59,4 +63,26 @@ public abstract class AbstractMQ{ public abstract void cleanSession(); public abstract MqInfo getMqInfo(); + + public void setAccessToken(String accessToken){ + this.accessToken = accessToken; + this.auth = new DefaultAuth(this.accessToken); + } + + public void setAuth(Auth auth){ + this.auth = auth; + } + + public boolean auth(String appid, String token){ + if(this.auth == null) return true; + return this.auth.auth(appid, token); + } + + public int getMode() { + return mode; + } + + public void setMode(int mode) { + this.mode = mode; + } } diff --git a/src/main/java/org/zbus/mq/server/Auth.java b/src/main/java/org/zbus/mq/server/Auth.java new file mode 100644 index 0000000000000000000000000000000000000000..5a74cd196dfda5311cf422af4b2a30a4f29912bb --- /dev/null +++ b/src/main/java/org/zbus/mq/server/Auth.java @@ -0,0 +1,16 @@ +package org.zbus.mq.server; + +public interface Auth { + boolean auth(String appid, String token); +} + +class DefaultAuth implements Auth{ + private String accessToken; + public DefaultAuth(String accessToken){ + this.accessToken = accessToken; + } + @Override + public boolean auth(String appid, String token) { + return accessToken.equals(token); + } +} diff --git a/src/main/java/org/zbus/mq/server/MQ.java b/src/main/java/org/zbus/mq/server/MQ.java index 8c46228ff1f1c6b0861bbb2a7f32dec7ae336023..04273cba395c1567decdeb38f1c47d18af4ef8f5 100644 --- a/src/main/java/org/zbus/mq/server/MQ.java +++ b/src/main/java/org/zbus/mq/server/MQ.java @@ -34,7 +34,6 @@ import java.util.concurrent.LinkedBlockingQueue; import org.zbus.kit.log.Logger; import org.zbus.mq.Protocol.ConsumerInfo; import org.zbus.mq.Protocol.MqInfo; -import org.zbus.mq.Protocol.MqMode; import org.zbus.net.core.Session; import org.zbus.net.http.Message; @@ -133,7 +132,7 @@ public class MQ extends AbstractMQ{ info.name = name; info.lastUpdateTime = lastUpdateTime; info.creator = creator; - info.mode = MqMode.MQ.intValue(); + info.mode = this.mode; info.unconsumedMsgCount = msgQ.size(); info.consumerCount = pullSessions.size(); info.consumerInfoList = new ArrayList(); diff --git a/src/main/java/org/zbus/mq/server/MqAdaptor.java b/src/main/java/org/zbus/mq/server/MqAdaptor.java index edd4541dea84976b15f8d1d3f3cba084e9ea9223..83c00d5ad36fc6292b680964d2833c547c191090 100644 --- a/src/main/java/org/zbus/mq/server/MqAdaptor.java +++ b/src/main/java/org/zbus/mq/server/MqAdaptor.java @@ -34,6 +34,7 @@ public class MqAdaptor extends IoAdaptor implements Closeable { private boolean verbose = false; private final MqServer mqServer; + private String registerToken = ""; public MqAdaptor(MqServer mqServer){ @@ -41,15 +42,65 @@ public class MqAdaptor extends IoAdaptor implements Closeable { this.mqServer = mqServer; this.mqTable = mqServer.getMqTable(); this.sessionTable = mqServer.getSessionTable(); + this.registerToken = mqServer.getRegisterToken(); registerHandler(Protocol.Produce, produceHandler); registerHandler(Protocol.Consume, consumeHandler); registerHandler(Protocol.Route, routeHandler); - registerHandler(Protocol.CreateMQ, createMqHandler); - registerHandler(Protocol.Admin, new AdminHandler()); + registerHandler(Protocol.CreateMQ, createMqHandler); + registerHandler(Protocol.Test, testHandler); + registerHandler(Protocol.Query, queryHandler); + + registerHandler("", homeHandler); + registerHandler(Protocol.Data, dataHandler); + registerHandler(Protocol.Jquery, jqueryHandler); + registerHandler(Message.HEARTBEAT, heartbeatHandler); } + + private Message handleUrlMessage(Message msg){ + UrlInfo url = new UrlInfo(msg.getRequestString()); + if(url.empty){ + msg.setCmd(""); //default to home monitor + return msg; + } + + if(url.mq != null){ + if(msg.getMq() == null){ + msg.setMq(url.mq); + } + String method = url.method; + if(method == null){ + method = ""; + } + if(url.method != null || url.cmd == null){ + AbstractMQ mq = mqTable.get(url.mq); + if(mq != null && MqMode.isEnabled(mq.getMode(), MqMode.RPC)){ + msg.setMq(url.mq); + msg.setAck(false); + msg.setCmd(Protocol.Produce); + String module = url.module == null? "" : url.module; + String json = "{"; + json += "\"module\": " + "\"" + module + "\""; + json += ", \"method\": " + "\"" + method + "\""; + if(url.params != null){ + json += ", \"params\": " + "[" + url.params + "]"; + } + json += "}"; + msg.setJsonBody(json); + } + } + } + + if(url.cmd != null){ + if(msg.getCmd() == null){ + msg.setCmd(url.cmd); + } + } + + return msg; + } public void onMessage(Object obj, Session sess) throws IOException { Message msg = (Message)obj; @@ -62,14 +113,17 @@ public class MqAdaptor extends IoAdaptor implements Closeable { } String cmd = msg.getCmd(); - if(cmd == null){ //default to Admin - cmd = Protocol.Admin; - } - - MessageHandler handler = handlerMap.get(cmd); - if(handler != null){ - handler.handle(msg, sess); - return; + + if(cmd == null){ //处理URL消息格式,否则url忽略不计 + msg = handleUrlMessage(msg); + cmd = msg.getCmd(); + } + if(cmd != null){ + MessageHandler handler = handlerMap.get(cmd); + if(handler != null){ + handler.handle(msg, sess); + return; + } } Message res = new Message(); @@ -99,6 +153,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable { public void handle(Message msg, Session sess) throws IOException { AbstractMQ mq = findMQ(msg, sess); if(mq == null) return; + if(!auth(mq, msg)){ + ReplyKit.reply403(msg, sess); + return; + } + boolean ack = msg.isAck(); msg.removeHead(Message.CMD); msg.removeHead(Message.ACK); @@ -116,6 +175,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable { public void handle(Message msg, Session sess) throws IOException { AbstractMQ mq = findMQ(msg, sess); if(mq == null) return; + if(!auth(mq, msg)){ + ReplyKit.reply403(msg, sess); + return; + } + mq.consume(msg, sess); String mqName = sess.attr("mq"); @@ -141,13 +205,25 @@ public class MqAdaptor extends IoAdaptor implements Closeable { msg.removeHead(Message.ACK); msg.removeHead(Message.RECVER); msg.removeHead(Message.CMD); - target.write(msg); + try{ + target.write(msg); + } catch(Exception ex){ + log.warn("Target(%s) write failed, Ignore", recver); + return; //just ignore + } } }; private MessageHandler createMqHandler = new MessageHandler() { @Override public void handle(Message msg, Session sess) throws IOException { + String registerToken = msg.getHead("register_token", ""); + if(!MqAdaptor.this.registerToken.equals(registerToken)){ + msg.setBody("registerToken unmatched"); + ReplyKit.reply403(msg, sess); + return; + } + String mqName = msg.getHead("mq_name", ""); mqName = mqName.trim(); if("".equals(mqName)){ @@ -166,11 +242,12 @@ public class MqAdaptor extends IoAdaptor implements Closeable { try{ mode = Integer.valueOf(mqMode); } catch (Exception e){ - msg.setBody("mqMode invalid"); + msg.setBody("mq_mode invalid"); ReplyKit.reply400(msg, sess); return; } + String accessToken = msg.getHead("access_token", ""); AbstractMQ mq = null; synchronized (mqTable) { mq = mqTable.get(mqName); @@ -180,7 +257,8 @@ public class MqAdaptor extends IoAdaptor implements Closeable { } AbstractQueue support = null; - if(MqMode.isEnabled(mode, MqMode.Memory)){ + if(MqMode.isEnabled(mode, MqMode.Memory) || + MqMode.isEnabled(mode, MqMode.RPC)){ support = new MessageMemoryQueue(); } else { support = new MessageDiskQueue(mqName, mode); @@ -191,7 +269,10 @@ public class MqAdaptor extends IoAdaptor implements Closeable { } else { mq = new MQ(mqName, support); } + mq.setMode(mode); mq.creator = sess.getRemoteAddress(); + mq.setAccessToken(accessToken); + log.info("MQ Created: %s", mq); mqTable.put(mqName, mq); ReplyKit.reply200(msg, sess); @@ -201,6 +282,78 @@ public class MqAdaptor extends IoAdaptor implements Closeable { } }; + private MessageHandler testHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + Message res = new Message(); + res.setResponseStatus(200); + res.setId(msg.getId()); + res.setBody("OK"); + sess.write(res); + } + }; + + private MessageHandler homeHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + msg = new Message(); + msg.setResponseStatus("200"); + msg.setHead("content-type", "text/html"); + String body = FileKit.loadFileContent("zbus.htm"); + if ("".equals(body)) { + body = "zbus.htm file missing"; + } + msg.setBody(body); + sess.write(msg); + } + }; + + private MessageHandler jqueryHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + msg = new Message(); + msg.setResponseStatus("200"); + msg.setHead("content-type", "application/javascript"); + String body = FileKit.loadFileContent("jquery.js"); + msg.setBody(body); + sess.write(msg); + } + }; + + private MessageHandler dataHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + BrokerInfo info = getStatInfo(); + + Message data = new Message(); + data.setResponseStatus("200"); + data.setId(msg.getId()); + data.setHead("content-type", "application/json"); + data.setBody(JsonKit.toJson(info)); + sess.write(data); + } + }; + + private MessageHandler queryHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + String json = ""; + if(msg.getMq() == null){ + BrokerInfo info = getStatInfo(); + json = JsonKit.toJson(info); + } else { + AbstractMQ mq = findMQ(msg, sess); + if(mq == null){ + return; + } else { + json = JsonKit.toJson(mq.getMqInfo()); + } + } + + Message data = new Message(); + data.setResponseStatus("200"); + data.setId(msg.getId()); + data.setHead("content-type", "application/json"); + data.setBody(json); + sess.write(data); + } + }; + private MessageHandler heartbeatHandler = new MessageHandler() { @Override public void handle(Message msg, Session sess) throws IOException { @@ -238,6 +391,12 @@ public class MqAdaptor extends IoAdaptor implements Closeable { super.onSessionToDestroy(sess); } + private boolean auth(AbstractMQ mq, Message msg){ + String appid = msg.getHead("appid", ""); + String token = msg.getHead("token", ""); + return mq.auth(appid, token); + } + public void setVerbose(boolean verbose) { this.verbose = verbose; } @@ -245,7 +404,9 @@ public class MqAdaptor extends IoAdaptor implements Closeable { public BrokerInfo getStatInfo(){ Map table = new HashMap(); for(Map.Entry e : this.mqTable.entrySet()){ - table.put(e.getKey(), e.getValue().getMqInfo()); + MqInfo info = e.getValue().getMqInfo(); + info.consumerInfoList.clear(); //clear to avoid long list + table.put(e.getKey(), info); } BrokerInfo info = new BrokerInfo(); info.broker = mqServer.getServerAddr(); @@ -266,10 +427,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable { int flag = diskq.getFlag(); AbstractQueue queue = new MessageDiskQueue(name, diskq); if( MqMode.isEnabled(flag, MqMode.PubSub)){ - mq = new PubSub(name, queue); + mq = new PubSub(name, queue); } else { - mq = new MQ(name, queue); + mq = new MQ(name, queue); } + mq.setMode(flag); mq.lastUpdateTime = System.currentTimeMillis(); mq.creator = "System"; mqTable.put(name, mq); @@ -281,73 +443,5 @@ public class MqAdaptor extends IoAdaptor implements Closeable { public void close() throws IOException { DiskQueuePool.destory(); - } - - - private class AdminHandler implements MessageHandler { - private Map handlerMap = new ConcurrentHashMap(); - public AdminHandler() { - handlerMap.put("", homeHandler); - handlerMap.put("jquery", jqueryHandler); - handlerMap.put("data", dataHandler); - } - - public void handle(Message msg, Session sess) throws IOException { - String subCmd = msg.getSubCmd(); //from header first - if (subCmd == null){ - subCmd = msg.getRequestParam(Message.SUB_CMD); //from request Param - } - if(subCmd == null){ //default to home - subCmd = ""; - } - - MessageHandler handler = this.handlerMap.get(subCmd); - if (handler == null) { - msg.setBody("sub_cmd=%s Not Found", subCmd); - ReplyKit.reply404(msg, sess); - return; - } - handler.handle(msg, sess); - } - - private MessageHandler homeHandler = new MessageHandler() { - public void handle(Message msg, Session sess) throws IOException { - msg = new Message(); - msg.setResponseStatus("200"); - msg.setHead("content-type", "text/html"); - String body = FileKit.loadFileContent("zbus.htm"); - if ("".equals(body)) { - body = "zbus.htm file missing"; - } - msg.setBody(body); - sess.write(msg); - } - }; - - private MessageHandler jqueryHandler = new MessageHandler() { - public void handle(Message msg, Session sess) throws IOException { - msg = new Message(); - msg.setResponseStatus("200"); - msg.setHead("content-type", "application/javascript"); - String body = FileKit.loadFileContent("jquery.js"); - msg.setBody(body); - sess.write(msg); - } - }; - - private MessageHandler dataHandler = new MessageHandler() { - public void handle(Message msg, Session sess) throws IOException { - BrokerInfo info = getStatInfo(); - - Message data = new Message(); - data.setResponseStatus("200"); - data.setId(msg.getId()); - data.setHead("content-type", "application/json"); - data.setBody(JsonKit.toJson(info)); - sess.write(data); - } - }; - - } - + } } \ No newline at end of file diff --git a/src/main/java/org/zbus/mq/server/MqServer.java b/src/main/java/org/zbus/mq/server/MqServer.java index 4d2c564c0654957e9737232a2ea415f2d6d1ba13..8ff40228def1c1f21530e6dc5b963584d451c4f5 100644 --- a/src/main/java/org/zbus/mq/server/MqServer.java +++ b/src/main/java/org/zbus/mq/server/MqServer.java @@ -52,10 +52,13 @@ public class MqServer extends Server{ private long trackInterval = 5000; private MqServerConfig config; + private String registerToken = ""; public MqServer(MqServerConfig config){ this.config = config; serverName = "MqServer"; + registerToken = config.registerToken; + serverMainIpOrder = config.serverMainIpOrder; dispatcher = new Dispatcher(); dispatcher.selectorCount(config.selectorCount); dispatcher.executorCount(config.executorCount); @@ -149,6 +152,10 @@ public class MqServer extends Server{ trackPub.pubEntryUpdate(se); } + public String getRegisterToken(){ + return registerToken; + } + public Map getMqTable() { return mqTable; } @@ -161,12 +168,13 @@ public class MqServer extends Server{ MqServerConfig config = new MqServerConfig(); config.serverHost = ConfigKit.option(args, "-h", "0.0.0.0"); config.serverPort = ConfigKit.option(args, "-p", 15555); - config.selectorCount = ConfigKit.option(args, "-selector", 1); + config.selectorCount = ConfigKit.option(args, "-selector", 0); //0 means default to CPU/4 config.executorCount = ConfigKit.option(args, "-executor", 64); config.verbose = ConfigKit.option(args, "-verbose", false); config.storePath = ConfigKit.option(args, "-store", "store"); config.trackServerList = ConfigKit.option(args, "-track", null); config.thriftServer = ConfigKit.option(args, "-thrift", null); + config.serverMainIpOrder = ConfigKit.option(args, "-ipOrder", null); final MqServer server = new MqServer(config); if(config.thriftServer != null){ diff --git a/src/main/java/org/zbus/mq/server/MqServerConfig.java b/src/main/java/org/zbus/mq/server/MqServerConfig.java index 833e1edd1c3a92ae03ab4d766bf8edd4a7971919..860f8ee743003e8b8c05ff621bda6e73201e6d2a 100644 --- a/src/main/java/org/zbus/mq/server/MqServerConfig.java +++ b/src/main/java/org/zbus/mq/server/MqServerConfig.java @@ -22,21 +22,8 @@ */ package org.zbus.mq.server; -import static org.zbus.kit.ConfigKit.value; -import static org.zbus.kit.ConfigKit.valueSet; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; - -import org.zbus.kit.FileKit; -import org.zbus.kit.log.Logger; - -public class MqServerConfig{ - private static final Logger log = Logger.getLogger(MqServerConfig.class); - +public class MqServerConfig{ public String trackServerList = "127.0.0.1:16666;127.0.0.1:16667"; public String serverHost = "0.0.0.0"; @@ -46,111 +33,11 @@ public class MqServerConfig{ public int selectorCount = 1; public int executorCount = 64; public boolean verbose = true; - public String storePath = "mq"; - - - public Set roles = new HashSet(); - public Set users = new HashSet(); - public Set mqs = new HashSet(); - public Set pubsubs = new HashSet(); - + public String storePath = "store"; + public String registerToken = ""; + public String serverMainIpOrder = null; public String getServerAddress(){ return serverHost + ":" + serverPort; - } - - public void load(InputStream is) throws IOException{ - Properties props = new Properties(); - if(is == null){ - log.info("Using default settings, missing config file"); - } else { - props.load(is); - } - - this.serverHost = value(props, "zbus.serverHost", "0.0.0.0"); - this.serverPort = value(props, "zbus.serverPort", 15555); - this.selectorCount = value(props, "zbus.selectorCount", 1); - this.executorCount = value(props, "zbus.executorCount", 64); - this.verbose = value(props, "zbus.verbose", true); - this.storePath = value(props, "zbus.storePath", "mq"); - - this.roles = valueSet(props, "zbus.role"); - Set userNames = valueSet(props, "zbus.user"); - for(String name : userNames){ - User user = new User(); - user.name = name; - user.password = value(props, String.format("zbus.user.%s.password", name)); - user.roles = valueSet(props, String.format("zbus.user.%s.role", name)); - - this.users.add(user); - } - - for(String name : valueSet(props, "zbus.mq")){ - MqEntry entry = new MqEntry(); - entry.name = name; - entry.allowUsers = valueSet(props, String.format("zbus.mq.%s.allowUser", name)); - entry.allowRoles = valueSet(props, String.format("zbus.mq.%s.allowRole", name)); - - this.mqs.add(entry); - } - - for(String name : valueSet(props, "zbus.pubsub")){ - MqEntry entry = new MqEntry(); - entry.name = name; - entry.allowUsers = valueSet(props, String.format("zbus.pubsub.%s.allowUser", name)); - entry.allowRoles = valueSet(props, String.format("zbus.pubsub.%s.allowRole", name)); - - this.pubsubs.add(entry); - } - } - - public void load(String fileName) throws IOException{ - try{ - InputStream is = FileKit.loadFile(fileName); - if(is == null){ - log.info("Using default settings, missing config file"); - } else { - load(is); - } - } catch(Exception e){ - log.error(e.getMessage(), e); - } } - - public static class User { - public String name; - public String password; - public Set roles = new HashSet(); - @Override - public String toString() { - return "User [name=" + name + ", password=" + password + ", roles=" - + roles + "]"; - } - } - - public static class MqEntry { - public String name; - public Set allowUsers = new HashSet(); - public Set allowRoles = new HashSet(); - @Override - public String toString() { - return "MqEntry [name=" + name + ", allowUsers=" + allowUsers - + ", allowRoles=" + allowRoles + "]"; - } - - } - - - - public static void main(String[] args) throws Exception{ - MqServerConfig config = new MqServerConfig(); - - config.load("zbus.properties"); - - System.out.println("roles:"+config.roles); - System.out.println("users:"+config.users); - System.out.println("mqs:"+config.mqs); - System.out.println("pubsubs:"+config.pubsubs); - } - } \ No newline at end of file diff --git a/src/main/java/org/zbus/mq/server/PubSub.java b/src/main/java/org/zbus/mq/server/PubSub.java index 8e22f2604e91be69d94d11c6231b2559b76df6aa..db8eaa9c386929d438e50caefad58990a851ff20 100644 --- a/src/main/java/org/zbus/mq/server/PubSub.java +++ b/src/main/java/org/zbus/mq/server/PubSub.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import org.zbus.kit.log.Logger; import org.zbus.mq.Protocol.ConsumerInfo; import org.zbus.mq.Protocol.MqInfo; -import org.zbus.mq.Protocol.MqMode; import org.zbus.net.core.Session; import org.zbus.net.http.Message; @@ -92,7 +91,7 @@ public class PubSub extends AbstractMQ{ if(pullMessage == null) continue; msg = pull.getMsgQ().poll(); - if(msg == null) break; + if(msg == null) continue; msg.setResponseStatus(200); //支持浏览器 msg.setRawId(msg.getId()); @@ -137,12 +136,14 @@ public class PubSub extends AbstractMQ{ info.name = name; info.lastUpdateTime = lastUpdateTime; info.creator = creator; - info.mode = MqMode.PubSub.intValue(); + info.mode = this.mode; + info.unconsumedMsgCount = msgQ.size(); info.consumerInfoList = new ArrayList(); for(PullSession pull : pullMap.values()){ info.consumerInfoList.add(pull.getConsumerInfo()); } + info.consumerCount = info.consumerInfoList.size(); return info; } } diff --git a/src/main/java/org/zbus/mq/server/UrlInfo.java b/src/main/java/org/zbus/mq/server/UrlInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..5888bec045d530d39f80ed5a3abe6eccfe933ef3 --- /dev/null +++ b/src/main/java/org/zbus/mq/server/UrlInfo.java @@ -0,0 +1,131 @@ +package org.zbus.mq.server; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class UrlInfo{ + public String cmd; + public String mq; + public String module; + public String method; + public String params; + public Map extra; + public boolean empty = false; + + public UrlInfo(String url){ + this(url, false); + } + + public UrlInfo(String url, boolean directRpc){ + if(url == null || "".equals(url)){ + empty = true; + return; + } + if(url.length() == 1 && url.charAt(0) == '/'){ + empty = true; + return; + } + try { + url = URLDecoder.decode(url, "UTF8"); + } catch (UnsupportedEncodingException e) { + return; + } + String kvs; + boolean hasKvs = url.indexOf('?') != -1; + String[] bb = url.split("[/?]+"); + if(hasKvs){ + kvs = bb[bb.length-1]; + if(!directRpc){ + if(bb.length >= 5){ + mq = bb[1]; + module = bb[2]; + method = bb[3]; + } else if(bb.length >= 4){ + mq = bb[1]; + method = bb[2]; + } else if(bb.length >= 3){ + mq = bb[1]; + } + } else { + if(bb.length >= 4){ + module = bb[1]; + method = bb[2]; + } else if(bb.length >= 3){ + method = bb[1]; + } + } + } else { + if(!directRpc){ + if(bb.length >= 4){ + mq = bb[1]; + module = bb[2]; + method = bb[3]; + } else if(bb.length >= 3){ + mq = bb[1]; + method = bb[2]; + } else if(bb.length >= 2){ + mq = bb[1]; + } + } else { + if(bb.length >= 3){ + module = bb[1]; + method = bb[2]; + } else if(bb.length >= 2){ + method = bb[1]; + } + } + return; + } + + bb = kvs.split("[&]+"); + for(String b : bb){ + if("".equals(b)) continue; + int idx = b.indexOf('='); + if(idx < 0){ + if(params == null || params.equals("")){ + params = b; + } + continue; + } + String key = b.substring(0, idx); + String val = b.substring(idx+1); + if(extra == null){ + synchronized (this) { + if(extra == null){ + extra = new ConcurrentHashMap(); + } + } + } + extra.put(key, val); + } + if(extra != null){ + if(cmd == null && extra.containsKey("cmd")){ + cmd = extra.get("cmd"); + } + if(mq == null && extra.containsKey("mq")){ + mq = extra.get("mq"); + } + if(method == null && extra.containsKey("method")){ + method = extra.get("method"); + } + if(params == null && extra.containsKey("params")){ + params = extra.get("params"); + } + } + } + + @Override + public String toString() { + return "UrlInfo [cmd=" + cmd + ", mq=" + mq + ", module=" + module + + ", method=" + method + ", params=" + params + ", extra=" + + extra + "]"; + } + + public static void main(String[] args){ + UrlInfo url = new UrlInfo("/t2/xxx?ok", true); + System.out.println(url); + } + +} diff --git a/src/main/java/org/zbus/net/Client.java b/src/main/java/org/zbus/net/Client.java index 09200f90ab67169be17c0832f55c1ac4bcd3266e..1c06bec4c78a9a400c251af0a6bbde8587e81ede 100644 --- a/src/main/java/org/zbus/net/Client.java +++ b/src/main/java/org/zbus/net/Client.java @@ -53,9 +53,9 @@ public class Client extends IoAdaptor implements Closeable { protected Session session; protected volatile MsgHandler msgHandler; - protected ErrorHandler errorHandler; - protected ConnectedHandler connectedHandler; - protected DisconnectedHandler disconnectedHandler; + protected volatile ErrorHandler errorHandler; + protected volatile ConnectedHandler connectedHandler; + protected volatile DisconnectedHandler disconnectedHandler; public Client(String address, Dispatcher dispatcher) { String[] blocks = address.split("[:]"); @@ -167,6 +167,7 @@ public class Client extends IoAdaptor implements Closeable { @Override public void close() throws IOException { + this.onDisconnected(null); //clear disconnection handler if (this.session != null) { this.session.close(); } diff --git a/src/main/java/org/zbus/net/InvokingClient.java b/src/main/java/org/zbus/net/InvokingClient.java index 4ac54102613fa1f79eb41f306db57f91c22d4051..6b299506ce225e700b1dbacd22b1b207350def36 100644 --- a/src/main/java/org/zbus/net/InvokingClient.java +++ b/src/main/java/org/zbus/net/InvokingClient.java @@ -90,6 +90,12 @@ public class InvokingClient return this.invokeSync(req, this.readTimeout); } + @Override + protected void onSessionDestroyed(Session sess) throws IOException { + super.onSessionDestroyed(sess); + sync.clearTicket(); + } + public RES invokeSync(REQ req, int timeout) throws IOException, InterruptedException { Ticket ticket = null; try { diff --git a/src/main/java/org/zbus/net/MsgInvoker.java b/src/main/java/org/zbus/net/MsgInvoker.java index 18a355d16baf4e653624b3d781c30a711e4a07a5..5782fa3d1cab8c7c442da5546aa04dd1f4dd7401 100644 --- a/src/main/java/org/zbus/net/MsgInvoker.java +++ b/src/main/java/org/zbus/net/MsgInvoker.java @@ -20,7 +20,7 @@ public interface MsgInvoker { /** * 异步消息模式 * - * @param msg + * @param req * @param callback * @throws IOException */ diff --git a/src/main/java/org/zbus/net/Server.java b/src/main/java/org/zbus/net/Server.java index 0ba41e2174cc749f2a6e2775c86636a0af67ff36..617783ec28f048d67407bbd7285c53f36a75874b 100644 --- a/src/main/java/org/zbus/net/Server.java +++ b/src/main/java/org/zbus/net/Server.java @@ -24,6 +24,7 @@ package org.zbus.net; import java.io.Closeable; import java.io.IOException; +import java.net.ServerSocket; import java.nio.channels.ServerSocketChannel; import java.util.Map; import java.util.Map.Entry; @@ -45,8 +46,9 @@ public class Server implements Closeable{ } protected Dispatcher dispatcher; - protected String serverAddr; //第一个Server注册的地址 + protected String serverAddr; //第一个Server注册的地址, 当侦听启动后才有效 protected String serverName = "Server"; + protected String serverMainIpOrder = null; protected Map adaptors = new ConcurrentHashMap(); @@ -87,14 +89,9 @@ public class Server implements Closeable{ adaptor.adaptorName = name; adaptor.adaptorAddr = address; adaptor.serverAdaptor = ioAdaptor; - - if(adaptors.isEmpty()){ + + if(adaptors.isEmpty()){ //the first this.serverAddr = address; - String host = blocks[0]; - int port = Integer.valueOf(blocks[1]); - if("0.0.0.0".equals(host)){ - this.serverAddr = String.format("%s:%d",NetKit.getLocalIp(), port); - } } adaptors.put(address, adaptor); @@ -110,8 +107,31 @@ public class Server implements Closeable{ IoAdaptorInfo adaptor = e.getValue(); if(adaptor.serverChannel != null) continue; + String address = adaptor.adaptorAddr; + String[] bb = address.split("[:]"); + if(bb.length !=2 ) continue; + String host = bb[0]; + String port = bb[1]; + adaptor.serverChannel = dispatcher.registerServerChannel(adaptor.adaptorAddr, adaptor.serverAdaptor); - String info = String.format("%s-%s listening [%s]", this.serverName, adaptor.adaptorName, adaptor.adaptorAddr); + + ServerSocket ss = adaptor.serverChannel.socket(); + if(address.equals(serverAddr)){ + String localHost = host; + if("0.0.0.0".equals(host)){ + if(serverMainIpOrder == null){ + localHost = NetKit.getLocalIp(); + } else { + localHost = NetKit.getLocalIp(serverMainIpOrder); + } + } + serverAddr = String.format("%s:%d", localHost, ss.getLocalPort()); + } + if("0".equals(port)){ + adaptor.adaptorAddr = String.format("%s:%d", host, ss.getLocalPort()); + } + + String info = String.format("%s-%s listening [%s]", this.serverName, adaptor.adaptorName, adaptor.adaptorAddr); if(adaptor.adaptorName == null){ info = String.format("%s listening [%s]", this.serverName, adaptor.adaptorAddr); } diff --git a/src/main/java/org/zbus/net/Sync.java b/src/main/java/org/zbus/net/Sync.java index 92ae835455a2ebe1d4b4fa0dbf0a623d9e67b484..0dc060da083e724280e9a53715cc7b4d96ff522c 100644 --- a/src/main/java/org/zbus/net/Sync.java +++ b/src/main/java/org/zbus/net/Sync.java @@ -151,6 +151,13 @@ public class Sync { return tickets.remove(id); } + public void clearTicket(){ + for(Ticket ticket : tickets.values()){ + ticket.countDown(); + } + tickets.clear(); + } + public static interface Id { void setId(String id); diff --git a/src/main/java/org/zbus/net/core/Dispatcher.java b/src/main/java/org/zbus/net/core/Dispatcher.java index 9bdd41d23b319720d38fe55862a8c4a9a9f4ef5c..2cd66c2197f9e1a14a5d247671eca4893a918f78 100644 --- a/src/main/java/org/zbus/net/core/Dispatcher.java +++ b/src/main/java/org/zbus/net/core/Dispatcher.java @@ -42,12 +42,14 @@ import org.zbus.kit.log.Logger; public class Dispatcher implements Closeable { + public static final int DEFAULT_EXECUTOR_COUNT = 64; + private static final Logger log = Logger.getLogger(Dispatcher.class); private ExecutorService executor; - private int selectorCount = 1; - private int executorCount = 32; + private int selectorCount = defaultSelectorCount(); + private int executorCount = DEFAULT_EXECUTOR_COUNT; private SelectorThread[] selectors; private AtomicInteger selectorIndex = new AtomicInteger(0); private String dispatcherName = "Dispatcher"; @@ -166,12 +168,26 @@ public class Dispatcher implements Closeable { } public Dispatcher selectorCount(int count){ - this.selectorCount = count; + if(count <= 0){ + this.selectorCount = defaultSelectorCount(); + } else { + this.selectorCount = count; + } return this; } + public static int defaultSelectorCount(){ + int c = Runtime.getRuntime().availableProcessors()/2; + if(c <= 0) c = 1; + return c; + } + public Dispatcher executorCount(int count){ - this.executorCount = count; + if(count <= 0){ + this.executorCount = DEFAULT_EXECUTOR_COUNT; + } else { + this.executorCount = count; + } return this; } diff --git a/src/main/java/org/zbus/net/core/SelectorThread.java b/src/main/java/org/zbus/net/core/SelectorThread.java index d81695b14414785cde0931dac21b11fa8350ff7d..1b1da25e716a6035f82ebae24a9b80057e3b17e1 100644 --- a/src/main/java/org/zbus/net/core/SelectorThread.java +++ b/src/main/java/org/zbus/net/core/SelectorThread.java @@ -1,275 +1,274 @@ -/** - * The MIT License (MIT) - * Copyright (c) 2009-2015 HONG LEIMING - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package org.zbus.net.core; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.zbus.kit.NetKit; -import org.zbus.kit.log.Logger; -import org.zbus.net.core.Session.SessionStatus; - -public class SelectorThread extends Thread { - private static final Logger log = Logger.getLogger(SelectorThread.class); - - protected volatile java.nio.channels.Selector selector = null; - protected final Dispatcher dispatcher; - private final Queue register = new LinkedBlockingQueue(); - private final Queue unregister = new LinkedBlockingQueue(); - - public SelectorThread(Dispatcher dispatcher, String name) throws IOException{ - super(name); - this.dispatcher = dispatcher; - this.selector = java.nio.channels.Selector.open(); - } - - public SelectorThread(Dispatcher dispatcher) throws IOException{ - this(dispatcher, "Selector"); - } - - - public void registerChannel(SelectableChannel channel, int ops) throws IOException{ - registerChannel(channel, ops, null); - } - - public void registerSession(int ops, Session sess) throws IOException{ - registerChannel(sess.getChannel(), ops, sess); - } - - public void registerChannel(SelectableChannel channel, int ops, Session sess) throws IOException{ - if(Thread.currentThread() == this){ - SelectionKey key = channel.register(this.selector, ops, sess); - if(sess != null){ - sess.setRegisteredKey(key); - sess.setStatus(SessionStatus.CONNECTED); - sess.getIoAdaptor().onSessionRegistered(sess); - } - } else { - this.register.offer(new Object[]{channel, ops, sess}); - this.selector.wakeup(); - } - } - - public void unregisterSession(Session sess){ - if(this.unregister.contains(sess)){ - return; - } - this.unregister.add(sess); - this.selector.wakeup(); - } - - - @Override - public void interrupt() { - super.interrupt(); - try { - this.selector.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - - - @Override - public void run() { - try{ - while(!isInterrupted()){ - selector.select(); - handleRegister(); - - Iterator iter = selector.selectedKeys().iterator(); - while(iter.hasNext()){ - SelectionKey key = iter.next(); - iter.remove(); - if(!key.isValid()) continue; - - Object att = key.attachment(); - if(att != null && att instanceof Session){ - ((Session)att).updateLastOperationTime(); - } - try{ - if(key.isAcceptable()){ - handleAcceptEvent(key); - } else if (key.isConnectable()){ - handleConnectEvent(key); - } else if (key.isReadable()){ - handleReadEvent(key); - } else if (key.isWritable()){ - handleWriteEvent(key); - } - } catch(Throwable e){ - disconnectWithException(key, e); - } - } - handleUnregister(); - } - } catch(Throwable e) { - if(!dispatcher.isStarted()){ - if(log.isDebugEnabled()){ - log.debug(e.getMessage(), e); - } - } else { - log.error("SelectorThread exit for fatal error:%s", e); - } - } - } - - private void disconnectWithException(final SelectionKey key, final Throwable e){ - final Session sess = (Session)key.attachment(); - if(sess == null){ - try{ - key.channel().close(); - key.cancel(); - } catch(Throwable ex){ - log.error(e.getMessage(), ex); - } - return; - } - - sess.setStatus(SessionStatus.ON_ERROR); - dispatcher.asyncRun(new Runnable() { - @Override - public void run() { - try{ - sess.getIoAdaptor().onException(e, sess); - } catch (Throwable ex){ - if(!dispatcher.isStarted()){ - log.debug(ex.getMessage(), ex); - } else { - log.error(ex.getMessage(), ex); - } - } - } - }); - - try{ - sess.close(); - key.cancel(); - } catch(Throwable ex){ - log.error(e.getMessage(), ex); - } - - } - - protected void handleRegister(){ - Object[] item = null; - while( (item=this.register.poll()) != null){ - try{ - SelectableChannel channel = (SelectableChannel) item[0]; - if (!channel.isOpen() ) continue; - int ops = (Integer)item[1]; - Session sess = (Session) item[2]; - - SelectionKey key = channel.register(this.selector, ops, sess); - if(sess != null){ - sess.setRegisteredKey(key); - sess.getIoAdaptor().onSessionRegistered(sess); - } - - }catch(Exception e){ - log.error(e.getMessage(), e); - } - } - } - - protected void handleUnregister(){ - Session sess = null; - while( (sess = this.unregister.poll()) != null ){ - try { - sess.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - } - - - protected void handleAcceptEvent(SelectionKey key) throws IOException{ - ServerSocketChannel server = (ServerSocketChannel) key.channel(); - - SocketChannel channel = server.accept(); - channel.configureBlocking(false); - - if(log.isDebugEnabled()){ - log.debug("ACCEPT: server(%s) %s=>%s", NetKit.remoteAddress(channel), NetKit.localAddress(channel)); - } - - SocketAddress serverAddress = server.socket().getLocalSocketAddress(); - - IoAdaptor ioAdaptor = dispatcher.ioAdaptor(serverAddress); - if(ioAdaptor == null){ - log.warn("Missing IoAdaptor for %s", serverAddress); - return; - } - - Session sess = new Session(dispatcher, channel, ioAdaptor); - sess.setStatus(SessionStatus.CONNECTED); //set connected - - sess.getIoAdaptor().onSessionAccepted(sess); - - } - - protected void handleConnectEvent(SelectionKey key) throws IOException{ - final SocketChannel channel = (SocketChannel) key.channel(); - - Session sess = (Session) key.attachment(); - if(sess == null){ - throw new IOException("Session not attached yet to SelectionKey"); - } - - if(channel.finishConnect()){ - sess.finishConnect(); - if(log.isDebugEnabled()){ - log.debug("CONNECT: %s=>%s", NetKit.localAddress(channel), NetKit.remoteAddress(channel)); - } - } - sess.setStatus(SessionStatus.CONNECTED); - key.interestOps(0); //!!!clear interest of OP_CONNECT to avoid looping CPU !!! - sess.getIoAdaptor().onSessionConnected(sess); - - } - - protected void handleReadEvent(SelectionKey key) throws IOException{ - Session sess = (Session) key.attachment(); - if(sess == null){ - throw new IOException("Session not attached yet to SelectionKey"); - } - sess.doRead(); - } - - protected void handleWriteEvent(SelectionKey key) throws IOException{ - Session sess = (Session) key.attachment(); - if(sess == null){ - throw new IOException("Session not attached yet to SelectionKey"); - } - sess.doWrite(); - } - -} +/** + * The MIT License (MIT) + * Copyright (c) 2009-2015 HONG LEIMING + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.zbus.net.core; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.zbus.kit.NetKit; +import org.zbus.kit.log.Logger; +import org.zbus.net.core.Session.SessionStatus; + +public class SelectorThread extends Thread { + private static final Logger log = Logger.getLogger(SelectorThread.class); + + protected volatile java.nio.channels.Selector selector = null; + protected final Dispatcher dispatcher; + private final Queue register = new LinkedBlockingQueue(); + private final Queue unregister = new LinkedBlockingQueue(); + + public SelectorThread(Dispatcher dispatcher, String name) throws IOException{ + super(name); + this.dispatcher = dispatcher; + this.selector = java.nio.channels.Selector.open(); + } + + public SelectorThread(Dispatcher dispatcher) throws IOException{ + this(dispatcher, "Selector"); + } + + + public void registerChannel(SelectableChannel channel, int ops) throws IOException{ + registerChannel(channel, ops, null); + } + + public void registerSession(int ops, Session sess) throws IOException{ + registerChannel(sess.getChannel(), ops, sess); + } + + public void registerChannel(SelectableChannel channel, int ops, Session sess) throws IOException{ + if(Thread.currentThread() == this){ + SelectionKey key = channel.register(this.selector, ops, sess); + if(sess != null){ + sess.setRegisteredKey(key); + sess.setStatus(SessionStatus.CONNECTED); + sess.getIoAdaptor().onSessionRegistered(sess); + } + } else { + this.register.offer(new Object[]{channel, ops, sess}); + this.selector.wakeup(); + } + } + + public void unregisterSession(Session sess){ + if(this.unregister.contains(sess)){ + return; + } + this.unregister.add(sess); + this.selector.wakeup(); + } + + + @Override + public void interrupt() { + super.interrupt(); + try { + this.selector.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + + + @Override + public void run() { + try{ + while(!isInterrupted()){ + selector.select(); + handleRegister(); + + Iterator iter = selector.selectedKeys().iterator(); + while(iter.hasNext()){ + SelectionKey key = iter.next(); + iter.remove(); + if(!key.isValid()) continue; + + Object att = key.attachment(); + if(att != null && att instanceof Session){ + ((Session)att).updateLastOperationTime(); + } + try{ + if(key.isAcceptable()){ + handleAcceptEvent(key); + } else if (key.isConnectable()){ + handleConnectEvent(key); + } else if (key.isReadable()){ + handleReadEvent(key); + } else if (key.isWritable()){ + handleWriteEvent(key); + } + } catch(Throwable e){ + disconnectWithException(key, e); + } + } + handleUnregister(); + } + } catch(Throwable e) { + if(!dispatcher.isStarted()){ + if(log.isDebugEnabled()){ + log.debug(e.getMessage(), e); + } + } else { + log.error("SelectorThread exit for fatal error:%s", e); + } + } + } + + private void disconnectWithException(final SelectionKey key, final Throwable e){ + final Session sess = (Session)key.attachment(); + if(sess == null){ + try{ + key.channel().close(); + key.cancel(); + } catch(Throwable ex){ + log.error(e.getMessage(), ex); + } + return; + } + + sess.setStatus(SessionStatus.ON_ERROR); + dispatcher.asyncRun(new Runnable() { + @Override + public void run() { + try{ + sess.getIoAdaptor().onException(e, sess); + } catch (Throwable ex){ + if(!dispatcher.isStarted()){ + log.debug(ex.getMessage(), ex); + } else { + log.error(ex.getMessage(), ex); + } + } + } + }); + + try{ + sess.close(); + key.cancel(); + } catch(Throwable ex){ + log.error(e.getMessage(), ex); + } + + } + + protected void handleRegister(){ + Object[] item = null; + while( (item=this.register.poll()) != null){ + try{ + SelectableChannel channel = (SelectableChannel) item[0]; + if (!channel.isOpen() ) continue; + int ops = (Integer)item[1]; + Session sess = (Session) item[2]; + + SelectionKey key = channel.register(this.selector, ops, sess); + if(sess != null){ + sess.setRegisteredKey(key); + sess.getIoAdaptor().onSessionRegistered(sess); + } + + }catch(Exception e){ + log.error(e.getMessage(), e); + } + } + } + + protected void handleUnregister(){ + Session sess = null; + while( (sess = this.unregister.poll()) != null ){ + try { + sess.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + } + + + protected void handleAcceptEvent(SelectionKey key) throws IOException{ + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + + SocketAddress serverAddress = server.socket().getLocalSocketAddress(); + if(log.isDebugEnabled()){ + log.debug("ACCEPT: server(%s) %s=>%s", serverAddress, NetKit.remoteAddress(channel), NetKit.localAddress(channel)); + } + + IoAdaptor ioAdaptor = dispatcher.ioAdaptor(serverAddress); + if(ioAdaptor == null){ + log.warn("Missing IoAdaptor for %s", serverAddress); + return; + } + + Session sess = new Session(dispatcher, channel, ioAdaptor); + sess.setStatus(SessionStatus.CONNECTED); //set connected + + sess.getIoAdaptor().onSessionAccepted(sess); + + } + + protected void handleConnectEvent(SelectionKey key) throws IOException{ + final SocketChannel channel = (SocketChannel) key.channel(); + + Session sess = (Session) key.attachment(); + if(sess == null){ + throw new IOException("Session not attached yet to SelectionKey"); + } + + if(channel.finishConnect()){ + sess.finishConnect(); + if(log.isDebugEnabled()){ + log.debug("CONNECT: %s=>%s", NetKit.localAddress(channel), NetKit.remoteAddress(channel)); + } + } + sess.setStatus(SessionStatus.CONNECTED); + key.interestOps(0); //!!!clear interest of OP_CONNECT to avoid looping CPU !!! + sess.getIoAdaptor().onSessionConnected(sess); + + } + + protected void handleReadEvent(SelectionKey key) throws IOException{ + Session sess = (Session) key.attachment(); + if(sess == null){ + throw new IOException("Session not attached yet to SelectionKey"); + } + sess.doRead(); + } + + protected void handleWriteEvent(SelectionKey key) throws IOException{ + Session sess = (Session) key.attachment(); + if(sess == null){ + throw new IOException("Session not attached yet to SelectionKey"); + } + sess.doWrite(); + } + +} diff --git a/src/main/java/org/zbus/net/http/Message.java b/src/main/java/org/zbus/net/http/Message.java index d802d434d725fe984ebe464c7864274a35044d4a..ff8c82e63a2512d7449f28b1c3506508e4d6e172 100644 --- a/src/main/java/org/zbus/net/http/Message.java +++ b/src/main/java/org/zbus/net/http/Message.java @@ -58,11 +58,12 @@ import org.zbus.net.core.IoBuffer; */ public class Message implements Id { private static final Logger log = Logger.getLogger(Message.class); + private static final String DEFAULT_ENCODING = "UTF-8"; + public static final String HEARTBEAT = "heartbeat"; //心跳消息 //使用到的标准HTTP头部 public static final String REMOTE_ADDR = "remote-addr"; - public static final String CONTENT_ENCODING = "content-encoding"; public static final String CONTENT_LENGTH = "content-length"; public static final String CONTENT_TYPE = "content-type"; @@ -78,6 +79,7 @@ public class Message implements Id { public static final String TOPIC = "topic"; //使用,分隔 public static final String ACK = "ack"; public static final String WINDOW = "window"; + public static final String ENCODING = "encoding"; //HTTP协议第一行(请求串或者返回状态码) @@ -89,14 +91,17 @@ public class Message implements Id { public Message(){ setBody((byte[])null); + setHead("connection", "Keep-Alive"); //since 6.3.0/6.2.8 } public Message(String body){ setBody(body); + setHead("connection", "Keep-Alive"); } public Message(byte[] body){ setBody(body); + setHead("connection", "Keep-Alive"); } public static Message copyWithoutBody(Message msg){ @@ -215,8 +220,16 @@ public class Message implements Id { } } - public Message setBody(String body){ - return setBody(body.getBytes()); + public Message setBody(String body){ + String encoding = this.getEncoding(); + if(encoding == null){ + encoding = DEFAULT_ENCODING; + } + try { + return setBody(body.getBytes(encoding)); + } catch (UnsupportedEncodingException e) { //just ignore + return setBody(body.getBytes()); + } } public Message setBody(String format, Object ...args) { @@ -236,7 +249,11 @@ public class Message implements Id { public String getBodyString() { if (this.getBody() == null) return null; - return new String(this.getBody()); + String encoding = this.getEncoding(); + if(encoding == null){ + encoding = DEFAULT_ENCODING; + } + return getBodyString(encoding); } public String getBodyString(String encoding) { @@ -340,10 +357,10 @@ public class Message implements Id { public String getEncoding() { - return this.getHead(CONTENT_ENCODING); + return this.getHead(ENCODING); } public Message setEncoding(String encoding) { - this.setHead(CONTENT_ENCODING, encoding); + this.setHead(ENCODING, encoding); return this; } diff --git a/src/main/java/org/zbus/proxy/DmzClient.java b/src/main/java/org/zbus/proxy/DmzClient.java index 3bb6825c484a3f06c3edd8fa77cda68918759cbc..511ea44926928b07fecef347e7b892aa028e894d 100644 --- a/src/main/java/org/zbus/proxy/DmzClient.java +++ b/src/main/java/org/zbus/proxy/DmzClient.java @@ -122,8 +122,8 @@ public class DmzClient extends Client{ public static void main(String[] args) throws Exception { String dmzDown = ConfigKit.option(args, "-dmzDown", "127.0.0.1:15557"); String dmzNotify = ConfigKit.option(args, "-dmzNotify", "127.0.0.1:15558"); - String target = ConfigKit.option(args, "-target", "10.17.2.30:3306"); - //String target = ConfigKit.option(args, "-target", "10.17.2.30:3306"); +// String target = ConfigKit.option(args, "-target", "10.17.2.30:3306"); + String target = ConfigKit.option(args, "-target", "192.168.0.11:3306"); final Dispatcher dispatcher = new Dispatcher(); diff --git a/src/main/java/org/zbus/proxy/TcpProxyServer.java b/src/main/java/org/zbus/proxy/TcpProxyServer.java index 8bedb27a8da8bffa4310f13a09fe568d9f141f9e..4dcfbde1db0861d3fa05b17351c5805597f7aa3d 100644 --- a/src/main/java/org/zbus/proxy/TcpProxyServer.java +++ b/src/main/java/org/zbus/proxy/TcpProxyServer.java @@ -23,9 +23,13 @@ package org.zbus.proxy; import java.io.IOException; +import java.io.InputStream; import java.nio.channels.SelectionKey; +import java.util.Properties; import org.zbus.kit.ConfigKit; +import org.zbus.kit.FileKit; +import org.zbus.kit.NetKit; import org.zbus.kit.log.Logger; import org.zbus.net.Server; import org.zbus.net.core.Dispatcher; @@ -62,14 +66,35 @@ public class TcpProxyServer extends BindingAdaptor{ dispatcher.registerSession(SelectionKey.OP_CONNECT, target); } - public static void main(String[] args) throws Exception { - int serverPort = ConfigKit.option(args, "-server", 3306); - String target = ConfigKit.option(args, "-target", "10.17.2.30:3306"); - Dispatcher dispatcher = new Dispatcher(); + public static void main(String[] args) throws Exception { + String conf = ConfigKit.option(args, "-conf", "proxy.properties"); - IoAdaptor ioAdaptor = new TcpProxyServer(target); - - final Server server = new Server(dispatcher, ioAdaptor, serverPort); + InputStream inputStream = FileKit.loadFile(conf); + if(inputStream == null){ + log.error("Missing configure file(%s)", conf); + return; + } + Properties props = new Properties(); + props.load(inputStream); + + Dispatcher dispatcher = new Dispatcher(); + final Server server = new Server(dispatcher); + for(Object key : props.keySet()){ + String sKey = (String)key; + if(!sKey.startsWith("tcp.")) continue; + String listenPort = sKey.substring(4); + int port; + try{ + port = Integer.valueOf(listenPort); + }catch(Exception ex){ + continue; + } + String target = props.getProperty(sKey); + IoAdaptor ioAdaptor = new TcpProxyServer(target); + server.registerAdaptor(port, ioAdaptor); + log.info("%s:%d====>%s", NetKit.getLocalIp(), port, target); + } + server.setServerName("TcpProxyServer"); server.start(); diff --git a/src/main/java/org/zbus/rpc/RpcInvoker.java b/src/main/java/org/zbus/rpc/RpcInvoker.java index da7f4b4caabcee8b21b34348bd2207f8b2da9a34..ac1f778ce7775fa7e316d312fb28e8851a94d96f 100644 --- a/src/main/java/org/zbus/rpc/RpcInvoker.java +++ b/src/main/java/org/zbus/rpc/RpcInvoker.java @@ -65,55 +65,44 @@ public class RpcInvoker{ } } - public T invokeSync(Class clazz, String method, Object... args){ + public T invokeSync(Class resultClass, String method, Object... args){ Request request = new Request() .module(module) .method(method) .params(args) .encoding(encoding); - return invokeSync(clazz, request); + return invokeSync(resultClass, request); } - public T invokeSync(Class clazz, String method, Class[] types, Object... args){ + public T invokeSync(Class resultClass, String method, Class[] paramTypes, Object... args){ Request request = new Request() .module(module) .method(method) - .paramTypes(types) + .paramTypes(paramTypes) .params(args) .encoding(encoding); - return invokeSync(clazz, request); - } - - - public T invokeSync(Class clazz, String module, String method, Class[] types, Object... args){ - Request request = new Request() - .module(module) - .method(method) - .paramTypes(types) - .params(args) - .encoding(encoding); - - return invokeSync(clazz, request); - } + return invokeSync(resultClass, request); + } - public T invokeSync(Class clazz, Request request){ + public T invokeSync(Class resultClass, Request request){ Response resp = invokeSync(request); try { @SuppressWarnings("unchecked") - T res = (T)codec.convert(extractResult(resp), clazz); + T res = (T)codec.convert(extractResult(resp), resultClass); return res; } catch (ClassNotFoundException e) { throw new RpcException(e.getMessage(), e.getCause()); } } - public Object invokeSync(String method, Class[] types, Object... args) { - return invokeSync(this.module, method, types, args); - } - public Object invokeSync(String module, String method, Class[] types, Object... args) { + public Object invokeSync(String method, Object... args) { + return invokeSync(method, null, args); + } + + public Object invokeSync(String method, Class[] types, Object... args) { Request req = new Request() .module(module) .method(method) diff --git a/src/main/java/org/zbus/rpc/RpcMethod.java b/src/main/java/org/zbus/rpc/RpcMethod.java new file mode 100644 index 0000000000000000000000000000000000000000..35dac5f28875503f283c6abdbee42357439b9964 --- /dev/null +++ b/src/main/java/org/zbus/rpc/RpcMethod.java @@ -0,0 +1,37 @@ +package org.zbus.rpc; + +import java.util.ArrayList; +import java.util.List; + + +public class RpcMethod { + private List modules = new ArrayList(); + private String name; + private List paramTypes = new ArrayList(); + private String returnType; + + public List getModules() { + return modules; + } + public void setModules(List modules) { + this.modules = modules; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public List getParamTypes() { + return paramTypes; + } + public void setParamTypes(List paramTypes) { + this.paramTypes = paramTypes; + } + public String getReturnType() { + return returnType; + } + public void setReturnType(String returnType) { + this.returnType = returnType; + } +} diff --git a/src/main/java/org/zbus/rpc/RpcProcessor.java b/src/main/java/org/zbus/rpc/RpcProcessor.java index b439204e4ba8837848909988fc2cd5a57a0072fe..081b5a774cbe347024b0cc1522cdead7dcf8a1a3 100644 --- a/src/main/java/org/zbus/rpc/RpcProcessor.java +++ b/src/main/java/org/zbus/rpc/RpcProcessor.java @@ -43,6 +43,12 @@ public class RpcProcessor implements MessageProcessor{ private RpcCodec codec = new JsonRpcCodec(); private Map methods = new HashMap(); + private Map> object2Methods = new HashMap>(); + + public RpcProcessor(){ + + } + public RpcProcessor(Object... services){ addModule(services); } @@ -69,7 +75,7 @@ public class RpcProcessor implements MessageProcessor{ addModule("", obj); addModule(obj.getClass().getSimpleName(), obj); addModule(obj.getClass().getCanonicalName(), obj); - } + } } public void addModule(String module, Object... services){ @@ -78,10 +84,77 @@ public class RpcProcessor implements MessageProcessor{ } } + private void addMoudleInfo(Object service){ + String serviceKey = service.getClass().getCanonicalName(); + if(object2Methods.containsKey(serviceKey)){ + return; + } + List modules = new ArrayList(); + modules.add(""); + for(Class intf : getAllInterfaces(service.getClass())){ + modules.add(intf.getSimpleName()); + modules.add(intf.getCanonicalName()); + } + modules.add(service.getClass().getSimpleName()); + modules.add(service.getClass().getCanonicalName()); + + Method [] methods = service.getClass().getMethods(); + List rpcMethods = new ArrayList(); + object2Methods.put(serviceKey,rpcMethods); + for (Method m : methods) { + String method = m.getName(); + Remote cmd = m.getAnnotation(Remote.class); + if(cmd != null){ + method = cmd.id(); + if(cmd.exclude()) continue; + if("".equals(method)){ + method = m.getName(); + } + } + RpcMethod rpcm = new RpcMethod(); + rpcm.setModules(modules); + rpcm.setName(method); + rpcm.setReturnType(m.getReturnType().getCanonicalName()); + List paramTypes = new ArrayList(); + for(Class t : m.getParameterTypes()){ + paramTypes.add(t.getCanonicalName()); + } + rpcm.setParamTypes(paramTypes); + rpcMethods.add(rpcm); + } + } + + private void removeMoudleInfo(Object service){ + String serviceKey = service.getClass().getCanonicalName(); + object2Methods.remove(serviceKey); + } + + public void removeModule(Object... services){ + for(Object obj : services){ + for(Class intf : getAllInterfaces(obj.getClass())){ + removeModule(intf.getSimpleName(), obj); + removeModule(intf.getCanonicalName(), obj); + } + removeModule("", obj); + removeModule(obj.getClass().getSimpleName(), obj); + removeModule(obj.getClass().getCanonicalName(), obj); + } + } + + public void removeModule(String module, Object... services){ + for(Object service: services){ + this.removeCommandTable(module, service); + } + } + private void initCommandTable(String module, Object service){ + addMoudleInfo(service); + try { Method [] methods = service.getClass().getMethods(); for (Method m : methods) { + if(m.getDeclaringClass() == Object.class) continue; + String method = m.getName(); Remote cmd = m.getAnnotation(Remote.class); if(cmd != null){ @@ -117,18 +190,44 @@ public class RpcProcessor implements MessageProcessor{ } } + private void removeCommandTable(String module, Object service){ + removeMoudleInfo(service); + + try { + Method [] methods = service.getClass().getMethods(); + for (Method m : methods) { + String method = m.getName(); + Remote cmd = m.getAnnotation(Remote.class); + if(cmd != null){ + method = cmd.id(); + if(cmd.exclude()) continue; + if("".equals(method)){ + method = m.getName(); + } + } + + String paramMD5 = ""; + Class[] paramTypes = m.getParameterTypes(); + StringBuilder sb = new StringBuilder(); + for(int i=0;i[] targetParamTypes = target.method.getParameterTypes(); - Object[] invokeParams = new Object[targetParamTypes.length]; - Object[] reqParams = req.getParams(); - for(int i=0; i[] targetParamTypes = target.method.getParameterTypes(); + Object[] invokeParams = new Object[targetParamTypes.length]; + Object[] reqParams = req.getParams(); + for(int i=0; i 0) { setBroker(brokers[0]); } - mode = MqMode.intValue(MqMode.MQ, MqMode.Memory); + mode = MqMode.intValue(MqMode.MQ, MqMode.RPC); } public void setBrokers(Broker[] brokers) { diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000000000000000000000000000000000000..c47f90151c88d02592723412e1131ab6e453fe09 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,20 @@ + + + + + + + %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/resources/proxy.properties b/src/main/resources/proxy.properties new file mode 100644 index 0000000000000000000000000000000000000000..cb66b5833f8e5fab3897582d6cd3951e57912c5b --- /dev/null +++ b/src/main/resources/proxy.properties @@ -0,0 +1,2 @@ +tcp.80=127.0.0.1:15555 +tcp.8080=127.0.0.1:15555 \ No newline at end of file diff --git a/src/main/resources/zbus.htm b/src/main/resources/zbus.htm index 11b4c7af919f450b681a8895d04b8df2e812e7c8..489668cebdcda99315575e7ecb21a6938fdfcb50 100644 --- a/src/main/resources/zbus.htm +++ b/src/main/resources/zbus.htm @@ -2,8 +2,8 @@ -ZBUS Admin Console - +ZBUS Monitor +