diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..ed086eeaa4f1a80fbbd2dc7363fb347a53367b9c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+.idea
+*.iml
+/target
+/store
+/store2
+/zbus-dist/store
+/zbus-dist/nohup.out
+/mq
diff --git a/ChangeLog b/ChangeLog
new file mode 100644
index 0000000000000000000000000000000000000000..6a5ffb073acd0069320e4056375ebb10d3e3ccd6
--- /dev/null
+++ b/ChangeLog
@@ -0,0 +1,17 @@
+zbus-6.2.0 发布,整个项目模块化重构,改进内容:
+1. MQ默认直接采用持久化。MQ速度在SSD磁盘上可达200M+/s,数十亿级消息堆积测试(100G+磁盘)
+2. RPC支持分为直接RPC与基于MQ的RPC,直接RPC性能优于基于MQ的RPC。
+3. Broker独立模型重构,支持HA高可用,不只面向MQ工作,可以方便支持直接RPC的负载均衡。
+4. 添加Thrfit协议接入(RPC),整体上支持HTTP短链接,Extend HTTP的TCP长连接,Thrift客户端接入。
+5. 消息通讯基础zbus.NET保持小、模块化(~50K),方便个性化协议扩展
+6. 添加支持普通TCP代理,DMZ网络安全结构的的TCP代理
+7. 提供大量示例,包括MQ,PubSub,RPC,Proxy,Thrift,Simple HTTP等。
+
+
+zbus-6.3.0 changelog
+1. fix net.Client close, disable disconnected handler when close actively.
+2. "content-encoding" in HTTP changed to "encoding" (extend HTTP) for
+ compatibility to standard HTTP when using browser.
+3. 增加支持不指定随机端口启动Server
+4. 修复Rpc客户端运行中失败引起Sevice响应实例清除的bug
+5. zbus重启,service快速注册返回,InvokingClient增加Sync清除
\ No newline at end of file
diff --git a/Readme.md b/README.md
similarity index 94%
rename from Readme.md
rename to README.md
index 3c3054a8ddd9e82917c799e6910db6a1449009b2..9bfa560eec228451994bbcaecadbfd65787f0d14 100644
--- a/Readme.md
+++ b/README.md
@@ -4,13 +4,13 @@
##ZBUS = MQ + RPC + PROXY
-* **支持消息队列, 发布订阅, RPC, 代理(TCP/HTTP/DMZ)**
+* **支持消息队列, 发布订阅, RPC, 代理(TCP/DMZ)**
* **亿级消息堆积能力、支持HA高可用**
* **单个Jar包无依赖 ~300K**
* **服务代理 -- 适配改造已有业务系统,使之具备跨平台与语言**
* **丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入**
-## QQ群: 467741880
+## QQ群: 467741880, 如果喜欢该项目,请右上角star,让更多人参与改进
@@ -70,7 +70,7 @@ ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项
org.zbus
zbus
- 6.2.0
+ 6.2.6
### 生产者
diff --git a/ToDo b/ToDo
new file mode 100644
index 0000000000000000000000000000000000000000..4dafb2cfa29a24fa577ce78330ffb58e0a96af47
--- /dev/null
+++ b/ToDo
@@ -0,0 +1,37 @@
+支持消息队列复制(不同业务维度消费同一个消息)
+
+Index与Block分离
+MyMQ.idx
+readNum
+readPos
+readCount
+
+MyMQ.idx
+MyMQ.fork
+
+
+
+sub-index-id,二级指针
+
+支持zbus内部事件消息通知PubSub
+
+增加直接返回网络测试 :
+http://localhost:15555/test
+
+增加RPC请求的简单URL解释支持:
+http://localhost:15555/MyRpc/echo?hong
+
+增加监控单个服务的详细信息
+http://localhost:15555/MyRpc
+
+/MyRpc/plus?1,2
+/MyRpc/plus?params=1,2&&module=xxx
+/mq=MyRpc&&method=plus&¶ms=1,2
+
+
+/
+/?cmd=admin&&sub_cmd=data
+/?cmd=admin&&sub_cmd=jquery
+/?cmd=test
+
+
diff --git a/doc/IoAdaptor.png b/doc/IoAdaptor.png
new file mode 100644
index 0000000000000000000000000000000000000000..229374cb6c6e5ee85a068c3803359e8c54375656
Binary files /dev/null and b/doc/IoAdaptor.png differ
diff --git a/doc/tcp_proxy_example.png b/doc/tcp_proxy_example.png
new file mode 100644
index 0000000000000000000000000000000000000000..7ee821e5472a074e4eca104d65f209a9ab60ced2
Binary files /dev/null and b/doc/tcp_proxy_example.png differ
diff --git "a/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc" "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc"
new file mode 100644
index 0000000000000000000000000000000000000000..bd8fe00cdd3cbf4775c032360cd1403391c0cb53
Binary files /dev/null and "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\270\200\357\274\211.doc" differ
diff --git "a/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc" "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc"
new file mode 100644
index 0000000000000000000000000000000000000000..975a3790d45ef0a19bcdc75be5883f0149e88b10
Binary files /dev/null and "b/doc/zbus.NET\351\200\232\350\256\257\346\250\241\345\235\227\357\274\210\344\272\214\357\274\211.doc" differ
diff --git a/license.txt b/license.txt
deleted file mode 100644
index b047275612d06b86b318eb4c63cce981e7fd8e68..0000000000000000000000000000000000000000
--- a/license.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-The MIT License (MIT)
-
-Copyright (c) 2009-2015 HONG LEIMING
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 53bffe193d67301ca60112ca4c3377bb4368cf1d..eda596928e7a96383e14c433475e4f312c3b41f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,177 +1,223 @@
-
- 4.0.0
- jar
- org.zbus
- zbus
- 6.2.0
- zbus
- lightweight MQ, RPC, PROXY
- https://zbus.org
-
-
-
-
- MIT License
- http://www.opensource.org/licenses/mit-license.php
- repo
-
-
-
-
-
- rushmore
- 44194462@qq.com
-
-
-
-
- scm:git@git.oschina.net:rushmore/zbus.git
- scm:git@git.oschina.net:rushmore/zbus.git
- git@git.oschina.net:rushmore/zbus.git
-
-
-
- UTF-8
-
-
-
-
- oss
- https://oss.sonatype.org/content/repositories/snapshots/
-
-
-
-
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ 4.0.0
+ com.jingcai.apps
+ zbus
+ 1.1-RELEASE
+ jar
+ zbus
+ lightweight MQ, RPC, PROXY
+ https://zbus.org
+
+
+
+
+ MIT License
+ http://www.opensource.org/licenses/mit-license.php
+ repo
+
+
+
+
+ scm:git:git@192.168.0.20:java/zbus.git
+ scm:git:git@192.168.0.20:java/zbus.git
+ http://192.168.0.20/java/zbus
+
+
+
+ UTF-8
+
+
+
+
+
+
+
+ org.apache.commons
+ commons-pool2
+ 2.2
+ provided
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.3
+ provided
+
+
- org.apache.commons
- commons-pool2
- 2.2
+ org.slf4j
+ slf4j-api
+ 1.7.5
provided
+
- com.alibaba
- fastjson
- 1.2.3
+ org.slf4j
+ slf4j-log4j12
+ 1.7.5
provided
-
+
log4j
log4j
- 1.2.17
+ 1.2.16
provided
-
- org.springframework
- spring
- 2.5.6
- true
- test
-
-
- org.apache.httpcomponents
- httpclient
- 4.4
- test
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.1
-
- 1.6
- 1.6
- UTF8
-
-
-
-
-
-
-
-
- release
-
-
-
- src/main/resources
-
- *.properties
- *.xml
-
- false
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-source-plugin
- 2.2.1
-
-
- package
-
- jar-no-fork
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
- 2.9.1
-
-
- package
-
- jar
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-gpg-plugin
- 1.5
-
-
- verify
-
- sign
-
-
-
-
-
-
-
-
- oss
- https://oss.sonatype.org/content/repositories/snapshots/
-
-
- oss
- https://oss.sonatype.org/service/local/staging/deploy/maven2/
-
-
-
-
+
+
+ org.springframework
+ spring-context
+ 3.2.8.RELEASE
+ true
+ test
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.4
+ test
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ 1.6
+ 1.6
+ UTF-8
+
+
+
+
+
+
+
+
+ release
+
+
+
+ src/main/resources
+
+ *.properties
+ *.xml
+
+ false
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.2.1
+
+
+ package
+
+ jar-no-fork
+
+
+
+
+
+
+
+
+
+
+ dajiang_snapshots
+ http://djip:8081/content/repositories/dajiang_snapshots/
+
+
+ dajiang_release
+ http://djip:8081/content/repositories/dajiang_release/
+
+
+
+
diff --git a/src/main/java/org/zbus/broker/BrokerConfig.java b/src/main/java/org/zbus/broker/BrokerConfig.java
index 77b9334ea25ed862fa29f6cb7c5bec8602f4f1e6..4c9947baaa1d59d5210bb5faf6bbbdb722f0d3b4 100644
--- a/src/main/java/org/zbus/broker/BrokerConfig.java
+++ b/src/main/java/org/zbus/broker/BrokerConfig.java
@@ -50,8 +50,8 @@ import org.zbus.net.core.Dispatcher;
public class BrokerConfig extends PoolConfig{
private String trackServerList = "127.0.0.1:16666"; //只在HA模式下才有效
private String serverAddress = "127.0.0.1:15555";
- private int selectorCount = 1;
- private int executorCount = 64;
+ private int selectorCount = 0; //0代表使用默认值
+ private int executorCount = 0; //0代表使用默认值
/**
* 可选项
* 如果配置不给出,Dispatcher内部生成,并自己管理关闭
diff --git a/src/main/java/org/zbus/broker/SingleBroker.java b/src/main/java/org/zbus/broker/SingleBroker.java
index 50bf12ab9cc4803d438725d8f1b224abea39abc9..52a23b1e51887a1882bacb86de6b526b94d79102 100644
--- a/src/main/java/org/zbus/broker/SingleBroker.java
+++ b/src/main/java/org/zbus/broker/SingleBroker.java
@@ -49,6 +49,8 @@ public class SingleBroker implements Broker {
if(config.getDispatcher() == null){
this.ownDispatcher = true;
this.dispatcher = new Dispatcher();
+ this.dispatcher.selectorCount(config.getSelectorCount());
+ this.dispatcher.executorCount(config.getExecutorCount());
this.config.setDispatcher(dispatcher);
} else {
this.dispatcher = config.getDispatcher();
diff --git a/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java b/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java
index c306f066a2189ee14b2c8879f4a3d87c36b4610a..c50f7308b82455d63ff06fe53f8159b78963ac4c 100644
--- a/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java
+++ b/src/main/java/org/zbus/broker/ha/DefaultBrokerSelector.java
@@ -166,9 +166,13 @@ public class DefaultBrokerSelector implements BrokerSelector{
if(se.consumerCount > 0) activeCount++;
}
if(activeCount == 0){
- activeCount = allBrokers.size();
+ activeCount = serverList.consumerFirstList.size();
}
- int idx = localIpHashCode%activeCount;
+ if(activeCount == 0){
+ return null;
+ }
+
+ int idx = localIpHashCode%activeCount;
ServerEntry se = serverList.consumerFirstList.get(idx);
if(se != null){
broker = getBroker(se.serverAddr);
diff --git a/src/main/java/org/zbus/broker/ha/ServerEntryTable.java b/src/main/java/org/zbus/broker/ha/ServerEntryTable.java
index 40710a6db9b305382bed945a026e8a5f9438232a..2b3a3620b5f7ff4fd58e0d2d2ae634ea3243fe32 100644
--- a/src/main/java/org/zbus/broker/ha/ServerEntryTable.java
+++ b/src/main/java/org/zbus/broker/ha/ServerEntryTable.java
@@ -207,10 +207,20 @@ public class ServerEntryTable implements Closeable{
public void setVerbose(boolean verbose) {
this.verbose = verbose;
+ }
+
+ public Map getEntry2ServerList() {
+ return entry2ServerList;
+ }
+
+ public Map> getServer2EntryList() {
+ return server2EntryList;
}
+
+
public static class ServerList implements Iterable{
public final String entryId;
public Map serverTable = new ConcurrentHashMap();
diff --git a/src/main/java/org/zbus/broker/ha/TrackSub.java b/src/main/java/org/zbus/broker/ha/TrackSub.java
index c0b0a0426782ccf4bb930a3a3bd73c8be6693fcf..0970422bcdccb97770100c184fa3b5f0454b931b 100644
--- a/src/main/java/org/zbus/broker/ha/TrackSub.java
+++ b/src/main/java/org/zbus/broker/ha/TrackSub.java
@@ -193,7 +193,7 @@ public class TrackSub implements Closeable{
@Override
public void close() throws IOException {
- for(MessageClient client : allTrackers){
+ for(MessageClient client : allTrackers){
client.close();
}
allTrackers.clear();
diff --git a/src/main/java/org/zbus/kit/NetKit.java b/src/main/java/org/zbus/kit/NetKit.java
index 2fb4f578a2e98637090affdda152e5acb023500c..6979e61dcdd9a505554aafb81a4ae86ec238073f 100644
--- a/src/main/java/org/zbus/kit/NetKit.java
+++ b/src/main/java/org/zbus/kit/NetKit.java
@@ -41,9 +41,7 @@ public class NetKit {
int port = 80;
if(blocks.length > 1){
port = Integer.valueOf(blocks[1]);
- } else {
- address += ":"+port; //use default 80
- }
+ }
String serverAddr = String.format("%s:%d", host, port);
return serverAddr;
}
@@ -62,30 +60,70 @@ public class NetKit {
return address;
}
- public static String getLocalIp() {
+ private static int matchedIndex(String ip, String[] prefix){
+ for(int i=0; i ]+");
try {
- Pattern pattern = Pattern.compile("(192|172|10)\\.[0-9]+\\.[0-9]+\\.[0-9]+");
+ Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
Enumeration interfaces = NetworkInterface.getNetworkInterfaces();
-
+ String matchedIp = null;
+ int matchedIdx = -1;
while (interfaces.hasMoreElements()) {
NetworkInterface ni = interfaces.nextElement();
- Enumeration en = ni.getInetAddresses();
+ Enumeration en = ni.getInetAddresses();
while (en.hasMoreElements()) {
InetAddress addr = en.nextElement();
- String ip = addr.getHostAddress();
+ String ip = addr.getHostAddress();
Matcher matcher = pattern.matcher(ip);
- if (matcher.matches()) {
- return ip;
- }
- }
+ if (matcher.matches()) {
+ int idx = matchedIndex(ip, prefix);
+ if(idx == -1) continue;
+ if(matchedIdx == -1){
+ matchedIdx = idx;
+ matchedIp = ip;
+ } else {
+ if(matchedIdx>idx){
+ matchedIdx = idx;
+ matchedIp = ip;
+ }
+ }
+ }
+ }
}
- return "0.0.0.0";
- } catch (Throwable e) {
- e.printStackTrace();
- return "0.0.0.0";
+ if(matchedIp != null) return matchedIp;
+ return "127.0.0.1";
+ } catch (Throwable e) {
+ return "127.0.0.1";
}
}
-
+
+ public static String getLocalIp() {
+ return getLocalIp("*>10>172>192>127");
+ }
+
public static String remoteAddress(SocketChannel channel){
SocketAddress addr = channel.socket().getRemoteSocketAddress();
String res = String.format("%s", addr);
@@ -97,4 +135,10 @@ public class NetKit {
String res = String.format("%s", addr);
return addr==null? res: res.substring(1);
}
+
+ public static void main(String[] args){
+ String ip = getLocalIp("*>192>10");
+ System.out.println(ip);
+ System.out.println(getLocalIp());
+ }
}
diff --git a/src/main/java/org/zbus/kit/log/Logger.java b/src/main/java/org/zbus/kit/log/Logger.java
index 7d04baad351a874347951864631a21baba3685f1..63271c9d55526fe727299b00be1003f8d6d214c5 100644
--- a/src/main/java/org/zbus/kit/log/Logger.java
+++ b/src/main/java/org/zbus/kit/log/Logger.java
@@ -49,13 +49,28 @@ public abstract class Logger {
if (factory != null){
return ;
}
- String defaultFactory = String.format("%s.impl.Log4jLoggerFactory", Logger.class.getPackage().getName());
+
try {
//default to Log4j
Class.forName("org.apache.log4j.Logger");
+ String defaultFactory = String.format("%s.impl.Log4jLoggerFactory", Logger.class.getPackage().getName());
Class> factoryClass = Class.forName(defaultFactory);
factory = (LoggerFactory)factoryClass.newInstance();
+ return;
+ } catch (Exception e) {
+ }
+
+ try {
+ //try slf4j
+ Class.forName("org.slf4j.Logger");
+ String defaultFactory = String.format("%s.impl.Sl4jLoggerFactory", Logger.class.getPackage().getName());
+ Class> factoryClass = Class.forName(defaultFactory);
+ factory = (LoggerFactory)factoryClass.newInstance();
+ return;
} catch (Exception e) {
+ }
+
+ if(factory == null){
factory = new JdkLoggerFactory();
}
}
diff --git a/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java b/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..4751585c0d8ce7cf168ca9255a2c9cd486eebfec
--- /dev/null
+++ b/src/main/java/org/zbus/kit/log/impl/Sl4jLoggerFactory.java
@@ -0,0 +1,137 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2009-2015 HONG LEIMING
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package org.zbus.kit.log.impl;
+
+
+import org.slf4j.spi.LocationAwareLogger;
+import org.zbus.kit.log.Logger;
+import org.zbus.kit.log.LoggerFactory;
+
+public class Sl4jLoggerFactory implements LoggerFactory {
+
+ public Logger getLogger(Class> clazz) {
+ return new Sl4jLogger(clazz);
+ }
+
+ public Logger getLogger(String name) {
+ return new Sl4jLogger(name);
+ }
+
+ public static void main(String[] args){
+ Logger log = new Sl4jLogger(Sl4jLoggerFactory.class);
+ log.info("test");
+ }
+}
+
+class Sl4jLogger extends Logger {
+ private org.slf4j.Logger log;
+ private final String FQCN = Sl4jLogger.class.getName();
+
+ Sl4jLogger(Class> clazz) {
+ log = org.slf4j.LoggerFactory.getLogger(clazz);
+ }
+
+ Sl4jLogger(String name) {
+ log = org.slf4j.LoggerFactory.getLogger(name);
+ }
+
+ public void info(String message) {
+ info(message, (Throwable)null);
+ }
+
+ public void info(String message, Throwable t) {
+ if (log instanceof LocationAwareLogger) {
+ ((LocationAwareLogger) log).log(null, FQCN,
+ LocationAwareLogger.INFO_INT, message, null, t);
+ } else {
+ log.info(message, t);
+ }
+ }
+
+ public void debug(String message) {
+ debug(message, (Throwable)null);
+ }
+
+ public void debug(String message, Throwable t) {
+ if (log instanceof LocationAwareLogger) {
+ ((LocationAwareLogger) log).log(null, FQCN,
+ LocationAwareLogger.DEBUG_INT, message, null, t);
+ } else {
+ log.debug(message, t);
+ }
+ }
+
+ public void warn(String message) {
+ warn(message, (Throwable)null);
+ }
+
+ public void warn(String message, Throwable t) {
+ if (log instanceof LocationAwareLogger) {
+ ((LocationAwareLogger) log).log(null, FQCN,
+ LocationAwareLogger.WARN_INT, message, null, t);
+ } else {
+ log.warn(message);
+ }
+ }
+
+ public void error(String message) {
+ error(message, (Throwable)null);
+ }
+
+ public void error(String message, Throwable t) {
+ if (log instanceof LocationAwareLogger) {
+ ((LocationAwareLogger) log).log(null, FQCN,
+ LocationAwareLogger.ERROR_INT, message, null, t);
+ } else {
+ log.error(message);
+ }
+ }
+
+ public void fatal(String message) {
+ error(message);
+ }
+
+ public void fatal(String message, Throwable t) {
+ error(message, t);
+ }
+
+ public boolean isDebugEnabled() {
+ return log.isDebugEnabled();
+ }
+
+ public boolean isInfoEnabled() {
+ return log.isInfoEnabled();
+ }
+
+ public boolean isWarnEnabled() {
+ return log.isWarnEnabled();
+ }
+
+ public boolean isErrorEnabled() {
+ return log.isErrorEnabled();
+ }
+
+ public boolean isFatalEnabled() {
+ return log.isErrorEnabled();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/zbus/kit/pool/PoolConfig.java b/src/main/java/org/zbus/kit/pool/PoolConfig.java
index 0a3c5548564011bfe1823a2f73439403aee20d0b..3d13f79904bed0375740be85de402dd75243eaf1 100644
--- a/src/main/java/org/zbus/kit/pool/PoolConfig.java
+++ b/src/main/java/org/zbus/kit/pool/PoolConfig.java
@@ -23,9 +23,9 @@
package org.zbus.kit.pool;
public class PoolConfig implements Cloneable{
- private int maxTotal = 8;
- private int maxIdle = 8;
- private int minIdle = 0;
+ private int maxTotal = 64;
+ private int maxIdle = 64;
+ private int minIdle = 64;
private long minEvictableIdleTimeMillis = 1000L * 60L * 30L;
private Object support;
diff --git a/src/main/java/org/zbus/mq/Consumer.java b/src/main/java/org/zbus/mq/Consumer.java
index 8221a60908556512de93ed7cf2a2a64827d2b69f..8c8439b558f017ac3359a2cc918fe110df38949e 100644
--- a/src/main/java/org/zbus/mq/Consumer.java
+++ b/src/main/java/org/zbus/mq/Consumer.java
@@ -38,7 +38,7 @@ public class Consumer extends MqAdmin implements Closeable {
private MessageClient client; // 消费者拥有一个物理链接
private String topic = null; // 为发布订阅者的主题,当Consumer的模式为发布订阅时候起作用
- private int consumeTimeout = 10000; //ms
+ private int consumeTimeout = 300000; //5 minutes
public Consumer(Broker broker, String mq, MqMode... mode) {
super(broker, mq, mode);
}
@@ -52,7 +52,7 @@ public class Consumer extends MqAdmin implements Closeable {
if(this.client == null){
synchronized (this) {
if(this.client == null){
- this.client = broker.getClient(brokerHint());
+ this.client = broker.getClient(brokerHint());
}
}
}
@@ -64,6 +64,7 @@ public class Consumer extends MqAdmin implements Closeable {
Message req = new Message();
req.setCmd(Protocol.Consume);
req.setMq(mq);
+ req.setHead("token", accessToken);
if (MqMode.isEnabled(this.mode, MqMode.PubSub)) {
if (this.topic != null) {
req.setTopic(this.topic);
@@ -73,16 +74,18 @@ public class Consumer extends MqAdmin implements Closeable {
Message res = null;
try {
res = client.invokeSync(req, timeout);
- if (res != null && res.isStatus404()) {
+ if(res == null) return res;
+ res.setId(res.getRawId());
+ res.removeHead(Message.RAWID);
+ if(res.isStatus200()) return res;
+
+ if(res.isStatus404()) {
if (!this.createMQ()) {
- throw new IllegalStateException("register error");
+ throw new MqException(res.getBodyString());
}
return recv(timeout);
}
- if(res != null){
- res.setId(res.getRawId());
- res.removeHead(Message.RAWID);
- }
+ throw new MqException(res.getBodyString());
} catch (ClosedByInterruptException e) {
throw new InterruptedException(e.getMessage());
} catch (IOException e) {
@@ -105,7 +108,7 @@ public class Consumer extends MqAdmin implements Closeable {
}
@Override
- protected Message invokeCreateMQ(Message req) throws IOException,
+ protected Message invokeSync(Message req) throws IOException,
InterruptedException {
ensureClient();
return client.invokeSync(req);
diff --git a/src/main/java/org/zbus/mq/MqAdmin.java b/src/main/java/org/zbus/mq/MqAdmin.java
index 6cb81a147f38d596407de3a33a5db3dac618154c..bfbbb0df97460305f9fd09f46d8f87e5ac9c263e 100644
--- a/src/main/java/org/zbus/mq/MqAdmin.java
+++ b/src/main/java/org/zbus/mq/MqAdmin.java
@@ -34,6 +34,8 @@ public class MqAdmin{
protected final Broker broker;
protected String mq; //队列唯一性标识
protected final int mode;
+ protected String accessToken = "";
+ protected String registerToken = "";
public MqAdmin(Broker broker, String mq, MqMode... mode){
this.broker = broker;
@@ -49,6 +51,8 @@ public class MqAdmin{
this.broker = config.getBroker();
this.mq = config.getMq();
this.mode = config.getMode();
+ this.accessToken = config.getAccessToken();
+ this.registerToken = config.getRegisterToken();
}
protected BrokerHint brokerHint(){
@@ -63,17 +67,29 @@ public class MqAdmin{
* @return
* @throws IOException
*/
- protected Message invokeCreateMQ(Message req) throws IOException, InterruptedException{
+ protected Message invokeSync(Message req) throws IOException, InterruptedException{
return broker.invokeSync(req, 2500);
}
+
+ public Message queryMQ() throws IOException, InterruptedException{
+ Message req = new Message();
+ req.setCmd(Protocol.Query);
+ req.setMq(this.mq);
+ req.setHead("register_token", registerToken);
+ req.setHead("access_token", accessToken);
+
+ return invokeSync(req);
+ }
public boolean createMQ() throws IOException, InterruptedException{
Message req = new Message();
req.setCmd(Protocol.CreateMQ);
req.setHead("mq_name", mq);
- req.setHead("mq_mode", "" + mode);
+ req.setHead("mq_mode", "" + mode);
+ req.setHead("register_token", registerToken);
+ req.setHead("access_token", accessToken);
- Message res = invokeCreateMQ(req);
+ Message res = invokeSync(req);
if(res == null) return false;
return res.isStatus200();
}
diff --git a/src/main/java/org/zbus/mq/MqConfig.java b/src/main/java/org/zbus/mq/MqConfig.java
index c6affd53f568691e53865213960b6cd9dde59b22..b4d1289f16d0aa25f184bd0fc48001d6e7c12e58 100644
--- a/src/main/java/org/zbus/mq/MqConfig.java
+++ b/src/main/java/org/zbus/mq/MqConfig.java
@@ -30,7 +30,10 @@ public class MqConfig implements Cloneable {
protected String mq; //MQ标识,必须设置
protected int mode = MqMode.MQ.intValue(); //创建消息队列时采用到
protected String topic = null; //发布订阅模式下使用
- private boolean verbose = false;
+ protected boolean verbose = false;
+ protected String registerToken = "";
+ protected String accessToken = "";
+
public Broker getBroker() {
return broker;
@@ -76,6 +79,24 @@ public class MqConfig implements Cloneable {
this.verbose = verbose;
}
+
+
+ public String getRegisterToken() {
+ return registerToken;
+ }
+
+ public void setRegisterToken(String registerToken) {
+ this.registerToken = registerToken;
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
@Override
public MqConfig clone() {
try {
diff --git a/src/main/java/org/zbus/mq/Producer.java b/src/main/java/org/zbus/mq/Producer.java
index 242ea8ed4e59e761634595c87a8d31f0854d8a73..4c23ccf83436793d6fc063ded8558d7574796fb8 100644
--- a/src/main/java/org/zbus/mq/Producer.java
+++ b/src/main/java/org/zbus/mq/Producer.java
@@ -1,68 +1,82 @@
-/**
- * The MIT License (MIT)
- * Copyright (c) 2009-2015 HONG LEIMING
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-package org.zbus.mq;
-
-import java.io.IOException;
-
-import org.zbus.broker.Broker;
-import org.zbus.mq.Protocol.MqMode;
-import org.zbus.net.Sync.ResultCallback;
-import org.zbus.net.http.Message;
-
-public class Producer extends MqAdmin{
-
- public Producer(Broker broker, String mq, MqMode... mode) {
- super(broker, mq, mode);
- }
-
- public Producer(MqConfig config){
- super(config);
- }
-
- public void sendAsync(Message msg, final ResultCallback callback)
- throws IOException {
- msg.setCmd(Protocol.Produce);
- msg.setMq(this.mq);
- msg.setAck(true);
-
- broker.invokeAsync(msg, callback);
- }
-
- public void sendAsync(Message msg) throws IOException {
- sendAsync(msg, null);
- }
-
-
- public Message sendSync(Message msg, int timeout) throws IOException, InterruptedException{
- msg.setCmd(Protocol.Produce);
- msg.setMq(this.mq);
- msg.setAck(true);
-
- return broker.invokeSync(msg, timeout);
- }
-
- public Message sendSync(Message msg) throws IOException, InterruptedException{
- return sendSync(msg, 10000);
- }
-
-}
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2009-2015 HONG LEIMING
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package org.zbus.mq;
+
+import java.io.IOException;
+
+import org.zbus.broker.Broker;
+import org.zbus.mq.Protocol.MqMode;
+import org.zbus.net.Sync.ResultCallback;
+import org.zbus.net.http.Message;
+import org.zbus.net.http.Message.MessageInvoker;
+
+public class Producer extends MqAdmin implements MessageInvoker{
+
+ public Producer(Broker broker, String mq, MqMode... mode) {
+ super(broker, mq, mode);
+ }
+
+ public Producer(MqConfig config){
+ super(config);
+ }
+
+ public void sendAsync(Message msg, final ResultCallback callback)
+ throws IOException {
+ invokeAsync(msg, callback);
+ }
+
+ public void sendAsync(Message msg) throws IOException {
+ sendAsync(msg, null);
+ }
+
+
+ public Message sendSync(Message msg, int timeout) throws IOException, InterruptedException{
+ return invokeSync(msg, timeout);
+ }
+
+ public Message sendSync(Message msg) throws IOException, InterruptedException{
+ return sendSync(msg, 10000);
+ }
+
+
+ @Override
+ public Message invokeSync(Message msg, int timeout) throws IOException,
+ InterruptedException {
+ msg.setCmd(Protocol.Produce);
+ msg.setMq(this.mq);
+ msg.setAck(true);
+
+ return broker.invokeSync(msg, timeout);
+ }
+
+ @Override
+ public void invokeAsync(Message msg, ResultCallback callback)
+ throws IOException {
+ msg.setCmd(Protocol.Produce);
+ msg.setMq(this.mq);
+ msg.setAck(true);
+
+ broker.invokeAsync(msg, callback);
+ }
+
+}
diff --git a/src/main/java/org/zbus/mq/Protocol.java b/src/main/java/org/zbus/mq/Protocol.java
index 5e7b67f492ad11015a89f5ed8d955a1ac7b3034b..5de0f944da43743607b66e6068ed5ac42a03e11e 100644
--- a/src/main/java/org/zbus/mq/Protocol.java
+++ b/src/main/java/org/zbus/mq/Protocol.java
@@ -34,14 +34,20 @@ public class Protocol {
public static final String Consume = "consume"; //消费消息
public static final String Route = "route"; //请求等待应答消息
public static final String CreateMQ = "create_mq"; //创建队列
- public static final String Auth = "auth";
+ public static final String Auth = "auth";
+ public static final String Query = "query";
+ public static final String Test = "test";
+
+
+ public static final String Data = "data";
+ public static final String Jquery = "jquery";
- public static final String Admin = "admin"; //管理类消息
public static enum MqMode {
MQ, //消息队列
PubSub, //发布订阅
- Memory; //是否临时
+ Memory, //是否临时
+ RPC;
private MqMode(){
mask = (1 << ordinal());
diff --git a/src/main/java/org/zbus/mq/server/AbstractMQ.java b/src/main/java/org/zbus/mq/server/AbstractMQ.java
index 4f6a9cd36f121f9d1dc5022afbc8bca8942a20be..417b0b73732d308c9065ffbd5b97fd3275daedf3 100644
--- a/src/main/java/org/zbus/mq/server/AbstractMQ.java
+++ b/src/main/java/org/zbus/mq/server/AbstractMQ.java
@@ -32,9 +32,13 @@ import org.zbus.net.http.Message;
public abstract class AbstractMQ{
protected final String name;
protected String creator;
+ protected int mode;
protected long lastUpdateTime = System.currentTimeMillis();
protected final AbstractQueue msgQ;
+ protected String accessToken = "";
+
+ protected Auth auth;
public AbstractMQ(String name, AbstractQueue msgQ) {
this.msgQ = msgQ;
@@ -59,4 +63,26 @@ public abstract class AbstractMQ{
public abstract void cleanSession();
public abstract MqInfo getMqInfo();
+
+ public void setAccessToken(String accessToken){
+ this.accessToken = accessToken;
+ this.auth = new DefaultAuth(this.accessToken);
+ }
+
+ public void setAuth(Auth auth){
+ this.auth = auth;
+ }
+
+ public boolean auth(String appid, String token){
+ if(this.auth == null) return true;
+ return this.auth.auth(appid, token);
+ }
+
+ public int getMode() {
+ return mode;
+ }
+
+ public void setMode(int mode) {
+ this.mode = mode;
+ }
}
diff --git a/src/main/java/org/zbus/mq/server/Auth.java b/src/main/java/org/zbus/mq/server/Auth.java
new file mode 100644
index 0000000000000000000000000000000000000000..5a74cd196dfda5311cf422af4b2a30a4f29912bb
--- /dev/null
+++ b/src/main/java/org/zbus/mq/server/Auth.java
@@ -0,0 +1,16 @@
+package org.zbus.mq.server;
+
+public interface Auth {
+ boolean auth(String appid, String token);
+}
+
+class DefaultAuth implements Auth{
+ private String accessToken;
+ public DefaultAuth(String accessToken){
+ this.accessToken = accessToken;
+ }
+ @Override
+ public boolean auth(String appid, String token) {
+ return accessToken.equals(token);
+ }
+}
diff --git a/src/main/java/org/zbus/mq/server/MQ.java b/src/main/java/org/zbus/mq/server/MQ.java
index 8c46228ff1f1c6b0861bbb2a7f32dec7ae336023..04273cba395c1567decdeb38f1c47d18af4ef8f5 100644
--- a/src/main/java/org/zbus/mq/server/MQ.java
+++ b/src/main/java/org/zbus/mq/server/MQ.java
@@ -34,7 +34,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol.ConsumerInfo;
import org.zbus.mq.Protocol.MqInfo;
-import org.zbus.mq.Protocol.MqMode;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;
@@ -133,7 +132,7 @@ public class MQ extends AbstractMQ{
info.name = name;
info.lastUpdateTime = lastUpdateTime;
info.creator = creator;
- info.mode = MqMode.MQ.intValue();
+ info.mode = this.mode;
info.unconsumedMsgCount = msgQ.size();
info.consumerCount = pullSessions.size();
info.consumerInfoList = new ArrayList();
diff --git a/src/main/java/org/zbus/mq/server/MqAdaptor.java b/src/main/java/org/zbus/mq/server/MqAdaptor.java
index edd4541dea84976b15f8d1d3f3cba084e9ea9223..83c00d5ad36fc6292b680964d2833c547c191090 100644
--- a/src/main/java/org/zbus/mq/server/MqAdaptor.java
+++ b/src/main/java/org/zbus/mq/server/MqAdaptor.java
@@ -34,6 +34,7 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
private boolean verbose = false;
private final MqServer mqServer;
+ private String registerToken = "";
public MqAdaptor(MqServer mqServer){
@@ -41,15 +42,65 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
this.mqServer = mqServer;
this.mqTable = mqServer.getMqTable();
this.sessionTable = mqServer.getSessionTable();
+ this.registerToken = mqServer.getRegisterToken();
registerHandler(Protocol.Produce, produceHandler);
registerHandler(Protocol.Consume, consumeHandler);
registerHandler(Protocol.Route, routeHandler);
- registerHandler(Protocol.CreateMQ, createMqHandler);
- registerHandler(Protocol.Admin, new AdminHandler());
+ registerHandler(Protocol.CreateMQ, createMqHandler);
+ registerHandler(Protocol.Test, testHandler);
+ registerHandler(Protocol.Query, queryHandler);
+
+ registerHandler("", homeHandler);
+ registerHandler(Protocol.Data, dataHandler);
+ registerHandler(Protocol.Jquery, jqueryHandler);
+
registerHandler(Message.HEARTBEAT, heartbeatHandler);
}
+
+ private Message handleUrlMessage(Message msg){
+ UrlInfo url = new UrlInfo(msg.getRequestString());
+ if(url.empty){
+ msg.setCmd(""); //default to home monitor
+ return msg;
+ }
+
+ if(url.mq != null){
+ if(msg.getMq() == null){
+ msg.setMq(url.mq);
+ }
+ String method = url.method;
+ if(method == null){
+ method = "";
+ }
+ if(url.method != null || url.cmd == null){
+ AbstractMQ mq = mqTable.get(url.mq);
+ if(mq != null && MqMode.isEnabled(mq.getMode(), MqMode.RPC)){
+ msg.setMq(url.mq);
+ msg.setAck(false);
+ msg.setCmd(Protocol.Produce);
+ String module = url.module == null? "" : url.module;
+ String json = "{";
+ json += "\"module\": " + "\"" + module + "\"";
+ json += ", \"method\": " + "\"" + method + "\"";
+ if(url.params != null){
+ json += ", \"params\": " + "[" + url.params + "]";
+ }
+ json += "}";
+ msg.setJsonBody(json);
+ }
+ }
+ }
+
+ if(url.cmd != null){
+ if(msg.getCmd() == null){
+ msg.setCmd(url.cmd);
+ }
+ }
+
+ return msg;
+ }
public void onMessage(Object obj, Session sess) throws IOException {
Message msg = (Message)obj;
@@ -62,14 +113,17 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
}
String cmd = msg.getCmd();
- if(cmd == null){ //default to Admin
- cmd = Protocol.Admin;
- }
-
- MessageHandler handler = handlerMap.get(cmd);
- if(handler != null){
- handler.handle(msg, sess);
- return;
+
+ if(cmd == null){ //处理URL消息格式,否则url忽略不计
+ msg = handleUrlMessage(msg);
+ cmd = msg.getCmd();
+ }
+ if(cmd != null){
+ MessageHandler handler = handlerMap.get(cmd);
+ if(handler != null){
+ handler.handle(msg, sess);
+ return;
+ }
}
Message res = new Message();
@@ -99,6 +153,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
public void handle(Message msg, Session sess) throws IOException {
AbstractMQ mq = findMQ(msg, sess);
if(mq == null) return;
+ if(!auth(mq, msg)){
+ ReplyKit.reply403(msg, sess);
+ return;
+ }
+
boolean ack = msg.isAck();
msg.removeHead(Message.CMD);
msg.removeHead(Message.ACK);
@@ -116,6 +175,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
public void handle(Message msg, Session sess) throws IOException {
AbstractMQ mq = findMQ(msg, sess);
if(mq == null) return;
+ if(!auth(mq, msg)){
+ ReplyKit.reply403(msg, sess);
+ return;
+ }
+
mq.consume(msg, sess);
String mqName = sess.attr("mq");
@@ -141,13 +205,25 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
msg.removeHead(Message.ACK);
msg.removeHead(Message.RECVER);
msg.removeHead(Message.CMD);
- target.write(msg);
+ try{
+ target.write(msg);
+ } catch(Exception ex){
+ log.warn("Target(%s) write failed, Ignore", recver);
+ return; //just ignore
+ }
}
};
private MessageHandler createMqHandler = new MessageHandler() {
@Override
public void handle(Message msg, Session sess) throws IOException {
+ String registerToken = msg.getHead("register_token", "");
+ if(!MqAdaptor.this.registerToken.equals(registerToken)){
+ msg.setBody("registerToken unmatched");
+ ReplyKit.reply403(msg, sess);
+ return;
+ }
+
String mqName = msg.getHead("mq_name", "");
mqName = mqName.trim();
if("".equals(mqName)){
@@ -166,11 +242,12 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
try{
mode = Integer.valueOf(mqMode);
} catch (Exception e){
- msg.setBody("mqMode invalid");
+ msg.setBody("mq_mode invalid");
ReplyKit.reply400(msg, sess);
return;
}
+ String accessToken = msg.getHead("access_token", "");
AbstractMQ mq = null;
synchronized (mqTable) {
mq = mqTable.get(mqName);
@@ -180,7 +257,8 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
}
AbstractQueue support = null;
- if(MqMode.isEnabled(mode, MqMode.Memory)){
+ if(MqMode.isEnabled(mode, MqMode.Memory) ||
+ MqMode.isEnabled(mode, MqMode.RPC)){
support = new MessageMemoryQueue();
} else {
support = new MessageDiskQueue(mqName, mode);
@@ -191,7 +269,10 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
} else {
mq = new MQ(mqName, support);
}
+ mq.setMode(mode);
mq.creator = sess.getRemoteAddress();
+ mq.setAccessToken(accessToken);
+
log.info("MQ Created: %s", mq);
mqTable.put(mqName, mq);
ReplyKit.reply200(msg, sess);
@@ -201,6 +282,78 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
}
};
+ private MessageHandler testHandler = new MessageHandler() {
+ public void handle(Message msg, Session sess) throws IOException {
+ Message res = new Message();
+ res.setResponseStatus(200);
+ res.setId(msg.getId());
+ res.setBody("OK");
+ sess.write(res);
+ }
+ };
+
+ private MessageHandler homeHandler = new MessageHandler() {
+ public void handle(Message msg, Session sess) throws IOException {
+ msg = new Message();
+ msg.setResponseStatus("200");
+ msg.setHead("content-type", "text/html");
+ String body = FileKit.loadFileContent("zbus.htm");
+ if ("".equals(body)) {
+ body = "zbus.htm file missing";
+ }
+ msg.setBody(body);
+ sess.write(msg);
+ }
+ };
+
+ private MessageHandler jqueryHandler = new MessageHandler() {
+ public void handle(Message msg, Session sess) throws IOException {
+ msg = new Message();
+ msg.setResponseStatus("200");
+ msg.setHead("content-type", "application/javascript");
+ String body = FileKit.loadFileContent("jquery.js");
+ msg.setBody(body);
+ sess.write(msg);
+ }
+ };
+
+ private MessageHandler dataHandler = new MessageHandler() {
+ public void handle(Message msg, Session sess) throws IOException {
+ BrokerInfo info = getStatInfo();
+
+ Message data = new Message();
+ data.setResponseStatus("200");
+ data.setId(msg.getId());
+ data.setHead("content-type", "application/json");
+ data.setBody(JsonKit.toJson(info));
+ sess.write(data);
+ }
+ };
+
+ private MessageHandler queryHandler = new MessageHandler() {
+ public void handle(Message msg, Session sess) throws IOException {
+ String json = "";
+ if(msg.getMq() == null){
+ BrokerInfo info = getStatInfo();
+ json = JsonKit.toJson(info);
+ } else {
+ AbstractMQ mq = findMQ(msg, sess);
+ if(mq == null){
+ return;
+ } else {
+ json = JsonKit.toJson(mq.getMqInfo());
+ }
+ }
+
+ Message data = new Message();
+ data.setResponseStatus("200");
+ data.setId(msg.getId());
+ data.setHead("content-type", "application/json");
+ data.setBody(json);
+ sess.write(data);
+ }
+ };
+
private MessageHandler heartbeatHandler = new MessageHandler() {
@Override
public void handle(Message msg, Session sess) throws IOException {
@@ -238,6 +391,12 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
super.onSessionToDestroy(sess);
}
+ private boolean auth(AbstractMQ mq, Message msg){
+ String appid = msg.getHead("appid", "");
+ String token = msg.getHead("token", "");
+ return mq.auth(appid, token);
+ }
+
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
@@ -245,7 +404,9 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
public BrokerInfo getStatInfo(){
Map table = new HashMap();
for(Map.Entry e : this.mqTable.entrySet()){
- table.put(e.getKey(), e.getValue().getMqInfo());
+ MqInfo info = e.getValue().getMqInfo();
+ info.consumerInfoList.clear(); //clear to avoid long list
+ table.put(e.getKey(), info);
}
BrokerInfo info = new BrokerInfo();
info.broker = mqServer.getServerAddr();
@@ -266,10 +427,11 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
int flag = diskq.getFlag();
AbstractQueue queue = new MessageDiskQueue(name, diskq);
if( MqMode.isEnabled(flag, MqMode.PubSub)){
- mq = new PubSub(name, queue);
+ mq = new PubSub(name, queue);
} else {
- mq = new MQ(name, queue);
+ mq = new MQ(name, queue);
}
+ mq.setMode(flag);
mq.lastUpdateTime = System.currentTimeMillis();
mq.creator = "System";
mqTable.put(name, mq);
@@ -281,73 +443,5 @@ public class MqAdaptor extends IoAdaptor implements Closeable {
public void close() throws IOException {
DiskQueuePool.destory();
- }
-
-
- private class AdminHandler implements MessageHandler {
- private Map handlerMap = new ConcurrentHashMap();
- public AdminHandler() {
- handlerMap.put("", homeHandler);
- handlerMap.put("jquery", jqueryHandler);
- handlerMap.put("data", dataHandler);
- }
-
- public void handle(Message msg, Session sess) throws IOException {
- String subCmd = msg.getSubCmd(); //from header first
- if (subCmd == null){
- subCmd = msg.getRequestParam(Message.SUB_CMD); //from request Param
- }
- if(subCmd == null){ //default to home
- subCmd = "";
- }
-
- MessageHandler handler = this.handlerMap.get(subCmd);
- if (handler == null) {
- msg.setBody("sub_cmd=%s Not Found", subCmd);
- ReplyKit.reply404(msg, sess);
- return;
- }
- handler.handle(msg, sess);
- }
-
- private MessageHandler homeHandler = new MessageHandler() {
- public void handle(Message msg, Session sess) throws IOException {
- msg = new Message();
- msg.setResponseStatus("200");
- msg.setHead("content-type", "text/html");
- String body = FileKit.loadFileContent("zbus.htm");
- if ("".equals(body)) {
- body = "zbus.htm file missing";
- }
- msg.setBody(body);
- sess.write(msg);
- }
- };
-
- private MessageHandler jqueryHandler = new MessageHandler() {
- public void handle(Message msg, Session sess) throws IOException {
- msg = new Message();
- msg.setResponseStatus("200");
- msg.setHead("content-type", "application/javascript");
- String body = FileKit.loadFileContent("jquery.js");
- msg.setBody(body);
- sess.write(msg);
- }
- };
-
- private MessageHandler dataHandler = new MessageHandler() {
- public void handle(Message msg, Session sess) throws IOException {
- BrokerInfo info = getStatInfo();
-
- Message data = new Message();
- data.setResponseStatus("200");
- data.setId(msg.getId());
- data.setHead("content-type", "application/json");
- data.setBody(JsonKit.toJson(info));
- sess.write(data);
- }
- };
-
- }
-
+ }
}
\ No newline at end of file
diff --git a/src/main/java/org/zbus/mq/server/MqServer.java b/src/main/java/org/zbus/mq/server/MqServer.java
index 4d2c564c0654957e9737232a2ea415f2d6d1ba13..8ff40228def1c1f21530e6dc5b963584d451c4f5 100644
--- a/src/main/java/org/zbus/mq/server/MqServer.java
+++ b/src/main/java/org/zbus/mq/server/MqServer.java
@@ -52,10 +52,13 @@ public class MqServer extends Server{
private long trackInterval = 5000;
private MqServerConfig config;
+ private String registerToken = "";
public MqServer(MqServerConfig config){
this.config = config;
serverName = "MqServer";
+ registerToken = config.registerToken;
+ serverMainIpOrder = config.serverMainIpOrder;
dispatcher = new Dispatcher();
dispatcher.selectorCount(config.selectorCount);
dispatcher.executorCount(config.executorCount);
@@ -149,6 +152,10 @@ public class MqServer extends Server{
trackPub.pubEntryUpdate(se);
}
+ public String getRegisterToken(){
+ return registerToken;
+ }
+
public Map getMqTable() {
return mqTable;
}
@@ -161,12 +168,13 @@ public class MqServer extends Server{
MqServerConfig config = new MqServerConfig();
config.serverHost = ConfigKit.option(args, "-h", "0.0.0.0");
config.serverPort = ConfigKit.option(args, "-p", 15555);
- config.selectorCount = ConfigKit.option(args, "-selector", 1);
+ config.selectorCount = ConfigKit.option(args, "-selector", 0); //0 means default to CPU/4
config.executorCount = ConfigKit.option(args, "-executor", 64);
config.verbose = ConfigKit.option(args, "-verbose", false);
config.storePath = ConfigKit.option(args, "-store", "store");
config.trackServerList = ConfigKit.option(args, "-track", null);
config.thriftServer = ConfigKit.option(args, "-thrift", null);
+ config.serverMainIpOrder = ConfigKit.option(args, "-ipOrder", null);
final MqServer server = new MqServer(config);
if(config.thriftServer != null){
diff --git a/src/main/java/org/zbus/mq/server/MqServerConfig.java b/src/main/java/org/zbus/mq/server/MqServerConfig.java
index 833e1edd1c3a92ae03ab4d766bf8edd4a7971919..860f8ee743003e8b8c05ff621bda6e73201e6d2a 100644
--- a/src/main/java/org/zbus/mq/server/MqServerConfig.java
+++ b/src/main/java/org/zbus/mq/server/MqServerConfig.java
@@ -22,21 +22,8 @@
*/
package org.zbus.mq.server;
-import static org.zbus.kit.ConfigKit.value;
-import static org.zbus.kit.ConfigKit.valueSet;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import org.zbus.kit.FileKit;
-import org.zbus.kit.log.Logger;
-
-public class MqServerConfig{
- private static final Logger log = Logger.getLogger(MqServerConfig.class);
-
+public class MqServerConfig{
public String trackServerList = "127.0.0.1:16666;127.0.0.1:16667";
public String serverHost = "0.0.0.0";
@@ -46,111 +33,11 @@ public class MqServerConfig{
public int selectorCount = 1;
public int executorCount = 64;
public boolean verbose = true;
- public String storePath = "mq";
-
-
- public Set roles = new HashSet();
- public Set users = new HashSet();
- public Set mqs = new HashSet();
- public Set pubsubs = new HashSet();
-
+ public String storePath = "store";
+ public String registerToken = "";
+ public String serverMainIpOrder = null;
public String getServerAddress(){
return serverHost + ":" + serverPort;
- }
-
- public void load(InputStream is) throws IOException{
- Properties props = new Properties();
- if(is == null){
- log.info("Using default settings, missing config file");
- } else {
- props.load(is);
- }
-
- this.serverHost = value(props, "zbus.serverHost", "0.0.0.0");
- this.serverPort = value(props, "zbus.serverPort", 15555);
- this.selectorCount = value(props, "zbus.selectorCount", 1);
- this.executorCount = value(props, "zbus.executorCount", 64);
- this.verbose = value(props, "zbus.verbose", true);
- this.storePath = value(props, "zbus.storePath", "mq");
-
- this.roles = valueSet(props, "zbus.role");
- Set userNames = valueSet(props, "zbus.user");
- for(String name : userNames){
- User user = new User();
- user.name = name;
- user.password = value(props, String.format("zbus.user.%s.password", name));
- user.roles = valueSet(props, String.format("zbus.user.%s.role", name));
-
- this.users.add(user);
- }
-
- for(String name : valueSet(props, "zbus.mq")){
- MqEntry entry = new MqEntry();
- entry.name = name;
- entry.allowUsers = valueSet(props, String.format("zbus.mq.%s.allowUser", name));
- entry.allowRoles = valueSet(props, String.format("zbus.mq.%s.allowRole", name));
-
- this.mqs.add(entry);
- }
-
- for(String name : valueSet(props, "zbus.pubsub")){
- MqEntry entry = new MqEntry();
- entry.name = name;
- entry.allowUsers = valueSet(props, String.format("zbus.pubsub.%s.allowUser", name));
- entry.allowRoles = valueSet(props, String.format("zbus.pubsub.%s.allowRole", name));
-
- this.pubsubs.add(entry);
- }
- }
-
- public void load(String fileName) throws IOException{
- try{
- InputStream is = FileKit.loadFile(fileName);
- if(is == null){
- log.info("Using default settings, missing config file");
- } else {
- load(is);
- }
- } catch(Exception e){
- log.error(e.getMessage(), e);
- }
}
-
- public static class User {
- public String name;
- public String password;
- public Set roles = new HashSet();
- @Override
- public String toString() {
- return "User [name=" + name + ", password=" + password + ", roles="
- + roles + "]";
- }
- }
-
- public static class MqEntry {
- public String name;
- public Set allowUsers = new HashSet();
- public Set allowRoles = new HashSet();
- @Override
- public String toString() {
- return "MqEntry [name=" + name + ", allowUsers=" + allowUsers
- + ", allowRoles=" + allowRoles + "]";
- }
-
- }
-
-
-
- public static void main(String[] args) throws Exception{
- MqServerConfig config = new MqServerConfig();
-
- config.load("zbus.properties");
-
- System.out.println("roles:"+config.roles);
- System.out.println("users:"+config.users);
- System.out.println("mqs:"+config.mqs);
- System.out.println("pubsubs:"+config.pubsubs);
- }
-
}
\ No newline at end of file
diff --git a/src/main/java/org/zbus/mq/server/PubSub.java b/src/main/java/org/zbus/mq/server/PubSub.java
index 8e22f2604e91be69d94d11c6231b2559b76df6aa..db8eaa9c386929d438e50caefad58990a851ff20 100644
--- a/src/main/java/org/zbus/mq/server/PubSub.java
+++ b/src/main/java/org/zbus/mq/server/PubSub.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol.ConsumerInfo;
import org.zbus.mq.Protocol.MqInfo;
-import org.zbus.mq.Protocol.MqMode;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;
@@ -92,7 +91,7 @@ public class PubSub extends AbstractMQ{
if(pullMessage == null) continue;
msg = pull.getMsgQ().poll();
- if(msg == null) break;
+ if(msg == null) continue;
msg.setResponseStatus(200); //支持浏览器
msg.setRawId(msg.getId());
@@ -137,12 +136,14 @@ public class PubSub extends AbstractMQ{
info.name = name;
info.lastUpdateTime = lastUpdateTime;
info.creator = creator;
- info.mode = MqMode.PubSub.intValue();
+ info.mode = this.mode;
+
info.unconsumedMsgCount = msgQ.size();
info.consumerInfoList = new ArrayList();
for(PullSession pull : pullMap.values()){
info.consumerInfoList.add(pull.getConsumerInfo());
}
+ info.consumerCount = info.consumerInfoList.size();
return info;
}
}
diff --git a/src/main/java/org/zbus/mq/server/UrlInfo.java b/src/main/java/org/zbus/mq/server/UrlInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..5888bec045d530d39f80ed5a3abe6eccfe933ef3
--- /dev/null
+++ b/src/main/java/org/zbus/mq/server/UrlInfo.java
@@ -0,0 +1,131 @@
+package org.zbus.mq.server;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class UrlInfo{
+ public String cmd;
+ public String mq;
+ public String module;
+ public String method;
+ public String params;
+ public Map extra;
+ public boolean empty = false;
+
+ public UrlInfo(String url){
+ this(url, false);
+ }
+
+ public UrlInfo(String url, boolean directRpc){
+ if(url == null || "".equals(url)){
+ empty = true;
+ return;
+ }
+ if(url.length() == 1 && url.charAt(0) == '/'){
+ empty = true;
+ return;
+ }
+ try {
+ url = URLDecoder.decode(url, "UTF8");
+ } catch (UnsupportedEncodingException e) {
+ return;
+ }
+ String kvs;
+ boolean hasKvs = url.indexOf('?') != -1;
+ String[] bb = url.split("[/?]+");
+ if(hasKvs){
+ kvs = bb[bb.length-1];
+ if(!directRpc){
+ if(bb.length >= 5){
+ mq = bb[1];
+ module = bb[2];
+ method = bb[3];
+ } else if(bb.length >= 4){
+ mq = bb[1];
+ method = bb[2];
+ } else if(bb.length >= 3){
+ mq = bb[1];
+ }
+ } else {
+ if(bb.length >= 4){
+ module = bb[1];
+ method = bb[2];
+ } else if(bb.length >= 3){
+ method = bb[1];
+ }
+ }
+ } else {
+ if(!directRpc){
+ if(bb.length >= 4){
+ mq = bb[1];
+ module = bb[2];
+ method = bb[3];
+ } else if(bb.length >= 3){
+ mq = bb[1];
+ method = bb[2];
+ } else if(bb.length >= 2){
+ mq = bb[1];
+ }
+ } else {
+ if(bb.length >= 3){
+ module = bb[1];
+ method = bb[2];
+ } else if(bb.length >= 2){
+ method = bb[1];
+ }
+ }
+ return;
+ }
+
+ bb = kvs.split("[&]+");
+ for(String b : bb){
+ if("".equals(b)) continue;
+ int idx = b.indexOf('=');
+ if(idx < 0){
+ if(params == null || params.equals("")){
+ params = b;
+ }
+ continue;
+ }
+ String key = b.substring(0, idx);
+ String val = b.substring(idx+1);
+ if(extra == null){
+ synchronized (this) {
+ if(extra == null){
+ extra = new ConcurrentHashMap();
+ }
+ }
+ }
+ extra.put(key, val);
+ }
+ if(extra != null){
+ if(cmd == null && extra.containsKey("cmd")){
+ cmd = extra.get("cmd");
+ }
+ if(mq == null && extra.containsKey("mq")){
+ mq = extra.get("mq");
+ }
+ if(method == null && extra.containsKey("method")){
+ method = extra.get("method");
+ }
+ if(params == null && extra.containsKey("params")){
+ params = extra.get("params");
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "UrlInfo [cmd=" + cmd + ", mq=" + mq + ", module=" + module
+ + ", method=" + method + ", params=" + params + ", extra="
+ + extra + "]";
+ }
+
+ public static void main(String[] args){
+ UrlInfo url = new UrlInfo("/t2/xxx?ok", true);
+ System.out.println(url);
+ }
+
+}
diff --git a/src/main/java/org/zbus/net/Client.java b/src/main/java/org/zbus/net/Client.java
index 09200f90ab67169be17c0832f55c1ac4bcd3266e..1c06bec4c78a9a400c251af0a6bbde8587e81ede 100644
--- a/src/main/java/org/zbus/net/Client.java
+++ b/src/main/java/org/zbus/net/Client.java
@@ -53,9 +53,9 @@ public class Client extends IoAdaptor implements Closeable {
protected Session session;
protected volatile MsgHandler msgHandler;
- protected ErrorHandler errorHandler;
- protected ConnectedHandler connectedHandler;
- protected DisconnectedHandler disconnectedHandler;
+ protected volatile ErrorHandler errorHandler;
+ protected volatile ConnectedHandler connectedHandler;
+ protected volatile DisconnectedHandler disconnectedHandler;
public Client(String address, Dispatcher dispatcher) {
String[] blocks = address.split("[:]");
@@ -167,6 +167,7 @@ public class Client extends IoAdaptor implements Closeable {
@Override
public void close() throws IOException {
+ this.onDisconnected(null); //clear disconnection handler
if (this.session != null) {
this.session.close();
}
diff --git a/src/main/java/org/zbus/net/InvokingClient.java b/src/main/java/org/zbus/net/InvokingClient.java
index 4ac54102613fa1f79eb41f306db57f91c22d4051..6b299506ce225e700b1dbacd22b1b207350def36 100644
--- a/src/main/java/org/zbus/net/InvokingClient.java
+++ b/src/main/java/org/zbus/net/InvokingClient.java
@@ -90,6 +90,12 @@ public class InvokingClient
return this.invokeSync(req, this.readTimeout);
}
+ @Override
+ protected void onSessionDestroyed(Session sess) throws IOException {
+ super.onSessionDestroyed(sess);
+ sync.clearTicket();
+ }
+
public RES invokeSync(REQ req, int timeout) throws IOException, InterruptedException {
Ticket ticket = null;
try {
diff --git a/src/main/java/org/zbus/net/MsgInvoker.java b/src/main/java/org/zbus/net/MsgInvoker.java
index 18a355d16baf4e653624b3d781c30a711e4a07a5..5782fa3d1cab8c7c442da5546aa04dd1f4dd7401 100644
--- a/src/main/java/org/zbus/net/MsgInvoker.java
+++ b/src/main/java/org/zbus/net/MsgInvoker.java
@@ -20,7 +20,7 @@ public interface MsgInvoker {
/**
* 异步消息模式
*
- * @param msg
+ * @param req
* @param callback
* @throws IOException
*/
diff --git a/src/main/java/org/zbus/net/Server.java b/src/main/java/org/zbus/net/Server.java
index 0ba41e2174cc749f2a6e2775c86636a0af67ff36..617783ec28f048d67407bbd7285c53f36a75874b 100644
--- a/src/main/java/org/zbus/net/Server.java
+++ b/src/main/java/org/zbus/net/Server.java
@@ -24,6 +24,7 @@ package org.zbus.net;
import java.io.Closeable;
import java.io.IOException;
+import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.util.Map;
import java.util.Map.Entry;
@@ -45,8 +46,9 @@ public class Server implements Closeable{
}
protected Dispatcher dispatcher;
- protected String serverAddr; //第一个Server注册的地址
+ protected String serverAddr; //第一个Server注册的地址, 当侦听启动后才有效
protected String serverName = "Server";
+ protected String serverMainIpOrder = null;
protected Map adaptors = new ConcurrentHashMap();
@@ -87,14 +89,9 @@ public class Server implements Closeable{
adaptor.adaptorName = name;
adaptor.adaptorAddr = address;
adaptor.serverAdaptor = ioAdaptor;
-
- if(adaptors.isEmpty()){
+
+ if(adaptors.isEmpty()){ //the first
this.serverAddr = address;
- String host = blocks[0];
- int port = Integer.valueOf(blocks[1]);
- if("0.0.0.0".equals(host)){
- this.serverAddr = String.format("%s:%d",NetKit.getLocalIp(), port);
- }
}
adaptors.put(address, adaptor);
@@ -110,8 +107,31 @@ public class Server implements Closeable{
IoAdaptorInfo adaptor = e.getValue();
if(adaptor.serverChannel != null) continue;
+ String address = adaptor.adaptorAddr;
+ String[] bb = address.split("[:]");
+ if(bb.length !=2 ) continue;
+ String host = bb[0];
+ String port = bb[1];
+
adaptor.serverChannel = dispatcher.registerServerChannel(adaptor.adaptorAddr, adaptor.serverAdaptor);
- String info = String.format("%s-%s listening [%s]", this.serverName, adaptor.adaptorName, adaptor.adaptorAddr);
+
+ ServerSocket ss = adaptor.serverChannel.socket();
+ if(address.equals(serverAddr)){
+ String localHost = host;
+ if("0.0.0.0".equals(host)){
+ if(serverMainIpOrder == null){
+ localHost = NetKit.getLocalIp();
+ } else {
+ localHost = NetKit.getLocalIp(serverMainIpOrder);
+ }
+ }
+ serverAddr = String.format("%s:%d", localHost, ss.getLocalPort());
+ }
+ if("0".equals(port)){
+ adaptor.adaptorAddr = String.format("%s:%d", host, ss.getLocalPort());
+ }
+
+ String info = String.format("%s-%s listening [%s]", this.serverName, adaptor.adaptorName, adaptor.adaptorAddr);
if(adaptor.adaptorName == null){
info = String.format("%s listening [%s]", this.serverName, adaptor.adaptorAddr);
}
diff --git a/src/main/java/org/zbus/net/Sync.java b/src/main/java/org/zbus/net/Sync.java
index 92ae835455a2ebe1d4b4fa0dbf0a623d9e67b484..0dc060da083e724280e9a53715cc7b4d96ff522c 100644
--- a/src/main/java/org/zbus/net/Sync.java
+++ b/src/main/java/org/zbus/net/Sync.java
@@ -151,6 +151,13 @@ public class Sync {
return tickets.remove(id);
}
+ public void clearTicket(){
+ for(Ticket ticket : tickets.values()){
+ ticket.countDown();
+ }
+ tickets.clear();
+ }
+
public static interface Id {
void setId(String id);
diff --git a/src/main/java/org/zbus/net/core/Dispatcher.java b/src/main/java/org/zbus/net/core/Dispatcher.java
index 9bdd41d23b319720d38fe55862a8c4a9a9f4ef5c..2cd66c2197f9e1a14a5d247671eca4893a918f78 100644
--- a/src/main/java/org/zbus/net/core/Dispatcher.java
+++ b/src/main/java/org/zbus/net/core/Dispatcher.java
@@ -42,12 +42,14 @@ import org.zbus.kit.log.Logger;
public class Dispatcher implements Closeable {
+ public static final int DEFAULT_EXECUTOR_COUNT = 64;
+
private static final Logger log = Logger.getLogger(Dispatcher.class);
private ExecutorService executor;
- private int selectorCount = 1;
- private int executorCount = 32;
+ private int selectorCount = defaultSelectorCount();
+ private int executorCount = DEFAULT_EXECUTOR_COUNT;
private SelectorThread[] selectors;
private AtomicInteger selectorIndex = new AtomicInteger(0);
private String dispatcherName = "Dispatcher";
@@ -166,12 +168,26 @@ public class Dispatcher implements Closeable {
}
public Dispatcher selectorCount(int count){
- this.selectorCount = count;
+ if(count <= 0){
+ this.selectorCount = defaultSelectorCount();
+ } else {
+ this.selectorCount = count;
+ }
return this;
}
+ public static int defaultSelectorCount(){
+ int c = Runtime.getRuntime().availableProcessors()/2;
+ if(c <= 0) c = 1;
+ return c;
+ }
+
public Dispatcher executorCount(int count){
- this.executorCount = count;
+ if(count <= 0){
+ this.executorCount = DEFAULT_EXECUTOR_COUNT;
+ } else {
+ this.executorCount = count;
+ }
return this;
}
diff --git a/src/main/java/org/zbus/net/core/SelectorThread.java b/src/main/java/org/zbus/net/core/SelectorThread.java
index d81695b14414785cde0931dac21b11fa8350ff7d..1b1da25e716a6035f82ebae24a9b80057e3b17e1 100644
--- a/src/main/java/org/zbus/net/core/SelectorThread.java
+++ b/src/main/java/org/zbus/net/core/SelectorThread.java
@@ -1,275 +1,274 @@
-/**
- * The MIT License (MIT)
- * Copyright (c) 2009-2015 HONG LEIMING
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-package org.zbus.net.core;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.zbus.kit.NetKit;
-import org.zbus.kit.log.Logger;
-import org.zbus.net.core.Session.SessionStatus;
-
-public class SelectorThread extends Thread {
- private static final Logger log = Logger.getLogger(SelectorThread.class);
-
- protected volatile java.nio.channels.Selector selector = null;
- protected final Dispatcher dispatcher;
- private final Queue