diff --git a/README.md b/README.md
index 865cb6f4b7f9f2128d1b0492ab759dcc8b03ac5a..47ff132f81831c41d2b68b48182881d5b9786d0f 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,17 @@
# 鹊桥(内网穿透)
#### 介绍
-[使用java实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097)
+[使用java基于aio/nio实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097)
#### 软件架构
![软件架构图](https://images.gitee.com/uploads/images/2019/0616/165254_c3715fbf_120615.png "build-0.0.1.png")
+#### 源码打包正式包
+
+```
+mvn clean install -Dmaven.test.skip -Denv=release
+```
+
#### 安装教程
1. 准备公网(假设公网ip=10.1.1.22)服务器,安装openjdk1.7+
diff --git a/magpiebridge-client/.gitignore b/magpiebridge-client/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..b83d22266ac8aa2f8df2edef68082c789727841d
--- /dev/null
+++ b/magpiebridge-client/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/magpiebridge-client/pom.xml b/magpiebridge-client/pom.xml
index c90ef067b4325cf36519a434bdeac5fe4eff57d2..d9fed5efdbfe3d06337b98de697053300a9632f8 100644
--- a/magpiebridge-client/pom.xml
+++ b/magpiebridge-client/pom.xml
@@ -4,7 +4,7 @@
org.jiucheng.magpiebridge
magpiebridge
- 0.0.1
+ 1.0.0
magpiebridge-client
diff --git a/magpiebridge-client/src/main/bin/startup.sh b/magpiebridge-client/src/main/bin/startup.sh
index 9f0d9252827acf2af077ccc0ddb63d3430088e66..cbad87a6d07aa81ef4ac3e865bf776181665ea46 100644
--- a/magpiebridge-client/src/main/bin/startup.sh
+++ b/magpiebridge-client/src/main/bin/startup.sh
@@ -52,7 +52,7 @@ CLASSPATH="$base/conf:$CLASSPATH";
echo CLASSPATH :$CLASSPATH
echo "cd to $bin_abs_path for workaround relative path"
cd $bin_abs_path
-$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.nio.Client 1>>$base/logs/client.log 2>&1 &
+$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.aio.Client 1>>$base/logs/client.log 2>&1 &
echo $! > $base/bin/client.pid
echo "cd to $current_path for continue"
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java
new file mode 100644
index 0000000000000000000000000000000000000000..ad56512a6db653fd3ca1bb22a6d891deab239124
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java
@@ -0,0 +1,84 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment;
+import org.jiucheng.magpiebridge.util.Cfg;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class Client {
+ private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2);
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ Cfg.loadProperties(Client.class);
+ final ClientAttachment attachment = new ClientAttachment();
+ startup(attachment);
+ schedule(attachment);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown(attachment);
+ }
+ });
+ }
+
+ private static void schedule(final ClientAttachment clientAttachment) {
+ // 重连
+ schedule.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ if (clientAttachment.isFailed()) {
+ try {
+ startup(clientAttachment);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }, 60, 60, TimeUnit.SECONDS);
+ // 心跳
+ schedule.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ if (!clientAttachment.isFailed()) {
+ clientAttachment.heartbeat();
+ }
+ }
+ }, 30L, 30L, TimeUnit.SECONDS);
+ }
+
+ public static void startup(final ClientAttachment attachment) throws IOException {
+ ConcurrentHashMap tas = attachment.getTransferAttachments();
+ Enumeration uris = tas.keys();
+ if (uris != null) {
+ while (uris.hasMoreElements()) {
+ Integer uri = uris.nextElement();
+ TransferAttachment ta = tas.remove(uri);
+ if (ta != null) {
+ ta.disconnect();
+ }
+ }
+ }
+ AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
+ client.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024);
+ attachment.setClient(client);
+ attachment.setFailed(false);
+ client.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), attachment, attachment.getClientEstablishmentCompletionHandler());
+ }
+
+ public static void shutdown(ClientAttachment clientAttachment) {
+ schedule.shutdownNow();
+ clientAttachment.close();
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java
new file mode 100644
index 0000000000000000000000000000000000000000..060cc20d21df2891879e0f657ca2c36582bac5a7
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java
@@ -0,0 +1,141 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ClientAttachment {
+ private ClientEstablishmentCompletionHandler clientEstablishmentCompletionHandler = new ClientEstablishmentCompletionHandler();
+ private ClientReadCompletionHandler clientReadCompletionHandler = new ClientReadCompletionHandler();
+ private ClientWriteCompletionHandler clientWriteCompletionHandler = new ClientWriteCompletionHandler();
+ private ClientWriteNoReadCompletionHandler clientWriteNoReadCompletionHandler = new ClientWriteNoReadCompletionHandler();
+
+ private ConcurrentHashMap transferAttachments = new ConcurrentHashMap();
+ // 当前在写
+ public final AtomicBoolean writed = new AtomicBoolean(false);
+
+ private AsynchronousSocketChannel client;
+ private ByteBuffer writeByteBuffer;
+ private ByteBuffer readByteBuffer;
+ private boolean failed = false;
+
+ public ClientEstablishmentCompletionHandler getClientEstablishmentCompletionHandler() {
+ return clientEstablishmentCompletionHandler;
+ }
+
+ public ClientReadCompletionHandler getClientReadCompletionHandler() {
+ return clientReadCompletionHandler;
+ }
+
+ public ClientWriteCompletionHandler getClientWriteCompletionHandler() {
+ return clientWriteCompletionHandler;
+ }
+
+ public ClientWriteNoReadCompletionHandler getClientWriteNoReadCompletionHandler() {
+ return clientWriteNoReadCompletionHandler;
+ }
+
+ public AsynchronousSocketChannel getClient() {
+ return client;
+ }
+
+ public ClientAttachment setClient(AsynchronousSocketChannel client) {
+ this.client = client;
+ return this;
+ }
+
+ public ByteBuffer getReadByteBuffer() {
+ return readByteBuffer;
+ }
+
+ public ClientAttachment setReadByteBuffer(ByteBuffer readByteBuffer) {
+ this.readByteBuffer = readByteBuffer;
+ return this;
+ }
+
+ public ByteBuffer getWriteByteBuffer() {
+ return writeByteBuffer;
+ }
+
+ public ClientAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) {
+ this.writeByteBuffer = writeByteBuffer;
+ return this;
+ }
+
+ public boolean canWrited() {
+ while (!writed.compareAndSet(false, true)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public ClientAttachment setFailed(boolean failed) {
+ this.failed = failed;
+ return this;
+ }
+
+ public ConcurrentHashMap getTransferAttachments() {
+ return transferAttachments;
+ }
+
+ // 代理连接断开消息发送
+ public void disconnect(int uri) {
+ if (canWrited()) {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.DISCONNECT);
+ message.setUri(uri);
+ ByteBuffer writeByteBuffer = Message.toByteBuffer(message);
+ getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler());
+ }
+ }
+
+ // HEARTBEAT
+ public void heartbeat() {
+ if (canWrited()) {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.HEARTBEAT);
+ ByteBuffer writeByteBuffer = Message.toByteBuffer(message);
+ getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler());
+ }
+ }
+
+ public void close () {
+ try {
+ getClient().close();
+ } catch (IOException e) {
+ }
+ ConcurrentHashMap tas = getTransferAttachments();
+ Enumeration uris = tas.keys();
+ if (uris != null) {
+ while (uris.hasMoreElements()) {
+ Integer uri = uris.nextElement();
+ TransferAttachment ta = tas.remove(uri);
+ if (ta != null) {
+ ta.disconnect();
+ }
+ }
+ }
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..d4d3cdf9ee9082ce5942445b6324c007f33c9182
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java
@@ -0,0 +1,33 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.util.Cfg;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ClientEstablishmentCompletionHandler implements CompletionHandler {
+
+ public void completed(Void result, ClientAttachment attachment) {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.AUTH);
+ byte[] data = Cfg.getClientKey().getBytes();
+ message.setData(data);
+ message.setSize(data.length);
+ ByteBuffer buffer = Message.toByteBuffer(message);
+ if (attachment.canWrited()) {
+ attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), attachment.getClientWriteCompletionHandler());
+ }
+ }
+
+ public void failed(Throwable exc, ClientAttachment attachment) {
+ System.out.println("establish error");
+ attachment.setFailed(true);
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..8ebdf5e840752dc6dcb6b8d802c85589d2f5068d
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java
@@ -0,0 +1,113 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.SocketChannel;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment;
+import org.jiucheng.magpiebridge.transfer.aio.TransferEstablishmentCompletionHandler;
+import org.jiucheng.magpiebridge.util.Cfg;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ClientReadCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ClientAttachment attachment) {
+ if (result == -1) {
+ close(attachment);
+ return;
+ }
+
+ ByteBuffer readByteBuffer = attachment.getReadByteBuffer();
+ if (readByteBuffer.position() != readByteBuffer.capacity()) {
+ attachment.getClient().read(readByteBuffer, attachment, this);
+ return;
+ }
+
+ if (readByteBuffer.capacity() == 13) {
+ // 验证消息头
+ readByteBuffer.flip();
+ int magic = readByteBuffer.getInt();
+ byte type = readByteBuffer.get();
+ int uri = readByteBuffer.getInt();
+ int size = readByteBuffer.getInt();
+ if (magic != Message.MAGIC) {
+ close(attachment);
+ return;
+ }
+ if (size > 0) {
+ readByteBuffer = ByteBuffer.allocate(size + 13);
+ readByteBuffer.putInt(magic);
+ readByteBuffer.put(type);
+ readByteBuffer.putInt(uri);
+ readByteBuffer.putInt(size);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler());
+ return;
+ }
+ }
+
+ Message message = Message.fromByteBuffer(readByteBuffer);
+ byte type = message.getType();
+ if (type == Message.Type.CONNECT) {
+ try {
+ handleConnectMessage(attachment, message);
+ } catch (IOException e) {
+ e.printStackTrace();
+ attachment.disconnect(message.getUri());
+ }
+ readByteBuffer = ByteBuffer.allocate(13);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler());
+ } else if (type == Message.Type.HEARTBEAT) {
+ // 心跳不处理
+ readByteBuffer = ByteBuffer.allocate(13);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler());
+ }
+ }
+
+ private void handleConnectMessage(ClientAttachment attachment, Message message) throws IOException {
+ int uri = message.getUri();
+ // lp.io2c.com
+ String ipport = new String(message.getData());
+ String[] strs = ipport.split(":");
+
+ SocketChannel realSocketChannel = null;
+ try {
+ realSocketChannel = SocketChannel.open();
+ realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1])));
+ } catch (NumberFormatException e1) {
+ attachment.disconnect(uri);
+ return;
+ } catch (IOException e1) {
+ attachment.disconnect(uri);
+ return;
+ }
+
+ AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
+ client.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024);
+ TransferAttachment transferAttachment = new TransferAttachment().setClient(client).setUri(uri).setRealSocketChannel(realSocketChannel);
+
+ transferAttachment.getClient().connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), transferAttachment.setClientAttachment(attachment), new TransferEstablishmentCompletionHandler());
+ }
+
+ public void close(ClientAttachment attachment) {
+ attachment.setFailed(true);
+ try {
+ attachment.getClient().close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void failed(Throwable exc, ClientAttachment attachment) {
+ attachment.setFailed(true);
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..1551bc0703b1999177984aafcec467b0dac1c1c5
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java
@@ -0,0 +1,30 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ClientWriteCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ClientAttachment attachment) {
+ if (attachment.getWriteByteBuffer().hasRemaining()) {
+ attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+
+ attachment.setWriteByteBuffer(null);
+ ByteBuffer readByteBuffer = ByteBuffer.allocate(13);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler());
+ }
+
+ public void failed(Throwable exc, ClientAttachment attachment) {
+ System.out.println("write error");
+ attachment.setFailed(true);
+ attachment.writed.compareAndSet(true, false);
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..16f9f1b806028f19c246f0b472f002d8e7ca18fe
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java
@@ -0,0 +1,25 @@
+package org.jiucheng.magpiebridge.client.aio;
+
+import java.nio.channels.CompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ClientWriteNoReadCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ClientAttachment attachment) {
+ if (attachment.getWriteByteBuffer().hasRemaining()) {
+ attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+ }
+
+ public void failed(Throwable exc, ClientAttachment attachment) {
+ System.out.println("write heartbeat error");
+ attachment.setFailed(true);
+ attachment.writed.compareAndSet(true, false);
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java
deleted file mode 100644
index eb67f50e6c40234a8d767995d6875b3dd0234b3c..0000000000000000000000000000000000000000
--- a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package org.jiucheng.magpiebridge.client.nio;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.jiucheng.magpiebridge.protocol.Message;
-import org.jiucheng.magpiebridge.util.Cfg;
-
-/**
- *
- * @author jiucheng
- *
- */
-public class Client {
-
- private static final Logger LOGGER = Logger.getLogger(Client.class.getName());
-
- private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2);
- private static final ExecutorService executorService = Executors.newCachedThreadPool();
- private static final Map realServers = new ConcurrentHashMap();
-
- // 当前在写
- private static final AtomicBoolean writed = new AtomicBoolean(false);
- private static volatile SocketChannel socketChannel;
- private static volatile boolean runed = false;
-
- public static void main(String[] args) throws IOException, InterruptedException {
- Cfg.loadProperties(Client.class);
- schedule();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- shutdown();
- }
- });
- startup();
- }
-
- private static void schedule() {
- // 重连
- schedule.scheduleAtFixedRate(new Runnable() {
- public void run() {
- if (!runed) {
- try {
- startup();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }, 60, 60, TimeUnit.SECONDS);
- // 心跳
- schedule.scheduleWithFixedDelay(new Runnable() {
- public void run() {
- SocketChannel sc = Client.socketChannel;
- if (sc != null) {
- Message message = new Message();
- message.setMagic(Message.MAGIC);
- message.setType(Message.Type.HEARTBEAT);
- ByteBuffer buffer = Message.toByteBuffer(message);
- while(!writed.compareAndSet(false, true)) {}
- try {
- while (buffer.hasRemaining()) {
- sc.write(buffer);
- }
- } catch (IOException e) {
- e.printStackTrace();
- closeSocketChannel();
- } finally {
- writed.compareAndSet(true, false);
- }
- }
- }
- }, 30L, 30L, TimeUnit.SECONDS);
- }
-
- public static void startup() {
- runed = true;
- try {
- socketChannel = SocketChannel.open();
- socketChannel.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()));
- } catch (IOException e) {
- e.printStackTrace();
- closeSocketChannel();
- return;
- }
-
- auth(socketChannel);
-
- ByteBuffer headerByteBuffer = ByteBuffer.allocate(13);
- while (true) {
- headerByteBuffer.clear();
- ByteBuffer buf;
- try {
- int len = socketChannel.read(headerByteBuffer);
- if (len == -1) {
- closeSocketChannel();
- return;
- }
- while (headerByteBuffer.position() != headerByteBuffer.capacity()) {
- socketChannel.read(headerByteBuffer);
- }
-
- headerByteBuffer.flip();
-
- int magic = headerByteBuffer.getInt();
- byte type = headerByteBuffer.get();
- int uri = headerByteBuffer.getInt();
- int size = headerByteBuffer.getInt();
-
- buf = ByteBuffer.allocate(size + 13);
- buf.putInt(magic);
- buf.put(type);
- buf.putInt(uri);
- buf.putInt(size);
- while (buf.position() != buf.capacity()) {
- socketChannel.read(buf);
- }
- } catch (Exception e) {
- e.printStackTrace();
- closeSocketChannel();
- return;
- }
-
- Message message = Message.fromByteBuffer(buf);
- byte type = message.getType();
- if (type == Message.Type.CONNECT) {
- handleConnectMessage(socketChannel, message);
- } else if (type == Message.Type.DISCONNECT) {
- handleDisconnectMessage(socketChannel, message);
- } else if (type == Message.Type.TRANSFER) {
- handleTransferMessage(socketChannel, message);
- } else if (type == Message.Type.HEARTBEAT) {
- // 心跳不处理
- }
- }
- }
-
- public static void closeSocketChannel() {
- if (socketChannel != null) {
- try {
- socketChannel.close();
- } catch (IOException e) {
- // e.printStackTrace();
- }
- socketChannel = null;
- }
- for (Integer key : realServers.keySet()) {
- SocketChannel socket = realServers.remove(key);
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- // e.printStackTrace();
- }
- }
- }
- runed = false;
- }
-
- public static void shutdown() {
- schedule.shutdownNow();
- closeSocketChannel();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("shutdown");
- }
- }
-
- private static void handleDisconnectMessage(final SocketChannel socketChannel, final Message proxyMessage) {
- SocketChannel socket = realServers.remove(proxyMessage.getUri());
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- // e.printStackTrace();
- }
- }
- }
-
- private static void handleTransferMessage(final SocketChannel socketChannel, final Message message) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length);
- byteBuffer.put(message.getData());
- byteBuffer.flip();
- SocketChannel socket = realServers.get(message.getUri());
- if (socket != null) {
- try {
- while(byteBuffer.hasRemaining()) {
- socket.write(byteBuffer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- realServers.remove(message.getUri());
- while(!writed.compareAndSet(false, true)) {}
- try {
- Message rmsg = new Message();
- rmsg.setMagic(Message.MAGIC);
- rmsg.setType(Message.Type.DISCONNECT);
- rmsg.setUri(message.getUri());
- ByteBuffer buffer = Message.toByteBuffer(rmsg);
- while(buffer.hasRemaining()) {
- socketChannel.write(buffer);
- }
- } catch (Exception e2) {
- e2.printStackTrace();
- closeSocketChannel();
- } finally {
- writed.compareAndSet(true, false);
- }
- }
- }
- }
-
- private static void handleConnectMessage(final SocketChannel socketChannel, final Message message) {
- int uri = message.getUri();
- // lp.io2c.com
- String ipport = new String(message.getData());
- String[] strs = ipport.split(":");
-
- SocketChannel realSocketChannel = null;
- try {
- realSocketChannel = SocketChannel.open();
- realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1])));
- } catch (NumberFormatException e1) {
- e1.printStackTrace();
- disconnect(socketChannel, uri);
- return;
- } catch (IOException e1) {
- e1.printStackTrace();
- disconnect(socketChannel, uri);
- return;
- }
-
- realServers.put(uri, realSocketChannel);
-
- final Integer tmpUri = uri;
- executorService.execute(new Runnable() {
- private Integer uri = tmpUri;
- public void run() {
- ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024);
- while(true) {
- SocketChannel real = realServers.get(uri);
- if (real == null) {
- return;
- }
- int len;
- try {
- len = real.read(buf);
- } catch (IOException e) {
- realServers.remove(uri);
- disconnect(socketChannel, uri);
- // e.printStackTrace();
- return;
- }
- if (len == -1) {
- realServers.remove(uri);
- disconnect(socketChannel, uri);
- return;
- } else if (len > 0) {
- buf.flip();
- Message message = new Message();
- message.setMagic(Message.MAGIC);
- message.setType(Message.Type.TRANSFER);
- message.setUri(uri);
- byte[] bts = new byte[len];
- buf.get(bts);
- message.setData(bts);
- message.setSize(bts.length);
- ByteBuffer buffer = Message.toByteBuffer(message);
- while(!writed.compareAndSet(false, true)) {}
- try {
- while(buffer.hasRemaining()) {
- socketChannel.write(buffer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- closeSocketChannel();
- return;
- } finally {
- writed.compareAndSet(true, false);
- }
- buf.clear();
- }
- }
- }
- });
- }
-
- // 代理连接断开消息发送
- private static void disconnect(SocketChannel socketChannel, int uri) {
- Message message = new Message();
- message.setMagic(Message.MAGIC);
- message.setType(Message.Type.DISCONNECT);
- message.setUri(uri);
- ByteBuffer buffer = Message.toByteBuffer(message);
- while (!writed.compareAndSet(false, true)) {}
- try {
- while(buffer.hasRemaining()) {
- socketChannel.write(buffer);
- }
- } catch (Exception e2) {
- e2.printStackTrace();
- closeSocketChannel();
- } finally {
- writed.compareAndSet(true, false);
- }
- }
-
- // 认证信息发送
- private static void auth(final SocketChannel socketChannel) {
- Message message = new Message();
- message.setMagic(Message.MAGIC);
- message.setType(Message.Type.AUTH);
- byte[] data = Cfg.getClientKey().getBytes();
- message.setData(data);
- message.setSize(data.length);
- ByteBuffer buffer = Message.toByteBuffer(message);
- while(!writed.compareAndSet(false, true)) {}
- try {
- while (buffer.hasRemaining()) {
- socketChannel.write(buffer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- closeSocketChannel();
- } finally {
- writed.compareAndSet(true, false);
- }
- }
-}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java
new file mode 100644
index 0000000000000000000000000000000000000000..23518c9605b6c81caa7baf4caa63964eedeed227
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java
@@ -0,0 +1,151 @@
+package org.jiucheng.magpiebridge.transfer.aio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jiucheng.magpiebridge.client.aio.ClientAttachment;
+import org.jiucheng.magpiebridge.protocol.Message;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class TransferAttachment {
+ // 当前在写
+ public final AtomicBoolean writed = new AtomicBoolean(false);
+
+ private AsynchronousSocketChannel client;
+ private ByteBuffer writeByteBuffer;
+ private ByteBuffer readByteBuffer;
+ private int uri;
+
+ private SocketChannel realSocketChannel;
+ private boolean failed;
+
+ private ClientAttachment clientAttachment;
+
+ public ClientAttachment getClientAttachment() {
+ return clientAttachment;
+ }
+
+ public TransferAttachment setClientAttachment(ClientAttachment clientAttachment) {
+ this.clientAttachment = clientAttachment;
+ return this;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public TransferAttachment setFailed(boolean failed) {
+ this.failed = failed;
+ return this;
+ }
+
+ public SocketChannel getRealSocketChannel() {
+ return realSocketChannel;
+ }
+
+ public TransferAttachment setRealSocketChannel(SocketChannel realSocketChannel) {
+ this.realSocketChannel = realSocketChannel;
+ return this;
+ }
+
+ public int getUri() {
+ return uri;
+ }
+
+ public TransferAttachment setUri(int uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ public AsynchronousSocketChannel getClient() {
+ return client;
+ }
+
+ public TransferAttachment setClient(AsynchronousSocketChannel client) {
+ this.client = client;
+ return this;
+ }
+
+ public ByteBuffer getWriteByteBuffer() {
+ return writeByteBuffer;
+ }
+
+ public TransferAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) {
+ this.writeByteBuffer = writeByteBuffer;
+ return this;
+ }
+
+ public ByteBuffer getReadByteBuffer() {
+ return readByteBuffer;
+ }
+
+ public TransferAttachment setReadByteBuffer(ByteBuffer readByteBuffer) {
+ this.readByteBuffer = readByteBuffer;
+ return this;
+ }
+
+ public boolean canReaded() {
+ while (!writed.compareAndSet(false, false)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ public boolean canWrited() {
+ while (!writed.compareAndSet(false, true)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ public void close() {
+ if (clientAttachment != null) {
+ clientAttachment.getTransferAttachments().remove(getUri());
+ clientAttachment = null;
+ }
+ if (realSocketChannel != null) {
+ try {
+ realSocketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ realSocketChannel = null;
+ }
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ client = null;
+ }
+ }
+
+ // 代理连接断开消息发送
+ public void disconnect() {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.DISCONNECT);
+ message.setUri(getUri());
+ ByteBuffer writeByteBuffer = Message.toByteBuffer(message);
+ if (canWrited()) {
+ getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer).setFailed(true), new TransferWriteCompletionHandler());
+ }
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..7701e7964bbde3e7112721f1cc82a9431291b821
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java
@@ -0,0 +1,74 @@
+package org.jiucheng.magpiebridge.transfer.aio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.SocketChannel;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.util.ThreadManager;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class TransferConnectedCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, final TransferAttachment attachment) {
+ if (attachment.getWriteByteBuffer().hasRemaining()) {
+ attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+
+ attachment.setWriteByteBuffer(null);
+ ByteBuffer readByteBuffer = ByteBuffer.allocate(13);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), new TransferReadCompletionHandler());
+
+ ThreadManager.singleton().execute(new Runnable() {
+ public void run() {
+ ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024);
+ while(true) {
+ if (attachment.canReaded()) {
+ SocketChannel real = attachment.getRealSocketChannel();
+ if (real == null) {
+ return;
+ }
+ int len;
+ try {
+ len = real.read(buf);
+ } catch (IOException e) {
+ attachment.disconnect();
+ return;
+ }
+ if (len == -1) {
+ attachment.disconnect();
+ return;
+ } else if (len > 0) {
+ buf.flip();
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.TRANSFER);
+ message.setUri(attachment.getUri());
+ byte[] bts = new byte[len];
+ buf.get(bts);
+ message.setData(bts);
+ message.setSize(bts.length);
+ ByteBuffer buffer = Message.toByteBuffer(message);
+ if (attachment.canWrited()) {
+ attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler());
+ buf.clear();
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public void failed(Throwable exc, TransferAttachment attachment) {
+ attachment.writed.compareAndSet(true, false);
+ attachment.close();
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..97054c8cfce0ccd36da788ca2b58467ff9b2dcec
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java
@@ -0,0 +1,38 @@
+package org.jiucheng.magpiebridge.transfer.aio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+import org.jiucheng.magpiebridge.client.aio.ClientAttachment;
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.util.Cfg;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class TransferEstablishmentCompletionHandler implements CompletionHandler {
+
+ public void completed(Void result, TransferAttachment attachment) {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.CONNECT);
+ message.setUri(attachment.getUri());
+ byte[] data = Cfg.getClientKey().getBytes();
+ message.setData(data);
+ message.setSize(data.length);
+ ByteBuffer buffer = Message.toByteBuffer(message);
+ if (attachment.canWrited()) {
+ attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferConnectedCompletionHandler());
+ }
+ }
+
+ public void failed(Throwable exc, TransferAttachment attachment) {
+ ClientAttachment clientAttachment = attachment.getClientAttachment();
+ if (clientAttachment != null) {
+ clientAttachment.disconnect(attachment.getUri());
+ }
+ attachment.close();
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..955c52c2278d31d2254e1e958f05484382d3cde0
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java
@@ -0,0 +1,90 @@
+package org.jiucheng.magpiebridge.transfer.aio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.SocketChannel;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class TransferReadCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, TransferAttachment attachment) {
+ // client关闭连接
+ if (result == -1) {
+ // close(attachment);
+ attachment.close();
+ return;
+ }
+ ByteBuffer readByteBuffer = attachment.getReadByteBuffer();
+ if (readByteBuffer.position() != readByteBuffer.capacity()) {
+ attachment.getClient().read(readByteBuffer, attachment, this);
+ return;
+ }
+ if (readByteBuffer.capacity() == 13) {
+ // 验证消息头
+ readByteBuffer.flip();
+ int magic = readByteBuffer.getInt();
+ byte type = readByteBuffer.get();
+ int uri = readByteBuffer.getInt();
+ int size = readByteBuffer.getInt();
+ if (magic != Message.MAGIC) {
+ // close(attachment);
+ attachment.close();
+ return;
+ }
+ if (size > 0) {
+ readByteBuffer = ByteBuffer.allocate(size + 13);
+ readByteBuffer.putInt(magic);
+ readByteBuffer.put(type);
+ readByteBuffer.putInt(uri);
+ readByteBuffer.putInt(size);
+ attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), this);
+ return;
+ }
+ }
+
+ Message message = Message.fromByteBuffer(readByteBuffer);
+ byte type = message.getType();
+ if (type == Message.Type.TRANSFER) {
+ handleTransferMessage(attachment, message);
+ } else if (type == Message.Type.HEARTBEAT) {
+ // 心跳不处理
+ attachment.setReadByteBuffer(ByteBuffer.allocate(13));
+ attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this);
+ }
+ }
+
+ private void handleTransferMessage(TransferAttachment attachment, final Message message) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length);
+ byteBuffer.put(message.getData());
+ byteBuffer.flip();
+ SocketChannel socket = attachment.getRealSocketChannel();
+ if (socket != null) {
+ try {
+ while(byteBuffer.hasRemaining()) {
+ socket.write(byteBuffer);
+ }
+ attachment.setReadByteBuffer(ByteBuffer.allocate(13));
+ attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this);
+ } catch (Exception e) {
+ Message rmsg = new Message();
+ rmsg.setMagic(Message.MAGIC);
+ rmsg.setType(Message.Type.DISCONNECT);
+ rmsg.setUri(message.getUri());
+ ByteBuffer buffer = Message.toByteBuffer(rmsg);
+ if (attachment.canWrited()) {
+ attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler());
+ }
+ }
+ }
+ }
+
+ public void failed(Throwable exc, TransferAttachment attachment) {
+ attachment.close();
+ }
+}
diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..0538f481cdeebc6a3680e549ff4a118012d0aeb1
--- /dev/null
+++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java
@@ -0,0 +1,28 @@
+package org.jiucheng.magpiebridge.transfer.aio;
+
+import java.nio.channels.CompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class TransferWriteCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, TransferAttachment attachment) {
+ if (attachment.getWriteByteBuffer().hasRemaining()) {
+ attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+
+ if (attachment.isFailed()) {
+ attachment.close();
+ }
+ }
+
+ public void failed(Throwable exc, TransferAttachment attachment) {
+ attachment.writed.compareAndSet(true, false);
+ attachment.close();
+ }
+}
diff --git a/magpiebridge-common/pom.xml b/magpiebridge-common/pom.xml
index 1f948a802567bb4897775b2cf4591cdf2a744317..ccbd09e28fbf9791958804aa7ac32e55f04d5ac2 100644
--- a/magpiebridge-common/pom.xml
+++ b/magpiebridge-common/pom.xml
@@ -4,7 +4,7 @@
org.jiucheng.magpiebridge
magpiebridge
- 0.0.1
+ 1.0.0
magpiebridge-common
diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java
deleted file mode 100644
index 7fdd9fc710fa7655a494a013e9d259ae28b247c3..0000000000000000000000000000000000000000
--- a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.jiucheng.magpiebridge.schedule;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ScheduleManager {
-
- private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
-
- public static void schedule(Runnable runnable) {
- scheduledExecutorService.scheduleAtFixedRate(runnable, 30L, 30L, TimeUnit.MICROSECONDS);
- }
-}
diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..d9a8a04797f2f37d055e910efc2d7a11f4d398d6
--- /dev/null
+++ b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java
@@ -0,0 +1,18 @@
+package org.jiucheng.magpiebridge.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ThreadManager {
+
+ private static final ExecutorService executorService = Executors.newCachedThreadPool();
+
+ public static ExecutorService singleton() {
+ return executorService;
+ }
+}
diff --git a/magpiebridge-server/pom.xml b/magpiebridge-server/pom.xml
index 0c87bafaf8613c80c6d4f291c3f7fac92271dea3..edbef9abffd0c472472f556b2a4bbaeddb0cd526 100644
--- a/magpiebridge-server/pom.xml
+++ b/magpiebridge-server/pom.xml
@@ -4,7 +4,7 @@
org.jiucheng.magpiebridge
magpiebridge
- 0.0.1
+ 1.0.0
magpiebridge-server
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java
deleted file mode 100644
index 4c7ac5185a1929bd4c66068f6fb2b8850ce52bae..0000000000000000000000000000000000000000
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package org.jiucheng.magpiebridge.server.aio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousServerSocketChannel;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment;
-
-/**
- * Client附件
- *
- * @author jiucheng
- *
- */
-public class ClientAttachment {
- // 当前在写
- public final AtomicBoolean writed = new AtomicBoolean(false);
-
- // 代理服务
- private List asynchronousServerSocketChannels = new ArrayList();
- // 代理连接
- private final ConcurrentMap proxys = new ConcurrentHashMap();
-
- // Client连接
- AsynchronousSocketChannel asynchronousSocketChannel;
- // Client读入数据
- ByteBuffer readBuffer;
-
- public ClientAttachment(AsynchronousSocketChannel asynchronousSocketChannel) {
- this.asynchronousSocketChannel = asynchronousSocketChannel;
- }
-
- public ByteBuffer getReadBuffer() {
- return readBuffer;
- }
-
- public ClientAttachment setReadBuffer(ByteBuffer readBuffer) {
- this.readBuffer = readBuffer;
- return this;
- }
-
- public AsynchronousSocketChannel getAsynchronousSocketChannel() {
- return asynchronousSocketChannel;
- }
-
- public void putProxy(Integer uri, ProxyClientWriteAttachment proxy) {
- proxys.put(uri, proxy);
- }
-
- public ProxyClientWriteAttachment getProxy(Integer uri) {
- return proxys.get(uri);
- }
-
- public ProxyClientWriteAttachment removeProxy(Integer uri) {
- return proxys.remove(uri);
- }
-
- public void addProxyServer(AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
- asynchronousServerSocketChannels.add(asynchronousServerSocketChannel);
- }
-
- public void close() {
- Set keys = proxys.keySet();
- for (Integer key : keys) {
- ProxyClientWriteAttachment proxyWriteAttachment = proxys.remove(key);
- if (proxyWriteAttachment != null && proxyWriteAttachment.getAsynchronousSocketChannel() != null) {
- try {
- proxyWriteAttachment.getAsynchronousSocketChannel().close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) {
- if (asynchronousServerSocketChannel != null) {
- try {
- asynchronousServerSocketChannel.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- if (asynchronousSocketChannel != null) {
- try {
- asynchronousSocketChannel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java
deleted file mode 100644
index 1887c2f54d23bfb0fdfd39703b8148446888378c..0000000000000000000000000000000000000000
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.jiucheng.magpiebridge.server.aio;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
-
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientCompletionHandler;
-
-public class ClientWriteCompletionHandler implements CompletionHandler {
-
- public void completed(Integer result, ClientWriteAttachment attachment) {
- if (attachment.getWriteBuffer().hasRemaining()) {
- attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this);
- return;
- }
-
- attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false);
-
- if (attachment.getProxyAsynchronousSocketChannel() != null) {
- attachment.setReadByteBuffer(ByteBuffer.allocate(4 * 1024 * 1024));
- attachment.getProxyAsynchronousSocketChannel().read(attachment.getReadByteBuffer(), attachment, new ProxyClientCompletionHandler());
- }
- }
-
- public void failed(Throwable exc, ClientWriteAttachment attachment) {
- attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false);
- exc.printStackTrace();
- }
-}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java
index 7d80a30846eaac6de4b80aea9ef21550434ec8eb..63866c9d2ad0e9059150426e23a5f7530d139834 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java
@@ -5,12 +5,11 @@ import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jiucheng.magpiebridge.util.Cfg;
+import org.jiucheng.magpiebridge.util.ThreadManager;
/**
*
@@ -18,23 +17,19 @@ import org.jiucheng.magpiebridge.util.Cfg;
*
*/
public class Server {
-
private static final Logger LOGGER = Logger.getLogger(Server.class.getName());
private static final Object wait = new Object();
- public static final int READ_TIMEOUT = Integer.MAX_VALUE;
-
public static void main(String[] args) throws IOException, InterruptedException {
Cfg.loadProperties(Server.class);
- ExecutorService executorService = Executors.newCachedThreadPool();
- AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
+ AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(ThreadManager.singleton());
final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024);
server.bind(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()));
- server.accept(new ServerAttachment(server), new ServerCompletionHandler());
+ server.accept(server, new ServerEstablishmentCompletionHandler());
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
@@ -54,6 +49,7 @@ public class Server {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, "Server started");
}
+
synchronized (wait) {
wait.wait();
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java
index ecc43dcc3c7d50534eb8c87f9254cc145d94a316..1c61cda276a23d8e4c22478819b0cae3e65ee6c2 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java
@@ -1,16 +1,134 @@
package org.jiucheng.magpiebridge.server.aio;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
public class ServerAttachment {
+ // 代理服务
+ private List asynchronousServerSocketChannels = new ArrayList();
+ // 代理连接
+ public static final ConcurrentMap proxys = new ConcurrentHashMap();
+
+ // Client连接
+ AsynchronousSocketChannel server;
+ // Client读入数据
+ ByteBuffer readBuffer;
+ // Client写入数据
+ ByteBuffer writeBuffer;
+ // 当前在写
+ public final AtomicBoolean writed = new AtomicBoolean(false);
- AsynchronousServerSocketChannel server;
+ private ProxyAttachment proxyAttachment;
+ private boolean mastered;
- public ServerAttachment(AsynchronousServerSocketChannel server) {
+ public ServerAttachment(AsynchronousSocketChannel server) {
this.server = server;
}
- public AsynchronousServerSocketChannel getServer() {
+ public boolean isMastered() {
+ return mastered;
+ }
+
+ public ServerAttachment setMastered(boolean mastered) {
+ this.mastered = mastered;
+ return this;
+ }
+
+ public ProxyAttachment getProxyAttachment() {
+ return proxyAttachment;
+ }
+
+ public ServerAttachment setProxyAttachment(ProxyAttachment proxyAttachment) {
+ this.proxyAttachment = proxyAttachment;
+ return this;
+ }
+
+ public ByteBuffer getWriteBuffer() {
+ return writeBuffer;
+ }
+
+ public ServerAttachment setWriteBuffer(ByteBuffer writeBuffer) {
+ this.writeBuffer = writeBuffer;
+ return this;
+ }
+
+ public ByteBuffer getReadBuffer() {
+ return readBuffer;
+ }
+
+ public ServerAttachment setReadBuffer(ByteBuffer readBuffer) {
+ this.readBuffer = readBuffer;
+ return this;
+ }
+
+ public AsynchronousSocketChannel getServer() {
return server;
}
+
+ public void putProxy(Integer uri, ProxyAttachment proxy) {
+ proxys.put(uri, proxy);
+ }
+
+ public ProxyAttachment getProxy(Integer uri) {
+ return proxys.get(uri);
+ }
+
+ public ProxyAttachment removeProxy(Integer uri) {
+ return proxys.remove(uri);
+ }
+
+ public void addProxyServer(AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
+ asynchronousServerSocketChannels.add(asynchronousServerSocketChannel);
+ }
+
+ public void close() {
+ if (server != null) {
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ if (mastered) {
+ for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) {
+ if (asynchronousServerSocketChannel != null) {
+ try {
+ asynchronousServerSocketChannel.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ } else {
+ if (proxyAttachment != null) {
+ proxyAttachment.close();
+ }
+ }
+ }
+
+ public boolean canWrited() {
+ while (!writed.compareAndSet(false, true)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java
similarity index 32%
rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java
rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java
index e2ab76dd89a246907df1b93d3de3458ddc127996..2059fd8c81a0906f67832250fbb17cba4dd8e9cc 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java
@@ -1,20 +1,25 @@
package org.jiucheng.magpiebridge.server.aio;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
-import java.util.concurrent.TimeUnit;
-public class ServerCompletionHandler implements CompletionHandler {
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ServerEstablishmentCompletionHandler implements CompletionHandler {
- public void completed(AsynchronousSocketChannel result, ServerAttachment attachment) {
- attachment.getServer().accept(attachment, this);
+ public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
+ attachment.accept(attachment, this);
ByteBuffer readBuffer = ByteBuffer.allocate(13);
- result.read(readBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, new ClientAttachment(result).setReadBuffer(readBuffer), new ClientCompletionHandler());
+ result.read(readBuffer, new ServerAttachment(result).setReadBuffer(readBuffer), new ServerReadCompletionHandler());
}
- public void failed(Throwable exc, ServerAttachment attachment) {
+ public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
exc.printStackTrace();
}
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java
similarity index 54%
rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java
rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java
index efdbdf462ffe47052d1255eb5ff78c880cdf82b5..85892048052f6ac65682ab1b28ab541f48993cd9 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java
@@ -10,31 +10,29 @@ import java.nio.channels.CompletionHandler;
import java.text.MessageFormat;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jiucheng.magpiebridge.protocol.Message;
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment;
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteCompletionHandler;
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerAttachment;
-import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerCompletionHandler;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyReadCompletionHandler;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteCompletionHandler;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentAttachment;
+import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentCompletionHandler;
import org.jiucheng.magpiebridge.util.Cfg;
+import org.jiucheng.magpiebridge.util.ThreadManager;
/**
- * Client数据解析处理
*
* @author jiucheng
*
*/
-public class ClientCompletionHandler implements CompletionHandler {
-
- private static final Logger LOGGER = Logger.getLogger(ClientCompletionHandler.class.getName());
+public class ServerReadCompletionHandler implements CompletionHandler {
+ private static final Logger LOGGER = Logger.getLogger(ServerReadCompletionHandler.class.getName());
// Client连接
- private static final ConcurrentMap CLIENTS = new ConcurrentHashMap();
+ private static final ConcurrentMap CLIENTS = new ConcurrentHashMap();
- public void completed(Integer result, ClientAttachment attachment) {
+ public void completed(Integer result, ServerAttachment attachment) {
// client关闭连接
if (result == -1) {
close(attachment);
@@ -43,7 +41,7 @@ public class ClientCompletionHandler implements CompletionHandler 0) {
readByteBuffer = ByteBuffer.allocate(size + 13);
readByteBuffer.putInt(magic);
readByteBuffer.put(type);
readByteBuffer.putInt(uri);
readByteBuffer.putInt(size);
- attachment.getAsynchronousSocketChannel().read(readByteBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, attachment.setReadBuffer(readByteBuffer), this);
+ attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
return;
}
}
@@ -79,47 +77,60 @@ public class ClientCompletionHandler implements CompletionHandler remote={1}", local, remote));
}
proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1])));
- proxy.accept(new ProxyServerAttachment(proxy).setClientAttachment(attachment).setRemote(remote), new ProxyServerCompletionHandler());
+ proxy.accept(new ProxyEstablishmentAttachment(proxy).setServerAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, MessageFormat.format("----end port={0} -> remote={1}", local, remote));
}
@@ -156,11 +167,11 @@ public class ClientCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ServerAttachment attachment) {
+ if (attachment.getWriteBuffer().hasRemaining()) {
+ attachment.getServer().write(attachment.getWriteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+
+ ProxyAttachment proxyAttachment = attachment.getProxyAttachment();
+ if (proxyAttachment != null) {
+ proxyAttachment.setReadBuffer(ByteBuffer.allocate(4 * 1024 * 1024));
+ proxyAttachment.getAsynchronousSocketChannel().read(proxyAttachment.getReadBuffer(), proxyAttachment, new ProxyReadCompletionHandler());
+ }
+ }
+
+ public void failed(Throwable exc, ServerAttachment attachment) {
+ attachment.writed.compareAndSet(true, false);
+ exc.printStackTrace();
+ }
+}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java
similarity index 31%
rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java
rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java
index 08f2399e3a48d901c681aa4f1cc2961f6f0aa321..7094d5aa146d23949b6884901cd8f1bd4f46aeb6 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java
@@ -1,18 +1,23 @@
-package org.jiucheng.magpiebridge.server.aio.proxy;
+package org.jiucheng.magpiebridge.server.aio;
import java.nio.channels.CompletionHandler;
-public class ProxyClientWriteCompletionHandler implements CompletionHandler {
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ServerWriteNoProxyClientReadCompletionHandler implements CompletionHandler {
- public void completed(Integer result, ProxyClientWriteAttachment attachment) {
- if (attachment.getWriteByteBuffer().hasRemaining()) {
- attachment.getAsynchronousSocketChannel().write(attachment.getWriteByteBuffer(), attachment, this);
+ public void completed(Integer result, ServerAttachment attachment) {
+ if (attachment.getWriteBuffer().hasRemaining()) {
+ attachment.getServer().write(attachment.getWriteBuffer(), attachment, this);
return;
}
attachment.writed.compareAndSet(true, false);
}
- public void failed(Throwable exc, ProxyClientWriteAttachment attachment) {
+ public void failed(Throwable exc, ServerAttachment attachment) {
attachment.writed.compareAndSet(true, false);
exc.printStackTrace();
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java
similarity index 31%
rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java
rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java
index 28f2ff9cd1870cafbf9b13b9de45e30629903720..1539b832a7a94a39d4cdaea1695f7a52794895d4 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java
@@ -1,10 +1,12 @@
package org.jiucheng.magpiebridge.server.aio.proxy;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.jiucheng.magpiebridge.server.aio.ClientAttachment;
+import org.jiucheng.magpiebridge.server.aio.ServerAttachment;
/**
* 代理连接附件
@@ -12,17 +14,20 @@ import org.jiucheng.magpiebridge.server.aio.ClientAttachment;
* @author jiucheng
*
*/
-public class ProxyClientWriteAttachment {
+public class ProxyAttachment {
// 当前在写
public final AtomicBoolean writed = new AtomicBoolean(false);
// 代理连接
private AsynchronousSocketChannel asynchronousSocketChannel;
// 待写入数据
- private ByteBuffer writeByteBuffer;
+ private ByteBuffer readBuffer;
+ // 待写入数据
+ private ByteBuffer writeBuffer;
// Client附件
- private ClientAttachment clientAttachment;
+ private ServerAttachment serverAttachment;
+ private int uri;
- public ProxyClientWriteAttachment(AsynchronousSocketChannel asynchronousSocketChannel) {
+ public ProxyAttachment(AsynchronousSocketChannel asynchronousSocketChannel) {
this.asynchronousSocketChannel = asynchronousSocketChannel;
}
@@ -30,21 +35,62 @@ public class ProxyClientWriteAttachment {
return asynchronousSocketChannel;
}
- public ByteBuffer getWriteByteBuffer() {
- return writeByteBuffer;
+ public int getUri() {
+ return uri;
+ }
+
+ public ProxyAttachment setUri(int uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ public ByteBuffer getReadBuffer() {
+ return readBuffer;
+ }
+
+ public ProxyAttachment setReadBuffer(ByteBuffer readBuffer) {
+ this.readBuffer = readBuffer;
+ return this;
+ }
+
+ public ByteBuffer getWriteBuffer() {
+ return writeBuffer;
}
- public ProxyClientWriteAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) {
- this.writeByteBuffer = writeByteBuffer;
+ public ProxyAttachment setWriteBuffer(ByteBuffer writeBuffer) {
+ this.writeBuffer = writeBuffer;
return this;
}
- public ClientAttachment getClientAttachment() {
- return clientAttachment;
+ public ServerAttachment getServerAttachment() {
+ return serverAttachment;
}
- public ProxyClientWriteAttachment setClientAttachment(ClientAttachment clientAttachment) {
- this.clientAttachment = clientAttachment;
+ public ProxyAttachment setServerAttachment(ServerAttachment serverAttachment) {
+ this.serverAttachment = serverAttachment;
return this;
}
+
+ public boolean canWrited() {
+ while (!writed.compareAndSet(false, true)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ public void close() {
+ ServerAttachment.proxys.remove(uri);
+ AsynchronousSocketChannel proxy = getAsynchronousSocketChannel();
+ if (proxy != null) {
+ try {
+ proxy.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java
deleted file mode 100644
index 39d752f97498aa8d8bcac3d394770acaa3020716..0000000000000000000000000000000000000000
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.jiucheng.magpiebridge.server.aio.proxy;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
-
-import org.jiucheng.magpiebridge.protocol.Message;
-import org.jiucheng.magpiebridge.server.aio.ClientWriteAttachment;
-import org.jiucheng.magpiebridge.server.aio.ClientWriteCompletionHandler;
-
-public class ProxyClientCompletionHandler implements CompletionHandler {
-
- public void completed(Integer result, ClientWriteAttachment attachment) {
- if (result == -1) {
- try {
- attachment.getProxyAsynchronousSocketChannel().close();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- Message proxym = new Message();
- proxym.setMagic(Message.MAGIC);
- proxym.setType(Message.Type.DISCONNECT);
- proxym.setUri(attachment.getUri());
- ByteBuffer writeBuffer = Message.toByteBuffer(proxym);
- while (!attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(false, true)) {}
- attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer).setProxyAsynchronousSocketChannel(null), new ClientWriteCompletionHandler());
- return;
- }
- ByteBuffer readByteBuffer = attachment.getReadByteBuffer();
- readByteBuffer.flip();
-
- Message message = new Message();
- message.setType(Message.Type.TRANSFER);
- message.setUri(attachment.getUri());
- byte[] bts = new byte[result];
- readByteBuffer.get(bts);
- message.setData(bts);
- message.setSize(bts.length);
-
- ByteBuffer writeBuffer = Message.toByteBuffer(message);
- while (!attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(false, true)) {}
- attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer), new ClientWriteCompletionHandler());
- return;
- }
-
- public void failed(Throwable exc, ClientWriteAttachment attachment) {
- exc.printStackTrace();
- }
-}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java
similarity index 31%
rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java
rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java
index 41028b71ddfe00f690ad0296dc512d497366fea7..c8312e2787394d9880d47b3faefd03ed8b802b9e 100644
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java
@@ -2,32 +2,37 @@ package org.jiucheng.magpiebridge.server.aio.proxy;
import java.nio.channels.AsynchronousServerSocketChannel;
-import org.jiucheng.magpiebridge.server.aio.ClientAttachment;
+import org.jiucheng.magpiebridge.server.aio.ServerAttachment;
-public class ProxyServerAttachment {
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ProxyEstablishmentAttachment {
- private AsynchronousServerSocketChannel proxyServer;
- private ClientAttachment clientAttachment;
+ private AsynchronousServerSocketChannel proxy;
+ private ServerAttachment serverAttachment;
private String remote;
- public ProxyServerAttachment(AsynchronousServerSocketChannel proxyServer) {
- this.proxyServer = proxyServer;
+ public ProxyEstablishmentAttachment(AsynchronousServerSocketChannel proxy) {
+ this.proxy = proxy;
}
- public AsynchronousServerSocketChannel getProxyServer() {
- return proxyServer;
+ public AsynchronousServerSocketChannel getProxy() {
+ return proxy;
}
- public ClientAttachment getClientAttachment() {
- return clientAttachment;
+ public ServerAttachment getServerAttachment() {
+ return serverAttachment;
}
- public ProxyServerAttachment setClientAttachment(ClientAttachment clientAttachment) {
- this.clientAttachment = clientAttachment;
+ public ProxyEstablishmentAttachment setServerAttachment(ServerAttachment serverAttachment) {
+ this.serverAttachment = serverAttachment;
return this;
}
- public ProxyServerAttachment setRemote(String remort) {
+ public ProxyEstablishmentAttachment setRemote(String remort) {
this.remote = remort;
return this;
}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..450209882f060737eae57507307b67e23203319b
--- /dev/null
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java
@@ -0,0 +1,42 @@
+package org.jiucheng.magpiebridge.server.aio.proxy;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.server.aio.ServerAttachment;
+import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ProxyEstablishmentCompletionHandler implements CompletionHandler {
+ private static final AtomicInteger CLIENTS_ID = new AtomicInteger();
+
+ public void completed(AsynchronousSocketChannel result, ProxyEstablishmentAttachment attachment) {
+ attachment.getProxy().accept(attachment, this);
+
+ if (attachment.getServerAttachment().canWrited()) {
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.CONNECT);
+ message.setUri(CLIENTS_ID.incrementAndGet());
+ byte[] data = attachment.getRemote().getBytes();
+ message.setData(data);
+ message.setSize(data.length);
+ ByteBuffer writeBuffer = Message.toByteBuffer(message);
+
+ ServerAttachment.proxys.put(message.getUri(), new ProxyAttachment(result).setUri(message.getUri()).setServerAttachment(attachment.getServerAttachment()));
+ attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler());
+ }
+ }
+
+ public void failed(Throwable exc, ProxyEstablishmentAttachment attachment) {
+ // attachment.getServerAttachment().writed.compareAndSet(true, false);
+ System.out.println("proxy server failed");
+ }
+}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..21232b438fdb396af36fcb29075d868daa804d75
--- /dev/null
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java
@@ -0,0 +1,67 @@
+package org.jiucheng.magpiebridge.server.aio.proxy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.server.aio.ServerWriteCompletionHandler;
+import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ProxyReadCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ProxyAttachment attachment) {
+ if (result == -1) {
+ try {
+ attachment.getAsynchronousSocketChannel().close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (attachment.getServerAttachment().canWrited()) {
+ Message proxym = new Message();
+ proxym.setMagic(Message.MAGIC);
+ proxym.setType(Message.Type.DISCONNECT);
+ proxym.setUri(attachment.getUri());
+ ByteBuffer writeBuffer = Message.toByteBuffer(proxym);
+
+ attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler());
+ return;
+ }
+ }
+
+ if (attachment.getServerAttachment().canWrited()) {
+ ByteBuffer readByteBuffer = attachment.getReadBuffer();
+ readByteBuffer.flip();
+ Message message = new Message();
+ message.setMagic(Message.MAGIC);
+ message.setType(Message.Type.TRANSFER);
+ message.setUri(attachment.getUri());
+ byte[] bts = new byte[result];
+ readByteBuffer.get(bts);
+ message.setData(bts);
+ message.setSize(bts.length);
+ ByteBuffer writeBuffer = Message.toByteBuffer(message);
+
+ attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteCompletionHandler());
+ }
+ }
+
+ public void failed(Throwable exc, ProxyAttachment attachment) {
+ if (attachment.getServerAttachment().canWrited()) {
+ Message proxym = new Message();
+ proxym.setMagic(Message.MAGIC);
+ proxym.setType(Message.Type.DISCONNECT);
+ proxym.setUri(attachment.getUri());
+ ByteBuffer writeBuffer = Message.toByteBuffer(proxym);
+
+ attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler());
+ return;
+ }
+ exc.printStackTrace();
+ }
+}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java
deleted file mode 100644
index c4a63dacc67fd7b21231faa678253a2a0712d59d..0000000000000000000000000000000000000000
--- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.jiucheng.magpiebridge.server.aio.proxy;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jiucheng.magpiebridge.protocol.Message;
-import org.jiucheng.magpiebridge.server.aio.ClientWriteAttachment;
-import org.jiucheng.magpiebridge.server.aio.ClientWriteCompletionHandler;
-
-public class ProxyServerCompletionHandler implements CompletionHandler {
-
- private static final AtomicInteger CLIENTS_ID = new AtomicInteger();
-
- public void completed(AsynchronousSocketChannel result, ProxyServerAttachment attachment) {
- attachment.getProxyServer().accept(attachment, this);
-
- Message message = new Message();
- message.setMagic(Message.MAGIC);
- message.setType(Message.Type.CONNECT);
- message.setUri(CLIENTS_ID.incrementAndGet());
- byte[] data = attachment.getRemote().getBytes();
- message.setData(data);
- message.setSize(data.length);
-
- ByteBuffer writeBuffer = Message.toByteBuffer(message);
- attachment.getClientAttachment().putProxy(message.getUri(), new ProxyClientWriteAttachment(result).setClientAttachment(attachment.getClientAttachment()));
-
- while (!attachment.getClientAttachment().writed.compareAndSet(false, true)) {}
- attachment.getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, new ClientWriteAttachment(writeBuffer, attachment).setProxyAsynchronousSocketChannel(result).setUri(message.getUri()), new ClientWriteCompletionHandler());
- }
-
- public void failed(Throwable exc, ProxyServerAttachment attachment) {
- attachment.getClientAttachment().writed.compareAndSet(true, false);
- System.out.println("proxy server failed");
- }
-}
diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..b55ea0bb11e05cc2ed2c2e988a9fafe88fb23509
--- /dev/null
+++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java
@@ -0,0 +1,42 @@
+package org.jiucheng.magpiebridge.server.aio.proxy;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+import org.jiucheng.magpiebridge.protocol.Message;
+import org.jiucheng.magpiebridge.server.aio.ServerAttachment;
+import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler;
+
+/**
+ *
+ * @author jiucheng
+ *
+ */
+public class ProxyWriteCompletionHandler implements CompletionHandler {
+
+ public void completed(Integer result, ProxyAttachment attachment) {
+ if (attachment.getWriteBuffer().hasRemaining()) {
+ attachment.getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this);
+ return;
+ }
+ attachment.writed.compareAndSet(true, false);
+ }
+
+ public void failed(Throwable exc, ProxyAttachment attachment) {
+ attachment.writed.compareAndSet(true, false);
+ exc.printStackTrace();
+ if (attachment.getServerAttachment().canWrited()) {
+ Message proxym = new Message();
+ proxym.setMagic(Message.MAGIC);
+ proxym.setType(Message.Type.DISCONNECT);
+ proxym.setUri(attachment.getUri());
+ ByteBuffer writeBuffer = Message.toByteBuffer(proxym);
+
+ attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler());
+ }
+ ServerAttachment serverAttachment = attachment.getServerAttachment();
+ if (serverAttachment != null) {
+ serverAttachment.close();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 29e7396dffa389e53ca4c7fb7d246ab5fb7b2582..14770f4262e13d602db51554eb33813d06b3a41f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
org.jiucheng.magpiebridge
magpiebridge
- 0.0.1
+ 1.0.0
pom
magpiebridge-common
@@ -16,7 +16,7 @@
org.jiucheng.magpiebridge
magpiebridge-common
- 0.0.1
+ 1.0.0
junit