diff --git a/README.md b/README.md index 865cb6f4b7f9f2128d1b0492ab759dcc8b03ac5a..47ff132f81831c41d2b68b48182881d5b9786d0f 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,17 @@ # 鹊桥(内网穿透) #### 介绍 -[使用java实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097) +[使用java基于aio/nio实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097) #### 软件架构 ![软件架构图](https://images.gitee.com/uploads/images/2019/0616/165254_c3715fbf_120615.png "build-0.0.1.png") +#### 源码打包正式包 + +``` +mvn clean install -Dmaven.test.skip -Denv=release +``` + #### 安装教程 1. 准备公网(假设公网ip=10.1.1.22)服务器,安装openjdk1.7+ diff --git a/magpiebridge-client/.gitignore b/magpiebridge-client/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..b83d22266ac8aa2f8df2edef68082c789727841d --- /dev/null +++ b/magpiebridge-client/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/magpiebridge-client/pom.xml b/magpiebridge-client/pom.xml index c90ef067b4325cf36519a434bdeac5fe4eff57d2..d9fed5efdbfe3d06337b98de697053300a9632f8 100644 --- a/magpiebridge-client/pom.xml +++ b/magpiebridge-client/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-client diff --git a/magpiebridge-client/src/main/bin/startup.sh b/magpiebridge-client/src/main/bin/startup.sh index 9f0d9252827acf2af077ccc0ddb63d3430088e66..cbad87a6d07aa81ef4ac3e865bf776181665ea46 100644 --- a/magpiebridge-client/src/main/bin/startup.sh +++ b/magpiebridge-client/src/main/bin/startup.sh @@ -52,7 +52,7 @@ CLASSPATH="$base/conf:$CLASSPATH"; echo CLASSPATH :$CLASSPATH echo "cd to $bin_abs_path for workaround relative path" cd $bin_abs_path -$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.nio.Client 1>>$base/logs/client.log 2>&1 & +$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.aio.Client 1>>$base/logs/client.log 2>&1 & echo $! > $base/bin/client.pid echo "cd to $current_path for continue" diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java new file mode 100644 index 0000000000000000000000000000000000000000..ad56512a6db653fd3ca1bb22a6d891deab239124 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java @@ -0,0 +1,84 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.Enumeration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class Client { + private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2); + + public static void main(String[] args) throws IOException, InterruptedException { + Cfg.loadProperties(Client.class); + final ClientAttachment attachment = new ClientAttachment(); + startup(attachment); + schedule(attachment); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(attachment); + } + }); + } + + private static void schedule(final ClientAttachment clientAttachment) { + // 重连 + schedule.scheduleAtFixedRate(new Runnable() { + public void run() { + if (clientAttachment.isFailed()) { + try { + startup(clientAttachment); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }, 60, 60, TimeUnit.SECONDS); + // 心跳 + schedule.scheduleWithFixedDelay(new Runnable() { + public void run() { + if (!clientAttachment.isFailed()) { + clientAttachment.heartbeat(); + } + } + }, 30L, 30L, TimeUnit.SECONDS); + } + + public static void startup(final ClientAttachment attachment) throws IOException { + ConcurrentHashMap tas = attachment.getTransferAttachments(); + Enumeration uris = tas.keys(); + if (uris != null) { + while (uris.hasMoreElements()) { + Integer uri = uris.nextElement(); + TransferAttachment ta = tas.remove(uri); + if (ta != null) { + ta.disconnect(); + } + } + } + AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); + client.setOption(StandardSocketOptions.SO_REUSEADDR, true); + client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); + attachment.setClient(client); + attachment.setFailed(false); + client.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), attachment, attachment.getClientEstablishmentCompletionHandler()); + } + + public static void shutdown(ClientAttachment clientAttachment) { + schedule.shutdownNow(); + clientAttachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java new file mode 100644 index 0000000000000000000000000000000000000000..060cc20d21df2891879e0f657ca2c36582bac5a7 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java @@ -0,0 +1,141 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.Enumeration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; + +/** + * + * @author jiucheng + * + */ +public class ClientAttachment { + private ClientEstablishmentCompletionHandler clientEstablishmentCompletionHandler = new ClientEstablishmentCompletionHandler(); + private ClientReadCompletionHandler clientReadCompletionHandler = new ClientReadCompletionHandler(); + private ClientWriteCompletionHandler clientWriteCompletionHandler = new ClientWriteCompletionHandler(); + private ClientWriteNoReadCompletionHandler clientWriteNoReadCompletionHandler = new ClientWriteNoReadCompletionHandler(); + + private ConcurrentHashMap transferAttachments = new ConcurrentHashMap(); + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); + + private AsynchronousSocketChannel client; + private ByteBuffer writeByteBuffer; + private ByteBuffer readByteBuffer; + private boolean failed = false; + + public ClientEstablishmentCompletionHandler getClientEstablishmentCompletionHandler() { + return clientEstablishmentCompletionHandler; + } + + public ClientReadCompletionHandler getClientReadCompletionHandler() { + return clientReadCompletionHandler; + } + + public ClientWriteCompletionHandler getClientWriteCompletionHandler() { + return clientWriteCompletionHandler; + } + + public ClientWriteNoReadCompletionHandler getClientWriteNoReadCompletionHandler() { + return clientWriteNoReadCompletionHandler; + } + + public AsynchronousSocketChannel getClient() { + return client; + } + + public ClientAttachment setClient(AsynchronousSocketChannel client) { + this.client = client; + return this; + } + + public ByteBuffer getReadByteBuffer() { + return readByteBuffer; + } + + public ClientAttachment setReadByteBuffer(ByteBuffer readByteBuffer) { + this.readByteBuffer = readByteBuffer; + return this; + } + + public ByteBuffer getWriteByteBuffer() { + return writeByteBuffer; + } + + public ClientAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { + this.writeByteBuffer = writeByteBuffer; + return this; + } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean isFailed() { + return failed; + } + + public ClientAttachment setFailed(boolean failed) { + this.failed = failed; + return this; + } + + public ConcurrentHashMap getTransferAttachments() { + return transferAttachments; + } + + // 代理连接断开消息发送 + public void disconnect(int uri) { + if (canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.DISCONNECT); + message.setUri(uri); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler()); + } + } + + // HEARTBEAT + public void heartbeat() { + if (canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.HEARTBEAT); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler()); + } + } + + public void close () { + try { + getClient().close(); + } catch (IOException e) { + } + ConcurrentHashMap tas = getTransferAttachments(); + Enumeration uris = tas.keys(); + if (uris != null) { + while (uris.hasMoreElements()) { + Integer uri = uris.nextElement(); + TransferAttachment ta = tas.remove(uri); + if (ta != null) { + ta.disconnect(); + } + } + } + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..d4d3cdf9ee9082ce5942445b6324c007f33c9182 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java @@ -0,0 +1,33 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class ClientEstablishmentCompletionHandler implements CompletionHandler { + + public void completed(Void result, ClientAttachment attachment) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.AUTH); + byte[] data = Cfg.getClientKey().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), attachment.getClientWriteCompletionHandler()); + } + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("establish error"); + attachment.setFailed(true); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..8ebdf5e840752dc6dcb6b8d802c85589d2f5068d --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java @@ -0,0 +1,113 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; +import org.jiucheng.magpiebridge.transfer.aio.TransferEstablishmentCompletionHandler; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class ClientReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (result == -1) { + close(attachment); + return; + } + + ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); + if (readByteBuffer.position() != readByteBuffer.capacity()) { + attachment.getClient().read(readByteBuffer, attachment, this); + return; + } + + if (readByteBuffer.capacity() == 13) { + // 验证消息头 + readByteBuffer.flip(); + int magic = readByteBuffer.getInt(); + byte type = readByteBuffer.get(); + int uri = readByteBuffer.getInt(); + int size = readByteBuffer.getInt(); + if (magic != Message.MAGIC) { + close(attachment); + return; + } + if (size > 0) { + readByteBuffer = ByteBuffer.allocate(size + 13); + readByteBuffer.putInt(magic); + readByteBuffer.put(type); + readByteBuffer.putInt(uri); + readByteBuffer.putInt(size); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + return; + } + } + + Message message = Message.fromByteBuffer(readByteBuffer); + byte type = message.getType(); + if (type == Message.Type.CONNECT) { + try { + handleConnectMessage(attachment, message); + } catch (IOException e) { + e.printStackTrace(); + attachment.disconnect(message.getUri()); + } + readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } else if (type == Message.Type.HEARTBEAT) { + // 心跳不处理 + readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } + } + + private void handleConnectMessage(ClientAttachment attachment, Message message) throws IOException { + int uri = message.getUri(); + // lp.io2c.com + String ipport = new String(message.getData()); + String[] strs = ipport.split(":"); + + SocketChannel realSocketChannel = null; + try { + realSocketChannel = SocketChannel.open(); + realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1]))); + } catch (NumberFormatException e1) { + attachment.disconnect(uri); + return; + } catch (IOException e1) { + attachment.disconnect(uri); + return; + } + + AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); + client.setOption(StandardSocketOptions.SO_REUSEADDR, true); + client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); + TransferAttachment transferAttachment = new TransferAttachment().setClient(client).setUri(uri).setRealSocketChannel(realSocketChannel); + + transferAttachment.getClient().connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), transferAttachment.setClientAttachment(attachment), new TransferEstablishmentCompletionHandler()); + } + + public void close(ClientAttachment attachment) { + attachment.setFailed(true); + try { + attachment.getClient().close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void failed(Throwable exc, ClientAttachment attachment) { + attachment.setFailed(true); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..1551bc0703b1999177984aafcec467b0dac1c1c5 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java @@ -0,0 +1,30 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ClientWriteCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + attachment.setWriteByteBuffer(null); + ByteBuffer readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("write error"); + attachment.setFailed(true); + attachment.writed.compareAndSet(true, false); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..16f9f1b806028f19c246f0b472f002d8e7ca18fe --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java @@ -0,0 +1,25 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ClientWriteNoReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("write heartbeat error"); + attachment.setFailed(true); + attachment.writed.compareAndSet(true, false); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java deleted file mode 100644 index eb67f50e6c40234a8d767995d6875b3dd0234b3c..0000000000000000000000000000000000000000 --- a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java +++ /dev/null @@ -1,340 +0,0 @@ -package org.jiucheng.magpiebridge.client.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.util.Cfg; - -/** - * - * @author jiucheng - * - */ -public class Client { - - private static final Logger LOGGER = Logger.getLogger(Client.class.getName()); - - private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2); - private static final ExecutorService executorService = Executors.newCachedThreadPool(); - private static final Map realServers = new ConcurrentHashMap(); - - // 当前在写 - private static final AtomicBoolean writed = new AtomicBoolean(false); - private static volatile SocketChannel socketChannel; - private static volatile boolean runed = false; - - public static void main(String[] args) throws IOException, InterruptedException { - Cfg.loadProperties(Client.class); - schedule(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - shutdown(); - } - }); - startup(); - } - - private static void schedule() { - // 重连 - schedule.scheduleAtFixedRate(new Runnable() { - public void run() { - if (!runed) { - try { - startup(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }, 60, 60, TimeUnit.SECONDS); - // 心跳 - schedule.scheduleWithFixedDelay(new Runnable() { - public void run() { - SocketChannel sc = Client.socketChannel; - if (sc != null) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.HEARTBEAT); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while (buffer.hasRemaining()) { - sc.write(buffer); - } - } catch (IOException e) { - e.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - } - }, 30L, 30L, TimeUnit.SECONDS); - } - - public static void startup() { - runed = true; - try { - socketChannel = SocketChannel.open(); - socketChannel.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort())); - } catch (IOException e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } - - auth(socketChannel); - - ByteBuffer headerByteBuffer = ByteBuffer.allocate(13); - while (true) { - headerByteBuffer.clear(); - ByteBuffer buf; - try { - int len = socketChannel.read(headerByteBuffer); - if (len == -1) { - closeSocketChannel(); - return; - } - while (headerByteBuffer.position() != headerByteBuffer.capacity()) { - socketChannel.read(headerByteBuffer); - } - - headerByteBuffer.flip(); - - int magic = headerByteBuffer.getInt(); - byte type = headerByteBuffer.get(); - int uri = headerByteBuffer.getInt(); - int size = headerByteBuffer.getInt(); - - buf = ByteBuffer.allocate(size + 13); - buf.putInt(magic); - buf.put(type); - buf.putInt(uri); - buf.putInt(size); - while (buf.position() != buf.capacity()) { - socketChannel.read(buf); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } - - Message message = Message.fromByteBuffer(buf); - byte type = message.getType(); - if (type == Message.Type.CONNECT) { - handleConnectMessage(socketChannel, message); - } else if (type == Message.Type.DISCONNECT) { - handleDisconnectMessage(socketChannel, message); - } else if (type == Message.Type.TRANSFER) { - handleTransferMessage(socketChannel, message); - } else if (type == Message.Type.HEARTBEAT) { - // 心跳不处理 - } - } - } - - public static void closeSocketChannel() { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - socketChannel = null; - } - for (Integer key : realServers.keySet()) { - SocketChannel socket = realServers.remove(key); - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - } - } - runed = false; - } - - public static void shutdown() { - schedule.shutdownNow(); - closeSocketChannel(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("shutdown"); - } - } - - private static void handleDisconnectMessage(final SocketChannel socketChannel, final Message proxyMessage) { - SocketChannel socket = realServers.remove(proxyMessage.getUri()); - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - } - } - - private static void handleTransferMessage(final SocketChannel socketChannel, final Message message) { - ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length); - byteBuffer.put(message.getData()); - byteBuffer.flip(); - SocketChannel socket = realServers.get(message.getUri()); - if (socket != null) { - try { - while(byteBuffer.hasRemaining()) { - socket.write(byteBuffer); - } - } catch (Exception e) { - e.printStackTrace(); - realServers.remove(message.getUri()); - while(!writed.compareAndSet(false, true)) {} - try { - Message rmsg = new Message(); - rmsg.setMagic(Message.MAGIC); - rmsg.setType(Message.Type.DISCONNECT); - rmsg.setUri(message.getUri()); - ByteBuffer buffer = Message.toByteBuffer(rmsg); - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e2) { - e2.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - } - } - - private static void handleConnectMessage(final SocketChannel socketChannel, final Message message) { - int uri = message.getUri(); - // lp.io2c.com - String ipport = new String(message.getData()); - String[] strs = ipport.split(":"); - - SocketChannel realSocketChannel = null; - try { - realSocketChannel = SocketChannel.open(); - realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1]))); - } catch (NumberFormatException e1) { - e1.printStackTrace(); - disconnect(socketChannel, uri); - return; - } catch (IOException e1) { - e1.printStackTrace(); - disconnect(socketChannel, uri); - return; - } - - realServers.put(uri, realSocketChannel); - - final Integer tmpUri = uri; - executorService.execute(new Runnable() { - private Integer uri = tmpUri; - public void run() { - ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024); - while(true) { - SocketChannel real = realServers.get(uri); - if (real == null) { - return; - } - int len; - try { - len = real.read(buf); - } catch (IOException e) { - realServers.remove(uri); - disconnect(socketChannel, uri); - // e.printStackTrace(); - return; - } - if (len == -1) { - realServers.remove(uri); - disconnect(socketChannel, uri); - return; - } else if (len > 0) { - buf.flip(); - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.TRANSFER); - message.setUri(uri); - byte[] bts = new byte[len]; - buf.get(bts); - message.setData(bts); - message.setSize(bts.length); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } finally { - writed.compareAndSet(true, false); - } - buf.clear(); - } - } - } - }); - } - - // 代理连接断开消息发送 - private static void disconnect(SocketChannel socketChannel, int uri) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.DISCONNECT); - message.setUri(uri); - ByteBuffer buffer = Message.toByteBuffer(message); - while (!writed.compareAndSet(false, true)) {} - try { - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e2) { - e2.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - - // 认证信息发送 - private static void auth(final SocketChannel socketChannel) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.AUTH); - byte[] data = Cfg.getClientKey().getBytes(); - message.setData(data); - message.setSize(data.length); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while (buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } -} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java new file mode 100644 index 0000000000000000000000000000000000000000..23518c9605b6c81caa7baf4caa63964eedeed227 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java @@ -0,0 +1,151 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jiucheng.magpiebridge.client.aio.ClientAttachment; +import org.jiucheng.magpiebridge.protocol.Message; + +/** + * + * @author jiucheng + * + */ +public class TransferAttachment { + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); + + private AsynchronousSocketChannel client; + private ByteBuffer writeByteBuffer; + private ByteBuffer readByteBuffer; + private int uri; + + private SocketChannel realSocketChannel; + private boolean failed; + + private ClientAttachment clientAttachment; + + public ClientAttachment getClientAttachment() { + return clientAttachment; + } + + public TransferAttachment setClientAttachment(ClientAttachment clientAttachment) { + this.clientAttachment = clientAttachment; + return this; + } + + public boolean isFailed() { + return failed; + } + + public TransferAttachment setFailed(boolean failed) { + this.failed = failed; + return this; + } + + public SocketChannel getRealSocketChannel() { + return realSocketChannel; + } + + public TransferAttachment setRealSocketChannel(SocketChannel realSocketChannel) { + this.realSocketChannel = realSocketChannel; + return this; + } + + public int getUri() { + return uri; + } + + public TransferAttachment setUri(int uri) { + this.uri = uri; + return this; + } + + public AsynchronousSocketChannel getClient() { + return client; + } + + public TransferAttachment setClient(AsynchronousSocketChannel client) { + this.client = client; + return this; + } + + public ByteBuffer getWriteByteBuffer() { + return writeByteBuffer; + } + + public TransferAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { + this.writeByteBuffer = writeByteBuffer; + return this; + } + + public ByteBuffer getReadByteBuffer() { + return readByteBuffer; + } + + public TransferAttachment setReadByteBuffer(ByteBuffer readByteBuffer) { + this.readByteBuffer = readByteBuffer; + return this; + } + + public boolean canReaded() { + while (!writed.compareAndSet(false, false)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public void close() { + if (clientAttachment != null) { + clientAttachment.getTransferAttachments().remove(getUri()); + clientAttachment = null; + } + if (realSocketChannel != null) { + try { + realSocketChannel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + realSocketChannel = null; + } + if (client != null) { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + client = null; + } + } + + // 代理连接断开消息发送 + public void disconnect() { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.DISCONNECT); + message.setUri(getUri()); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + if (canWrited()) { + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer).setFailed(true), new TransferWriteCompletionHandler()); + } + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..7701e7964bbde3e7112721f1cc82a9431291b821 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java @@ -0,0 +1,74 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.ThreadManager; + +/** + * + * @author jiucheng + * + */ +public class TransferConnectedCompletionHandler implements CompletionHandler { + + public void completed(Integer result, final TransferAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + attachment.setWriteByteBuffer(null); + ByteBuffer readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), new TransferReadCompletionHandler()); + + ThreadManager.singleton().execute(new Runnable() { + public void run() { + ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024); + while(true) { + if (attachment.canReaded()) { + SocketChannel real = attachment.getRealSocketChannel(); + if (real == null) { + return; + } + int len; + try { + len = real.read(buf); + } catch (IOException e) { + attachment.disconnect(); + return; + } + if (len == -1) { + attachment.disconnect(); + return; + } else if (len > 0) { + buf.flip(); + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.TRANSFER); + message.setUri(attachment.getUri()); + byte[] bts = new byte[len]; + buf.get(bts); + message.setData(bts); + message.setSize(bts.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler()); + buf.clear(); + } + } + } + } + } + }); + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.writed.compareAndSet(true, false); + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..97054c8cfce0ccd36da788ca2b58467ff9b2dcec --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java @@ -0,0 +1,38 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.client.aio.ClientAttachment; +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class TransferEstablishmentCompletionHandler implements CompletionHandler { + + public void completed(Void result, TransferAttachment attachment) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.CONNECT); + message.setUri(attachment.getUri()); + byte[] data = Cfg.getClientKey().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferConnectedCompletionHandler()); + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + ClientAttachment clientAttachment = attachment.getClientAttachment(); + if (clientAttachment != null) { + clientAttachment.disconnect(attachment.getUri()); + } + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..955c52c2278d31d2254e1e958f05484382d3cde0 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java @@ -0,0 +1,90 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; + +/** + * + * @author jiucheng + * + */ +public class TransferReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, TransferAttachment attachment) { + // client关闭连接 + if (result == -1) { + // close(attachment); + attachment.close(); + return; + } + ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); + if (readByteBuffer.position() != readByteBuffer.capacity()) { + attachment.getClient().read(readByteBuffer, attachment, this); + return; + } + if (readByteBuffer.capacity() == 13) { + // 验证消息头 + readByteBuffer.flip(); + int magic = readByteBuffer.getInt(); + byte type = readByteBuffer.get(); + int uri = readByteBuffer.getInt(); + int size = readByteBuffer.getInt(); + if (magic != Message.MAGIC) { + // close(attachment); + attachment.close(); + return; + } + if (size > 0) { + readByteBuffer = ByteBuffer.allocate(size + 13); + readByteBuffer.putInt(magic); + readByteBuffer.put(type); + readByteBuffer.putInt(uri); + readByteBuffer.putInt(size); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), this); + return; + } + } + + Message message = Message.fromByteBuffer(readByteBuffer); + byte type = message.getType(); + if (type == Message.Type.TRANSFER) { + handleTransferMessage(attachment, message); + } else if (type == Message.Type.HEARTBEAT) { + // 心跳不处理 + attachment.setReadByteBuffer(ByteBuffer.allocate(13)); + attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this); + } + } + + private void handleTransferMessage(TransferAttachment attachment, final Message message) { + ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length); + byteBuffer.put(message.getData()); + byteBuffer.flip(); + SocketChannel socket = attachment.getRealSocketChannel(); + if (socket != null) { + try { + while(byteBuffer.hasRemaining()) { + socket.write(byteBuffer); + } + attachment.setReadByteBuffer(ByteBuffer.allocate(13)); + attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this); + } catch (Exception e) { + Message rmsg = new Message(); + rmsg.setMagic(Message.MAGIC); + rmsg.setType(Message.Type.DISCONNECT); + rmsg.setUri(message.getUri()); + ByteBuffer buffer = Message.toByteBuffer(rmsg); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler()); + } + } + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..0538f481cdeebc6a3680e549ff4a118012d0aeb1 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java @@ -0,0 +1,28 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class TransferWriteCompletionHandler implements CompletionHandler { + + public void completed(Integer result, TransferAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + if (attachment.isFailed()) { + attachment.close(); + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.writed.compareAndSet(true, false); + attachment.close(); + } +} diff --git a/magpiebridge-common/pom.xml b/magpiebridge-common/pom.xml index 1f948a802567bb4897775b2cf4591cdf2a744317..ccbd09e28fbf9791958804aa7ac32e55f04d5ac2 100644 --- a/magpiebridge-common/pom.xml +++ b/magpiebridge-common/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-common diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java deleted file mode 100644 index 7fdd9fc710fa7655a494a013e9d259ae28b247c3..0000000000000000000000000000000000000000 --- a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.jiucheng.magpiebridge.schedule; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ScheduleManager { - - private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); - - public static void schedule(Runnable runnable) { - scheduledExecutorService.scheduleAtFixedRate(runnable, 30L, 30L, TimeUnit.MICROSECONDS); - } -} diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java new file mode 100644 index 0000000000000000000000000000000000000000..d9a8a04797f2f37d055e910efc2d7a11f4d398d6 --- /dev/null +++ b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java @@ -0,0 +1,18 @@ +package org.jiucheng.magpiebridge.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + * @author jiucheng + * + */ +public class ThreadManager { + + private static final ExecutorService executorService = Executors.newCachedThreadPool(); + + public static ExecutorService singleton() { + return executorService; + } +} diff --git a/magpiebridge-server/pom.xml b/magpiebridge-server/pom.xml index 0c87bafaf8613c80c6d4f291c3f7fac92271dea3..edbef9abffd0c472472f556b2a4bbaeddb0cd526 100644 --- a/magpiebridge-server/pom.xml +++ b/magpiebridge-server/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-server diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java deleted file mode 100644 index 4c7ac5185a1929bd4c66068f6fb2b8850ce52bae..0000000000000000000000000000000000000000 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.jiucheng.magpiebridge.server.aio; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.nio.channels.AsynchronousSocketChannel; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment; - -/** - * Client附件 - * - * @author jiucheng - * - */ -public class ClientAttachment { - // 当前在写 - public final AtomicBoolean writed = new AtomicBoolean(false); - - // 代理服务 - private List asynchronousServerSocketChannels = new ArrayList(); - // 代理连接 - private final ConcurrentMap proxys = new ConcurrentHashMap(); - - // Client连接 - AsynchronousSocketChannel asynchronousSocketChannel; - // Client读入数据 - ByteBuffer readBuffer; - - public ClientAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { - this.asynchronousSocketChannel = asynchronousSocketChannel; - } - - public ByteBuffer getReadBuffer() { - return readBuffer; - } - - public ClientAttachment setReadBuffer(ByteBuffer readBuffer) { - this.readBuffer = readBuffer; - return this; - } - - public AsynchronousSocketChannel getAsynchronousSocketChannel() { - return asynchronousSocketChannel; - } - - public void putProxy(Integer uri, ProxyClientWriteAttachment proxy) { - proxys.put(uri, proxy); - } - - public ProxyClientWriteAttachment getProxy(Integer uri) { - return proxys.get(uri); - } - - public ProxyClientWriteAttachment removeProxy(Integer uri) { - return proxys.remove(uri); - } - - public void addProxyServer(AsynchronousServerSocketChannel asynchronousServerSocketChannel) { - asynchronousServerSocketChannels.add(asynchronousServerSocketChannel); - } - - public void close() { - Set keys = proxys.keySet(); - for (Integer key : keys) { - ProxyClientWriteAttachment proxyWriteAttachment = proxys.remove(key); - if (proxyWriteAttachment != null && proxyWriteAttachment.getAsynchronousSocketChannel() != null) { - try { - proxyWriteAttachment.getAsynchronousSocketChannel().close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) { - if (asynchronousServerSocketChannel != null) { - try { - asynchronousServerSocketChannel.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - if (asynchronousSocketChannel != null) { - try { - asynchronousSocketChannel.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } -} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java deleted file mode 100644 index 1887c2f54d23bfb0fdfd39703b8148446888378c..0000000000000000000000000000000000000000 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientWriteCompletionHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.jiucheng.magpiebridge.server.aio; - -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; - -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientCompletionHandler; - -public class ClientWriteCompletionHandler implements CompletionHandler { - - public void completed(Integer result, ClientWriteAttachment attachment) { - if (attachment.getWriteBuffer().hasRemaining()) { - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this); - return; - } - - attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false); - - if (attachment.getProxyAsynchronousSocketChannel() != null) { - attachment.setReadByteBuffer(ByteBuffer.allocate(4 * 1024 * 1024)); - attachment.getProxyAsynchronousSocketChannel().read(attachment.getReadByteBuffer(), attachment, new ProxyClientCompletionHandler()); - } - } - - public void failed(Throwable exc, ClientWriteAttachment attachment) { - attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false); - exc.printStackTrace(); - } -} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java index 7d80a30846eaac6de4b80aea9ef21550434ec8eb..63866c9d2ad0e9059150426e23a5f7530d139834 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java @@ -5,12 +5,11 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import org.jiucheng.magpiebridge.util.Cfg; +import org.jiucheng.magpiebridge.util.ThreadManager; /** * @@ -18,23 +17,19 @@ import org.jiucheng.magpiebridge.util.Cfg; * */ public class Server { - private static final Logger LOGGER = Logger.getLogger(Server.class.getName()); private static final Object wait = new Object(); - public static final int READ_TIMEOUT = Integer.MAX_VALUE; - public static void main(String[] args) throws IOException, InterruptedException { Cfg.loadProperties(Server.class); - ExecutorService executorService = Executors.newCachedThreadPool(); - AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService); + AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(ThreadManager.singleton()); final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group); server.setOption(StandardSocketOptions.SO_REUSEADDR, true); server.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); server.bind(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort())); - server.accept(new ServerAttachment(server), new ServerCompletionHandler()); + server.accept(server, new ServerEstablishmentCompletionHandler()); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { @@ -54,6 +49,7 @@ public class Server { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, "Server started"); } + synchronized (wait) { wait.wait(); } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java index ecc43dcc3c7d50534eb8c87f9254cc145d94a316..1c61cda276a23d8e4c22478819b0cae3e65ee6c2 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java @@ -1,16 +1,134 @@ package org.jiucheng.magpiebridge.server.aio; +import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment; + +/** + * + * @author jiucheng + * + */ public class ServerAttachment { + // 代理服务 + private List asynchronousServerSocketChannels = new ArrayList(); + // 代理连接 + public static final ConcurrentMap proxys = new ConcurrentHashMap(); + + // Client连接 + AsynchronousSocketChannel server; + // Client读入数据 + ByteBuffer readBuffer; + // Client写入数据 + ByteBuffer writeBuffer; + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); - AsynchronousServerSocketChannel server; + private ProxyAttachment proxyAttachment; + private boolean mastered; - public ServerAttachment(AsynchronousServerSocketChannel server) { + public ServerAttachment(AsynchronousSocketChannel server) { this.server = server; } - public AsynchronousServerSocketChannel getServer() { + public boolean isMastered() { + return mastered; + } + + public ServerAttachment setMastered(boolean mastered) { + this.mastered = mastered; + return this; + } + + public ProxyAttachment getProxyAttachment() { + return proxyAttachment; + } + + public ServerAttachment setProxyAttachment(ProxyAttachment proxyAttachment) { + this.proxyAttachment = proxyAttachment; + return this; + } + + public ByteBuffer getWriteBuffer() { + return writeBuffer; + } + + public ServerAttachment setWriteBuffer(ByteBuffer writeBuffer) { + this.writeBuffer = writeBuffer; + return this; + } + + public ByteBuffer getReadBuffer() { + return readBuffer; + } + + public ServerAttachment setReadBuffer(ByteBuffer readBuffer) { + this.readBuffer = readBuffer; + return this; + } + + public AsynchronousSocketChannel getServer() { return server; } + + public void putProxy(Integer uri, ProxyAttachment proxy) { + proxys.put(uri, proxy); + } + + public ProxyAttachment getProxy(Integer uri) { + return proxys.get(uri); + } + + public ProxyAttachment removeProxy(Integer uri) { + return proxys.remove(uri); + } + + public void addProxyServer(AsynchronousServerSocketChannel asynchronousServerSocketChannel) { + asynchronousServerSocketChannels.add(asynchronousServerSocketChannel); + } + + public void close() { + if (server != null) { + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (mastered) { + for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) { + if (asynchronousServerSocketChannel != null) { + try { + asynchronousServerSocketChannel.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } else { + if (proxyAttachment != null) { + proxyAttachment.close(); + } + } + } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java similarity index 32% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java index e2ab76dd89a246907df1b93d3de3458ddc127996..2059fd8c81a0906f67832250fbb17cba4dd8e9cc 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java @@ -1,20 +1,25 @@ package org.jiucheng.magpiebridge.server.aio; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; -import java.util.concurrent.TimeUnit; -public class ServerCompletionHandler implements CompletionHandler { +/** + * + * @author jiucheng + * + */ +public class ServerEstablishmentCompletionHandler implements CompletionHandler { - public void completed(AsynchronousSocketChannel result, ServerAttachment attachment) { - attachment.getServer().accept(attachment, this); + public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) { + attachment.accept(attachment, this); ByteBuffer readBuffer = ByteBuffer.allocate(13); - result.read(readBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, new ClientAttachment(result).setReadBuffer(readBuffer), new ClientCompletionHandler()); + result.read(readBuffer, new ServerAttachment(result).setReadBuffer(readBuffer), new ServerReadCompletionHandler()); } - public void failed(Throwable exc, ServerAttachment attachment) { + public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { exc.printStackTrace(); } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java similarity index 54% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java index efdbdf462ffe47052d1255eb5ff78c880cdf82b5..85892048052f6ac65682ab1b28ab541f48993cd9 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java @@ -10,31 +10,29 @@ import java.nio.channels.CompletionHandler; import java.text.MessageFormat; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteCompletionHandler; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerAttachment; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyReadCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentCompletionHandler; import org.jiucheng.magpiebridge.util.Cfg; +import org.jiucheng.magpiebridge.util.ThreadManager; /** - * Client数据解析处理 * * @author jiucheng * */ -public class ClientCompletionHandler implements CompletionHandler { - - private static final Logger LOGGER = Logger.getLogger(ClientCompletionHandler.class.getName()); +public class ServerReadCompletionHandler implements CompletionHandler { + private static final Logger LOGGER = Logger.getLogger(ServerReadCompletionHandler.class.getName()); // Client连接 - private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); + private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); - public void completed(Integer result, ClientAttachment attachment) { + public void completed(Integer result, ServerAttachment attachment) { // client关闭连接 if (result == -1) { close(attachment); @@ -43,7 +41,7 @@ public class ClientCompletionHandler implements CompletionHandler 0) { readByteBuffer = ByteBuffer.allocate(size + 13); readByteBuffer.putInt(magic); readByteBuffer.put(type); readByteBuffer.putInt(uri); readByteBuffer.putInt(size); - attachment.getAsynchronousSocketChannel().read(readByteBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, attachment.setReadBuffer(readByteBuffer), this); + attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this); return; } } @@ -79,47 +77,60 @@ public class ClientCompletionHandler implements CompletionHandler remote={1}", local, remote)); } proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1]))); - proxy.accept(new ProxyServerAttachment(proxy).setClientAttachment(attachment).setRemote(remote), new ProxyServerCompletionHandler()); + proxy.accept(new ProxyEstablishmentAttachment(proxy).setServerAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler()); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, MessageFormat.format("----end port={0} -> remote={1}", local, remote)); } @@ -156,11 +167,11 @@ public class ClientCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ServerAttachment attachment) { + if (attachment.getWriteBuffer().hasRemaining()) { + attachment.getServer().write(attachment.getWriteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + ProxyAttachment proxyAttachment = attachment.getProxyAttachment(); + if (proxyAttachment != null) { + proxyAttachment.setReadBuffer(ByteBuffer.allocate(4 * 1024 * 1024)); + proxyAttachment.getAsynchronousSocketChannel().read(proxyAttachment.getReadBuffer(), proxyAttachment, new ProxyReadCompletionHandler()); + } + } + + public void failed(Throwable exc, ServerAttachment attachment) { + attachment.writed.compareAndSet(true, false); + exc.printStackTrace(); + } +} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java similarity index 31% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java index 08f2399e3a48d901c681aa4f1cc2961f6f0aa321..7094d5aa146d23949b6884901cd8f1bd4f46aeb6 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java @@ -1,18 +1,23 @@ -package org.jiucheng.magpiebridge.server.aio.proxy; +package org.jiucheng.magpiebridge.server.aio; import java.nio.channels.CompletionHandler; -public class ProxyClientWriteCompletionHandler implements CompletionHandler { +/** + * + * @author jiucheng + * + */ +public class ServerWriteNoProxyClientReadCompletionHandler implements CompletionHandler { - public void completed(Integer result, ProxyClientWriteAttachment attachment) { - if (attachment.getWriteByteBuffer().hasRemaining()) { - attachment.getAsynchronousSocketChannel().write(attachment.getWriteByteBuffer(), attachment, this); + public void completed(Integer result, ServerAttachment attachment) { + if (attachment.getWriteBuffer().hasRemaining()) { + attachment.getServer().write(attachment.getWriteBuffer(), attachment, this); return; } attachment.writed.compareAndSet(true, false); } - public void failed(Throwable exc, ProxyClientWriteAttachment attachment) { + public void failed(Throwable exc, ServerAttachment attachment) { attachment.writed.compareAndSet(true, false); exc.printStackTrace(); } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java similarity index 31% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java index 28f2ff9cd1870cafbf9b13b9de45e30629903720..1539b832a7a94a39d4cdaea1695f7a52794895d4 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientWriteAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java @@ -1,10 +1,12 @@ package org.jiucheng.magpiebridge.server.aio.proxy; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.jiucheng.magpiebridge.server.aio.ClientAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; /** * 代理连接附件 @@ -12,17 +14,20 @@ import org.jiucheng.magpiebridge.server.aio.ClientAttachment; * @author jiucheng * */ -public class ProxyClientWriteAttachment { +public class ProxyAttachment { // 当前在写 public final AtomicBoolean writed = new AtomicBoolean(false); // 代理连接 private AsynchronousSocketChannel asynchronousSocketChannel; // 待写入数据 - private ByteBuffer writeByteBuffer; + private ByteBuffer readBuffer; + // 待写入数据 + private ByteBuffer writeBuffer; // Client附件 - private ClientAttachment clientAttachment; + private ServerAttachment serverAttachment; + private int uri; - public ProxyClientWriteAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { + public ProxyAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { this.asynchronousSocketChannel = asynchronousSocketChannel; } @@ -30,21 +35,62 @@ public class ProxyClientWriteAttachment { return asynchronousSocketChannel; } - public ByteBuffer getWriteByteBuffer() { - return writeByteBuffer; + public int getUri() { + return uri; + } + + public ProxyAttachment setUri(int uri) { + this.uri = uri; + return this; + } + + public ByteBuffer getReadBuffer() { + return readBuffer; + } + + public ProxyAttachment setReadBuffer(ByteBuffer readBuffer) { + this.readBuffer = readBuffer; + return this; + } + + public ByteBuffer getWriteBuffer() { + return writeBuffer; } - public ProxyClientWriteAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { - this.writeByteBuffer = writeByteBuffer; + public ProxyAttachment setWriteBuffer(ByteBuffer writeBuffer) { + this.writeBuffer = writeBuffer; return this; } - public ClientAttachment getClientAttachment() { - return clientAttachment; + public ServerAttachment getServerAttachment() { + return serverAttachment; } - public ProxyClientWriteAttachment setClientAttachment(ClientAttachment clientAttachment) { - this.clientAttachment = clientAttachment; + public ProxyAttachment setServerAttachment(ServerAttachment serverAttachment) { + this.serverAttachment = serverAttachment; return this; } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public void close() { + ServerAttachment.proxys.remove(uri); + AsynchronousSocketChannel proxy = getAsynchronousSocketChannel(); + if (proxy != null) { + try { + proxy.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java deleted file mode 100644 index 39d752f97498aa8d8bcac3d394770acaa3020716..0000000000000000000000000000000000000000 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyClientCompletionHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.jiucheng.magpiebridge.server.aio.proxy; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; - -import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.ClientWriteAttachment; -import org.jiucheng.magpiebridge.server.aio.ClientWriteCompletionHandler; - -public class ProxyClientCompletionHandler implements CompletionHandler { - - public void completed(Integer result, ClientWriteAttachment attachment) { - if (result == -1) { - try { - attachment.getProxyAsynchronousSocketChannel().close(); - } catch (IOException e) { - e.printStackTrace(); - } - - Message proxym = new Message(); - proxym.setMagic(Message.MAGIC); - proxym.setType(Message.Type.DISCONNECT); - proxym.setUri(attachment.getUri()); - ByteBuffer writeBuffer = Message.toByteBuffer(proxym); - while (!attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(false, true)) {} - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer).setProxyAsynchronousSocketChannel(null), new ClientWriteCompletionHandler()); - return; - } - ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); - readByteBuffer.flip(); - - Message message = new Message(); - message.setType(Message.Type.TRANSFER); - message.setUri(attachment.getUri()); - byte[] bts = new byte[result]; - readByteBuffer.get(bts); - message.setData(bts); - message.setSize(bts.length); - - ByteBuffer writeBuffer = Message.toByteBuffer(message); - while (!attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(false, true)) {} - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer), new ClientWriteCompletionHandler()); - return; - } - - public void failed(Throwable exc, ClientWriteAttachment attachment) { - exc.printStackTrace(); - } -} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java similarity index 31% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java index 41028b71ddfe00f690ad0296dc512d497366fea7..c8312e2787394d9880d47b3faefd03ed8b802b9e 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java @@ -2,32 +2,37 @@ package org.jiucheng.magpiebridge.server.aio.proxy; import java.nio.channels.AsynchronousServerSocketChannel; -import org.jiucheng.magpiebridge.server.aio.ClientAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; -public class ProxyServerAttachment { +/** + * + * @author jiucheng + * + */ +public class ProxyEstablishmentAttachment { - private AsynchronousServerSocketChannel proxyServer; - private ClientAttachment clientAttachment; + private AsynchronousServerSocketChannel proxy; + private ServerAttachment serverAttachment; private String remote; - public ProxyServerAttachment(AsynchronousServerSocketChannel proxyServer) { - this.proxyServer = proxyServer; + public ProxyEstablishmentAttachment(AsynchronousServerSocketChannel proxy) { + this.proxy = proxy; } - public AsynchronousServerSocketChannel getProxyServer() { - return proxyServer; + public AsynchronousServerSocketChannel getProxy() { + return proxy; } - public ClientAttachment getClientAttachment() { - return clientAttachment; + public ServerAttachment getServerAttachment() { + return serverAttachment; } - public ProxyServerAttachment setClientAttachment(ClientAttachment clientAttachment) { - this.clientAttachment = clientAttachment; + public ProxyEstablishmentAttachment setServerAttachment(ServerAttachment serverAttachment) { + this.serverAttachment = serverAttachment; return this; } - public ProxyServerAttachment setRemote(String remort) { + public ProxyEstablishmentAttachment setRemote(String remort) { this.remote = remort; return this; } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..450209882f060737eae57507307b67e23203319b --- /dev/null +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java @@ -0,0 +1,42 @@ +package org.jiucheng.magpiebridge.server.aio.proxy; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ProxyEstablishmentCompletionHandler implements CompletionHandler { + private static final AtomicInteger CLIENTS_ID = new AtomicInteger(); + + public void completed(AsynchronousSocketChannel result, ProxyEstablishmentAttachment attachment) { + attachment.getProxy().accept(attachment, this); + + if (attachment.getServerAttachment().canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.CONNECT); + message.setUri(CLIENTS_ID.incrementAndGet()); + byte[] data = attachment.getRemote().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer writeBuffer = Message.toByteBuffer(message); + + ServerAttachment.proxys.put(message.getUri(), new ProxyAttachment(result).setUri(message.getUri()).setServerAttachment(attachment.getServerAttachment())); + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + } + } + + public void failed(Throwable exc, ProxyEstablishmentAttachment attachment) { + // attachment.getServerAttachment().writed.compareAndSet(true, false); + System.out.println("proxy server failed"); + } +} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..21232b438fdb396af36fcb29075d868daa804d75 --- /dev/null +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java @@ -0,0 +1,67 @@ +package org.jiucheng.magpiebridge.server.aio.proxy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.server.aio.ServerWriteCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ProxyReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ProxyAttachment attachment) { + if (result == -1) { + try { + attachment.getAsynchronousSocketChannel().close(); + } catch (IOException e) { + e.printStackTrace(); + } + if (attachment.getServerAttachment().canWrited()) { + Message proxym = new Message(); + proxym.setMagic(Message.MAGIC); + proxym.setType(Message.Type.DISCONNECT); + proxym.setUri(attachment.getUri()); + ByteBuffer writeBuffer = Message.toByteBuffer(proxym); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + return; + } + } + + if (attachment.getServerAttachment().canWrited()) { + ByteBuffer readByteBuffer = attachment.getReadBuffer(); + readByteBuffer.flip(); + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.TRANSFER); + message.setUri(attachment.getUri()); + byte[] bts = new byte[result]; + readByteBuffer.get(bts); + message.setData(bts); + message.setSize(bts.length); + ByteBuffer writeBuffer = Message.toByteBuffer(message); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteCompletionHandler()); + } + } + + public void failed(Throwable exc, ProxyAttachment attachment) { + if (attachment.getServerAttachment().canWrited()) { + Message proxym = new Message(); + proxym.setMagic(Message.MAGIC); + proxym.setType(Message.Type.DISCONNECT); + proxym.setUri(attachment.getUri()); + ByteBuffer writeBuffer = Message.toByteBuffer(proxym); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + return; + } + exc.printStackTrace(); + } +} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java deleted file mode 100644 index c4a63dacc67fd7b21231faa678253a2a0712d59d..0000000000000000000000000000000000000000 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyServerCompletionHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.jiucheng.magpiebridge.server.aio.proxy; - -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.atomic.AtomicInteger; - -import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.ClientWriteAttachment; -import org.jiucheng.magpiebridge.server.aio.ClientWriteCompletionHandler; - -public class ProxyServerCompletionHandler implements CompletionHandler { - - private static final AtomicInteger CLIENTS_ID = new AtomicInteger(); - - public void completed(AsynchronousSocketChannel result, ProxyServerAttachment attachment) { - attachment.getProxyServer().accept(attachment, this); - - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.CONNECT); - message.setUri(CLIENTS_ID.incrementAndGet()); - byte[] data = attachment.getRemote().getBytes(); - message.setData(data); - message.setSize(data.length); - - ByteBuffer writeBuffer = Message.toByteBuffer(message); - attachment.getClientAttachment().putProxy(message.getUri(), new ProxyClientWriteAttachment(result).setClientAttachment(attachment.getClientAttachment())); - - while (!attachment.getClientAttachment().writed.compareAndSet(false, true)) {} - attachment.getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, new ClientWriteAttachment(writeBuffer, attachment).setProxyAsynchronousSocketChannel(result).setUri(message.getUri()), new ClientWriteCompletionHandler()); - } - - public void failed(Throwable exc, ProxyServerAttachment attachment) { - attachment.getClientAttachment().writed.compareAndSet(true, false); - System.out.println("proxy server failed"); - } -} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..b55ea0bb11e05cc2ed2c2e988a9fafe88fb23509 --- /dev/null +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java @@ -0,0 +1,42 @@ +package org.jiucheng.magpiebridge.server.aio.proxy; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ProxyWriteCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ProxyAttachment attachment) { + if (attachment.getWriteBuffer().hasRemaining()) { + attachment.getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + } + + public void failed(Throwable exc, ProxyAttachment attachment) { + attachment.writed.compareAndSet(true, false); + exc.printStackTrace(); + if (attachment.getServerAttachment().canWrited()) { + Message proxym = new Message(); + proxym.setMagic(Message.MAGIC); + proxym.setType(Message.Type.DISCONNECT); + proxym.setUri(attachment.getUri()); + ByteBuffer writeBuffer = Message.toByteBuffer(proxym); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + } + ServerAttachment serverAttachment = attachment.getServerAttachment(); + if (serverAttachment != null) { + serverAttachment.close(); + } + } +} diff --git a/pom.xml b/pom.xml index 29e7396dffa389e53ca4c7fb7d246ab5fb7b2582..14770f4262e13d602db51554eb33813d06b3a41f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 pom magpiebridge-common @@ -16,7 +16,7 @@ org.jiucheng.magpiebridge magpiebridge-common - 0.0.1 + 1.0.0 junit