From 06680072f516c3ad4f6529e596fccb36593392ea Mon Sep 17 00:00:00 2001 From: jiucheng Date: Mon, 29 Jul 2019 00:12:58 +0800 Subject: [PATCH 1/3] dev_1.0.1 --- magpiebridge-client/.gitignore | 1 + magpiebridge-client/pom.xml | 2 +- magpiebridge-client/src/main/bin/startup.sh | 2 +- .../magpiebridge/client/aio/Client.java | 84 +++++ .../client/aio/ClientAttachment.java | 141 ++++++++ .../ClientEstablishmentCompletionHandler.java | 33 ++ .../aio/ClientReadCompletionHandler.java | 113 ++++++ .../aio/ClientWriteCompletionHandler.java | 30 ++ .../ClientWriteNoReadCompletionHandler.java | 25 ++ .../magpiebridge/client/nio/Client.java | 340 ------------------ .../transfer/aio/TransferAttachment.java | 151 ++++++++ .../TransferConnectedCompletionHandler.java | 74 ++++ ...ransferEstablishmentCompletionHandler.java | 38 ++ .../aio/TransferReadCompletionHandler.java | 90 +++++ .../aio/TransferWriteCompletionHandler.java | 28 ++ magpiebridge-common/pom.xml | 2 +- .../schedule/ScheduleManager.java | 14 - .../magpiebridge/util/ThreadManager.java | 18 + magpiebridge-server/pom.xml | 2 +- .../magpiebridge/server/aio/Server.java | 12 +- ...ava => ServerEstablishmentAttachment.java} | 12 +- ...ServerEstablishmentCompletionHandler.java} | 14 +- ...achment.java => ServerReadAttachment.java} | 31 +- ....java => ServerReadCompletionHandler.java} | 71 ++-- ...chment.java => ServerWriteAttachment.java} | 31 +- ...java => ServerWriteCompletionHandler.java} | 17 +- ...java => ProxyEstablishmentAttachment.java} | 14 +- ... ProxyEstablishmentCompletionHandler.java} | 26 +- ...r.java => ProxyReadCompletionHandler.java} | 20 +- ...achment.java => ProxyWriteAttachment.java} | 25 +- ....java => ProxyWriteCompletionHandler.java} | 6 +- .../src/main/resources/conf/cfg.properties | 2 +- pom.xml | 4 +- 33 files changed, 999 insertions(+), 474 deletions(-) create mode 100644 magpiebridge-client/.gitignore create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java delete mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java create mode 100644 magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java delete mode 100644 magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java create mode 100644 magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ServerAttachment.java => ServerEstablishmentAttachment.java} (52%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ServerCompletionHandler.java => ServerEstablishmentCompletionHandler.java} (48%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ClientAttachment.java => ServerReadAttachment.java} (73%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ClientCompletionHandler.java => ServerReadCompletionHandler.java} (64%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ClientWriteAttachment.java => ServerWriteAttachment.java} (59%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ClientWriteCompletionHandler.java => ServerWriteCompletionHandler.java} (60%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyServerAttachment.java => ProxyEstablishmentAttachment.java} (56%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyServerCompletionHandler.java => ProxyEstablishmentCompletionHandler.java} (45%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyClientCompletionHandler.java => ProxyReadCompletionHandler.java} (68%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyClientWriteAttachment.java => ProxyWriteAttachment.java} (53%) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyClientWriteCompletionHandler.java => ProxyWriteCompletionHandler.java} (62%) diff --git a/magpiebridge-client/.gitignore b/magpiebridge-client/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/magpiebridge-client/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/magpiebridge-client/pom.xml b/magpiebridge-client/pom.xml index c90ef06..d9fed5e 100644 --- a/magpiebridge-client/pom.xml +++ b/magpiebridge-client/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-client diff --git a/magpiebridge-client/src/main/bin/startup.sh b/magpiebridge-client/src/main/bin/startup.sh index 9f0d925..cbad87a 100644 --- a/magpiebridge-client/src/main/bin/startup.sh +++ b/magpiebridge-client/src/main/bin/startup.sh @@ -52,7 +52,7 @@ CLASSPATH="$base/conf:$CLASSPATH"; echo CLASSPATH :$CLASSPATH echo "cd to $bin_abs_path for workaround relative path" cd $bin_abs_path -$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.nio.Client 1>>$base/logs/client.log 2>&1 & +$JAVA $JAVA_OPTS $APP_OPTS -classpath .:$CLASSPATH org.jiucheng.magpiebridge.client.aio.Client 1>>$base/logs/client.log 2>&1 & echo $! > $base/bin/client.pid echo "cd to $current_path for continue" diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java new file mode 100644 index 0000000..ad56512 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/Client.java @@ -0,0 +1,84 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.Enumeration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class Client { + private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2); + + public static void main(String[] args) throws IOException, InterruptedException { + Cfg.loadProperties(Client.class); + final ClientAttachment attachment = new ClientAttachment(); + startup(attachment); + schedule(attachment); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(attachment); + } + }); + } + + private static void schedule(final ClientAttachment clientAttachment) { + // 重连 + schedule.scheduleAtFixedRate(new Runnable() { + public void run() { + if (clientAttachment.isFailed()) { + try { + startup(clientAttachment); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }, 60, 60, TimeUnit.SECONDS); + // 心跳 + schedule.scheduleWithFixedDelay(new Runnable() { + public void run() { + if (!clientAttachment.isFailed()) { + clientAttachment.heartbeat(); + } + } + }, 30L, 30L, TimeUnit.SECONDS); + } + + public static void startup(final ClientAttachment attachment) throws IOException { + ConcurrentHashMap tas = attachment.getTransferAttachments(); + Enumeration uris = tas.keys(); + if (uris != null) { + while (uris.hasMoreElements()) { + Integer uri = uris.nextElement(); + TransferAttachment ta = tas.remove(uri); + if (ta != null) { + ta.disconnect(); + } + } + } + AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); + client.setOption(StandardSocketOptions.SO_REUSEADDR, true); + client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); + attachment.setClient(client); + attachment.setFailed(false); + client.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), attachment, attachment.getClientEstablishmentCompletionHandler()); + } + + public static void shutdown(ClientAttachment clientAttachment) { + schedule.shutdownNow(); + clientAttachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java new file mode 100644 index 0000000..060cc20 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientAttachment.java @@ -0,0 +1,141 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.Enumeration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; + +/** + * + * @author jiucheng + * + */ +public class ClientAttachment { + private ClientEstablishmentCompletionHandler clientEstablishmentCompletionHandler = new ClientEstablishmentCompletionHandler(); + private ClientReadCompletionHandler clientReadCompletionHandler = new ClientReadCompletionHandler(); + private ClientWriteCompletionHandler clientWriteCompletionHandler = new ClientWriteCompletionHandler(); + private ClientWriteNoReadCompletionHandler clientWriteNoReadCompletionHandler = new ClientWriteNoReadCompletionHandler(); + + private ConcurrentHashMap transferAttachments = new ConcurrentHashMap(); + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); + + private AsynchronousSocketChannel client; + private ByteBuffer writeByteBuffer; + private ByteBuffer readByteBuffer; + private boolean failed = false; + + public ClientEstablishmentCompletionHandler getClientEstablishmentCompletionHandler() { + return clientEstablishmentCompletionHandler; + } + + public ClientReadCompletionHandler getClientReadCompletionHandler() { + return clientReadCompletionHandler; + } + + public ClientWriteCompletionHandler getClientWriteCompletionHandler() { + return clientWriteCompletionHandler; + } + + public ClientWriteNoReadCompletionHandler getClientWriteNoReadCompletionHandler() { + return clientWriteNoReadCompletionHandler; + } + + public AsynchronousSocketChannel getClient() { + return client; + } + + public ClientAttachment setClient(AsynchronousSocketChannel client) { + this.client = client; + return this; + } + + public ByteBuffer getReadByteBuffer() { + return readByteBuffer; + } + + public ClientAttachment setReadByteBuffer(ByteBuffer readByteBuffer) { + this.readByteBuffer = readByteBuffer; + return this; + } + + public ByteBuffer getWriteByteBuffer() { + return writeByteBuffer; + } + + public ClientAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { + this.writeByteBuffer = writeByteBuffer; + return this; + } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean isFailed() { + return failed; + } + + public ClientAttachment setFailed(boolean failed) { + this.failed = failed; + return this; + } + + public ConcurrentHashMap getTransferAttachments() { + return transferAttachments; + } + + // 代理连接断开消息发送 + public void disconnect(int uri) { + if (canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.DISCONNECT); + message.setUri(uri); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler()); + } + } + + // HEARTBEAT + public void heartbeat() { + if (canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.HEARTBEAT); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer), getClientWriteNoReadCompletionHandler()); + } + } + + public void close () { + try { + getClient().close(); + } catch (IOException e) { + } + ConcurrentHashMap tas = getTransferAttachments(); + Enumeration uris = tas.keys(); + if (uris != null) { + while (uris.hasMoreElements()) { + Integer uri = uris.nextElement(); + TransferAttachment ta = tas.remove(uri); + if (ta != null) { + ta.disconnect(); + } + } + } + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java new file mode 100644 index 0000000..d4d3cdf --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientEstablishmentCompletionHandler.java @@ -0,0 +1,33 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class ClientEstablishmentCompletionHandler implements CompletionHandler { + + public void completed(Void result, ClientAttachment attachment) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.AUTH); + byte[] data = Cfg.getClientKey().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), attachment.getClientWriteCompletionHandler()); + } + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("establish error"); + attachment.setFailed(true); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java new file mode 100644 index 0000000..8ebdf5e --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientReadCompletionHandler.java @@ -0,0 +1,113 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.transfer.aio.TransferAttachment; +import org.jiucheng.magpiebridge.transfer.aio.TransferEstablishmentCompletionHandler; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class ClientReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (result == -1) { + close(attachment); + return; + } + + ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); + if (readByteBuffer.position() != readByteBuffer.capacity()) { + attachment.getClient().read(readByteBuffer, attachment, this); + return; + } + + if (readByteBuffer.capacity() == 13) { + // 验证消息头 + readByteBuffer.flip(); + int magic = readByteBuffer.getInt(); + byte type = readByteBuffer.get(); + int uri = readByteBuffer.getInt(); + int size = readByteBuffer.getInt(); + if (magic != Message.MAGIC) { + close(attachment); + return; + } + if (size > 0) { + readByteBuffer = ByteBuffer.allocate(size + 13); + readByteBuffer.putInt(magic); + readByteBuffer.put(type); + readByteBuffer.putInt(uri); + readByteBuffer.putInt(size); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + return; + } + } + + Message message = Message.fromByteBuffer(readByteBuffer); + byte type = message.getType(); + if (type == Message.Type.CONNECT) { + try { + handleConnectMessage(attachment, message); + } catch (IOException e) { + e.printStackTrace(); + attachment.disconnect(message.getUri()); + } + readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } else if (type == Message.Type.HEARTBEAT) { + // 心跳不处理 + readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } + } + + private void handleConnectMessage(ClientAttachment attachment, Message message) throws IOException { + int uri = message.getUri(); + // lp.io2c.com + String ipport = new String(message.getData()); + String[] strs = ipport.split(":"); + + SocketChannel realSocketChannel = null; + try { + realSocketChannel = SocketChannel.open(); + realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1]))); + } catch (NumberFormatException e1) { + attachment.disconnect(uri); + return; + } catch (IOException e1) { + attachment.disconnect(uri); + return; + } + + AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); + client.setOption(StandardSocketOptions.SO_REUSEADDR, true); + client.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); + TransferAttachment transferAttachment = new TransferAttachment().setClient(client).setUri(uri).setRealSocketChannel(realSocketChannel); + + transferAttachment.getClient().connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort()), transferAttachment.setClientAttachment(attachment), new TransferEstablishmentCompletionHandler()); + } + + public void close(ClientAttachment attachment) { + attachment.setFailed(true); + try { + attachment.getClient().close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void failed(Throwable exc, ClientAttachment attachment) { + attachment.setFailed(true); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java new file mode 100644 index 0000000..1551bc0 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteCompletionHandler.java @@ -0,0 +1,30 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ClientWriteCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + attachment.setWriteByteBuffer(null); + ByteBuffer readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), attachment.getClientReadCompletionHandler()); + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("write error"); + attachment.setFailed(true); + attachment.writed.compareAndSet(true, false); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java new file mode 100644 index 0000000..16f9f1b --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/aio/ClientWriteNoReadCompletionHandler.java @@ -0,0 +1,25 @@ +package org.jiucheng.magpiebridge.client.aio; + +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ClientWriteNoReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ClientAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + } + + public void failed(Throwable exc, ClientAttachment attachment) { + System.out.println("write heartbeat error"); + attachment.setFailed(true); + attachment.writed.compareAndSet(true, false); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java deleted file mode 100644 index eb67f50..0000000 --- a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/client/nio/Client.java +++ /dev/null @@ -1,340 +0,0 @@ -package org.jiucheng.magpiebridge.client.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.util.Cfg; - -/** - * - * @author jiucheng - * - */ -public class Client { - - private static final Logger LOGGER = Logger.getLogger(Client.class.getName()); - - private static final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2); - private static final ExecutorService executorService = Executors.newCachedThreadPool(); - private static final Map realServers = new ConcurrentHashMap(); - - // 当前在写 - private static final AtomicBoolean writed = new AtomicBoolean(false); - private static volatile SocketChannel socketChannel; - private static volatile boolean runed = false; - - public static void main(String[] args) throws IOException, InterruptedException { - Cfg.loadProperties(Client.class); - schedule(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - shutdown(); - } - }); - startup(); - } - - private static void schedule() { - // 重连 - schedule.scheduleAtFixedRate(new Runnable() { - public void run() { - if (!runed) { - try { - startup(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }, 60, 60, TimeUnit.SECONDS); - // 心跳 - schedule.scheduleWithFixedDelay(new Runnable() { - public void run() { - SocketChannel sc = Client.socketChannel; - if (sc != null) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.HEARTBEAT); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while (buffer.hasRemaining()) { - sc.write(buffer); - } - } catch (IOException e) { - e.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - } - }, 30L, 30L, TimeUnit.SECONDS); - } - - public static void startup() { - runed = true; - try { - socketChannel = SocketChannel.open(); - socketChannel.connect(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort())); - } catch (IOException e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } - - auth(socketChannel); - - ByteBuffer headerByteBuffer = ByteBuffer.allocate(13); - while (true) { - headerByteBuffer.clear(); - ByteBuffer buf; - try { - int len = socketChannel.read(headerByteBuffer); - if (len == -1) { - closeSocketChannel(); - return; - } - while (headerByteBuffer.position() != headerByteBuffer.capacity()) { - socketChannel.read(headerByteBuffer); - } - - headerByteBuffer.flip(); - - int magic = headerByteBuffer.getInt(); - byte type = headerByteBuffer.get(); - int uri = headerByteBuffer.getInt(); - int size = headerByteBuffer.getInt(); - - buf = ByteBuffer.allocate(size + 13); - buf.putInt(magic); - buf.put(type); - buf.putInt(uri); - buf.putInt(size); - while (buf.position() != buf.capacity()) { - socketChannel.read(buf); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } - - Message message = Message.fromByteBuffer(buf); - byte type = message.getType(); - if (type == Message.Type.CONNECT) { - handleConnectMessage(socketChannel, message); - } else if (type == Message.Type.DISCONNECT) { - handleDisconnectMessage(socketChannel, message); - } else if (type == Message.Type.TRANSFER) { - handleTransferMessage(socketChannel, message); - } else if (type == Message.Type.HEARTBEAT) { - // 心跳不处理 - } - } - } - - public static void closeSocketChannel() { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - socketChannel = null; - } - for (Integer key : realServers.keySet()) { - SocketChannel socket = realServers.remove(key); - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - } - } - runed = false; - } - - public static void shutdown() { - schedule.shutdownNow(); - closeSocketChannel(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("shutdown"); - } - } - - private static void handleDisconnectMessage(final SocketChannel socketChannel, final Message proxyMessage) { - SocketChannel socket = realServers.remove(proxyMessage.getUri()); - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - // e.printStackTrace(); - } - } - } - - private static void handleTransferMessage(final SocketChannel socketChannel, final Message message) { - ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length); - byteBuffer.put(message.getData()); - byteBuffer.flip(); - SocketChannel socket = realServers.get(message.getUri()); - if (socket != null) { - try { - while(byteBuffer.hasRemaining()) { - socket.write(byteBuffer); - } - } catch (Exception e) { - e.printStackTrace(); - realServers.remove(message.getUri()); - while(!writed.compareAndSet(false, true)) {} - try { - Message rmsg = new Message(); - rmsg.setMagic(Message.MAGIC); - rmsg.setType(Message.Type.DISCONNECT); - rmsg.setUri(message.getUri()); - ByteBuffer buffer = Message.toByteBuffer(rmsg); - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e2) { - e2.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - } - } - - private static void handleConnectMessage(final SocketChannel socketChannel, final Message message) { - int uri = message.getUri(); - // lp.io2c.com - String ipport = new String(message.getData()); - String[] strs = ipport.split(":"); - - SocketChannel realSocketChannel = null; - try { - realSocketChannel = SocketChannel.open(); - realSocketChannel.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1]))); - } catch (NumberFormatException e1) { - e1.printStackTrace(); - disconnect(socketChannel, uri); - return; - } catch (IOException e1) { - e1.printStackTrace(); - disconnect(socketChannel, uri); - return; - } - - realServers.put(uri, realSocketChannel); - - final Integer tmpUri = uri; - executorService.execute(new Runnable() { - private Integer uri = tmpUri; - public void run() { - ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024); - while(true) { - SocketChannel real = realServers.get(uri); - if (real == null) { - return; - } - int len; - try { - len = real.read(buf); - } catch (IOException e) { - realServers.remove(uri); - disconnect(socketChannel, uri); - // e.printStackTrace(); - return; - } - if (len == -1) { - realServers.remove(uri); - disconnect(socketChannel, uri); - return; - } else if (len > 0) { - buf.flip(); - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.TRANSFER); - message.setUri(uri); - byte[] bts = new byte[len]; - buf.get(bts); - message.setData(bts); - message.setSize(bts.length); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - return; - } finally { - writed.compareAndSet(true, false); - } - buf.clear(); - } - } - } - }); - } - - // 代理连接断开消息发送 - private static void disconnect(SocketChannel socketChannel, int uri) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.DISCONNECT); - message.setUri(uri); - ByteBuffer buffer = Message.toByteBuffer(message); - while (!writed.compareAndSet(false, true)) {} - try { - while(buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e2) { - e2.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } - - // 认证信息发送 - private static void auth(final SocketChannel socketChannel) { - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.AUTH); - byte[] data = Cfg.getClientKey().getBytes(); - message.setData(data); - message.setSize(data.length); - ByteBuffer buffer = Message.toByteBuffer(message); - while(!writed.compareAndSet(false, true)) {} - try { - while (buffer.hasRemaining()) { - socketChannel.write(buffer); - } - } catch (Exception e) { - e.printStackTrace(); - closeSocketChannel(); - } finally { - writed.compareAndSet(true, false); - } - } -} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java new file mode 100644 index 0000000..23518c9 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferAttachment.java @@ -0,0 +1,151 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jiucheng.magpiebridge.client.aio.ClientAttachment; +import org.jiucheng.magpiebridge.protocol.Message; + +/** + * + * @author jiucheng + * + */ +public class TransferAttachment { + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); + + private AsynchronousSocketChannel client; + private ByteBuffer writeByteBuffer; + private ByteBuffer readByteBuffer; + private int uri; + + private SocketChannel realSocketChannel; + private boolean failed; + + private ClientAttachment clientAttachment; + + public ClientAttachment getClientAttachment() { + return clientAttachment; + } + + public TransferAttachment setClientAttachment(ClientAttachment clientAttachment) { + this.clientAttachment = clientAttachment; + return this; + } + + public boolean isFailed() { + return failed; + } + + public TransferAttachment setFailed(boolean failed) { + this.failed = failed; + return this; + } + + public SocketChannel getRealSocketChannel() { + return realSocketChannel; + } + + public TransferAttachment setRealSocketChannel(SocketChannel realSocketChannel) { + this.realSocketChannel = realSocketChannel; + return this; + } + + public int getUri() { + return uri; + } + + public TransferAttachment setUri(int uri) { + this.uri = uri; + return this; + } + + public AsynchronousSocketChannel getClient() { + return client; + } + + public TransferAttachment setClient(AsynchronousSocketChannel client) { + this.client = client; + return this; + } + + public ByteBuffer getWriteByteBuffer() { + return writeByteBuffer; + } + + public TransferAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { + this.writeByteBuffer = writeByteBuffer; + return this; + } + + public ByteBuffer getReadByteBuffer() { + return readByteBuffer; + } + + public TransferAttachment setReadByteBuffer(ByteBuffer readByteBuffer) { + this.readByteBuffer = readByteBuffer; + return this; + } + + public boolean canReaded() { + while (!writed.compareAndSet(false, false)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public void close() { + if (clientAttachment != null) { + clientAttachment.getTransferAttachments().remove(getUri()); + clientAttachment = null; + } + if (realSocketChannel != null) { + try { + realSocketChannel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + realSocketChannel = null; + } + if (client != null) { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + client = null; + } + } + + // 代理连接断开消息发送 + public void disconnect() { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.DISCONNECT); + message.setUri(getUri()); + ByteBuffer writeByteBuffer = Message.toByteBuffer(message); + if (canWrited()) { + getClient().write(writeByteBuffer, setWriteByteBuffer(writeByteBuffer).setFailed(true), new TransferWriteCompletionHandler()); + } + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java new file mode 100644 index 0000000..7701e79 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferConnectedCompletionHandler.java @@ -0,0 +1,74 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.ThreadManager; + +/** + * + * @author jiucheng + * + */ +public class TransferConnectedCompletionHandler implements CompletionHandler { + + public void completed(Integer result, final TransferAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + attachment.setWriteByteBuffer(null); + ByteBuffer readByteBuffer = ByteBuffer.allocate(13); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), new TransferReadCompletionHandler()); + + ThreadManager.singleton().execute(new Runnable() { + public void run() { + ByteBuffer buf = ByteBuffer.allocate(4 * 1024 * 1024); + while(true) { + if (attachment.canReaded()) { + SocketChannel real = attachment.getRealSocketChannel(); + if (real == null) { + return; + } + int len; + try { + len = real.read(buf); + } catch (IOException e) { + attachment.disconnect(); + return; + } + if (len == -1) { + attachment.disconnect(); + return; + } else if (len > 0) { + buf.flip(); + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.TRANSFER); + message.setUri(attachment.getUri()); + byte[] bts = new byte[len]; + buf.get(bts); + message.setData(bts); + message.setSize(bts.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler()); + buf.clear(); + } + } + } + } + } + }); + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.writed.compareAndSet(true, false); + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java new file mode 100644 index 0000000..97054c8 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferEstablishmentCompletionHandler.java @@ -0,0 +1,38 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +import org.jiucheng.magpiebridge.client.aio.ClientAttachment; +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.util.Cfg; + +/** + * + * @author jiucheng + * + */ +public class TransferEstablishmentCompletionHandler implements CompletionHandler { + + public void completed(Void result, TransferAttachment attachment) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.CONNECT); + message.setUri(attachment.getUri()); + byte[] data = Cfg.getClientKey().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer buffer = Message.toByteBuffer(message); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferConnectedCompletionHandler()); + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + ClientAttachment clientAttachment = attachment.getClientAttachment(); + if (clientAttachment != null) { + clientAttachment.disconnect(attachment.getUri()); + } + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java new file mode 100644 index 0000000..955c52c --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferReadCompletionHandler.java @@ -0,0 +1,90 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; + +import org.jiucheng.magpiebridge.protocol.Message; + +/** + * + * @author jiucheng + * + */ +public class TransferReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, TransferAttachment attachment) { + // client关闭连接 + if (result == -1) { + // close(attachment); + attachment.close(); + return; + } + ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); + if (readByteBuffer.position() != readByteBuffer.capacity()) { + attachment.getClient().read(readByteBuffer, attachment, this); + return; + } + if (readByteBuffer.capacity() == 13) { + // 验证消息头 + readByteBuffer.flip(); + int magic = readByteBuffer.getInt(); + byte type = readByteBuffer.get(); + int uri = readByteBuffer.getInt(); + int size = readByteBuffer.getInt(); + if (magic != Message.MAGIC) { + // close(attachment); + attachment.close(); + return; + } + if (size > 0) { + readByteBuffer = ByteBuffer.allocate(size + 13); + readByteBuffer.putInt(magic); + readByteBuffer.put(type); + readByteBuffer.putInt(uri); + readByteBuffer.putInt(size); + attachment.getClient().read(readByteBuffer, attachment.setReadByteBuffer(readByteBuffer), this); + return; + } + } + + Message message = Message.fromByteBuffer(readByteBuffer); + byte type = message.getType(); + if (type == Message.Type.TRANSFER) { + handleTransferMessage(attachment, message); + } else if (type == Message.Type.HEARTBEAT) { + // 心跳不处理 + attachment.setReadByteBuffer(ByteBuffer.allocate(13)); + attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this); + } + } + + private void handleTransferMessage(TransferAttachment attachment, final Message message) { + ByteBuffer byteBuffer = ByteBuffer.allocate(message.getData().length); + byteBuffer.put(message.getData()); + byteBuffer.flip(); + SocketChannel socket = attachment.getRealSocketChannel(); + if (socket != null) { + try { + while(byteBuffer.hasRemaining()) { + socket.write(byteBuffer); + } + attachment.setReadByteBuffer(ByteBuffer.allocate(13)); + attachment.getClient().read(attachment.getReadByteBuffer(), attachment, this); + } catch (Exception e) { + Message rmsg = new Message(); + rmsg.setMagic(Message.MAGIC); + rmsg.setType(Message.Type.DISCONNECT); + rmsg.setUri(message.getUri()); + ByteBuffer buffer = Message.toByteBuffer(rmsg); + if (attachment.canWrited()) { + attachment.getClient().write(buffer, attachment.setWriteByteBuffer(buffer), new TransferWriteCompletionHandler()); + } + } + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.close(); + } +} diff --git a/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java new file mode 100644 index 0000000..0538f48 --- /dev/null +++ b/magpiebridge-client/src/main/java/org/jiucheng/magpiebridge/transfer/aio/TransferWriteCompletionHandler.java @@ -0,0 +1,28 @@ +package org.jiucheng.magpiebridge.transfer.aio; + +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class TransferWriteCompletionHandler implements CompletionHandler { + + public void completed(Integer result, TransferAttachment attachment) { + if (attachment.getWriteByteBuffer().hasRemaining()) { + attachment.getClient().write(attachment.getWriteByteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + + if (attachment.isFailed()) { + attachment.close(); + } + } + + public void failed(Throwable exc, TransferAttachment attachment) { + attachment.writed.compareAndSet(true, false); + attachment.close(); + } +} diff --git a/magpiebridge-common/pom.xml b/magpiebridge-common/pom.xml index 1f948a8..ccbd09e 100644 --- a/magpiebridge-common/pom.xml +++ b/magpiebridge-common/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-common diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java deleted file mode 100644 index 7fdd9fc..0000000 --- a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/schedule/ScheduleManager.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.jiucheng.magpiebridge.schedule; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ScheduleManager { - - private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); - - public static void schedule(Runnable runnable) { - scheduledExecutorService.scheduleAtFixedRate(runnable, 30L, 30L, TimeUnit.MICROSECONDS); - } -} diff --git a/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java new file mode 100644 index 0000000..d9a8a04 --- /dev/null +++ b/magpiebridge-common/src/main/java/org/jiucheng/magpiebridge/util/ThreadManager.java @@ -0,0 +1,18 @@ +package org.jiucheng.magpiebridge.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + * @author jiucheng + * + */ +public class ThreadManager { + + private static final ExecutorService executorService = Executors.newCachedThreadPool(); + + public static ExecutorService singleton() { + return executorService; + } +} diff --git a/magpiebridge-server/pom.xml b/magpiebridge-server/pom.xml index 0c87baf..edbef9a 100644 --- a/magpiebridge-server/pom.xml +++ b/magpiebridge-server/pom.xml @@ -4,7 +4,7 @@ org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 magpiebridge-server diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java index 7d80a30..0ce996c 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java @@ -5,12 +5,11 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import org.jiucheng.magpiebridge.util.Cfg; +import org.jiucheng.magpiebridge.util.ThreadManager; /** * @@ -18,23 +17,19 @@ import org.jiucheng.magpiebridge.util.Cfg; * */ public class Server { - private static final Logger LOGGER = Logger.getLogger(Server.class.getName()); private static final Object wait = new Object(); - public static final int READ_TIMEOUT = Integer.MAX_VALUE; - public static void main(String[] args) throws IOException, InterruptedException { Cfg.loadProperties(Server.class); - ExecutorService executorService = Executors.newCachedThreadPool(); - AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService); + AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(ThreadManager.singleton()); final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group); server.setOption(StandardSocketOptions.SO_REUSEADDR, true); server.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); server.bind(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort())); - server.accept(new ServerAttachment(server), new ServerCompletionHandler()); + server.accept(new ServerEstablishmentAttachment(server), new ServerEstablishmentCompletionHandler()); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { @@ -54,6 +49,7 @@ public class Server { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, "Server started"); } + synchronized (wait) { wait.wait(); } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java similarity index 52% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java index ecc43dc..c9f678a 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java @@ -2,11 +2,15 @@ package org.jiucheng.magpiebridge.server.aio; import java.nio.channels.AsynchronousServerSocketChannel; -public class ServerAttachment { +/** + * + * @author jiucheng + * + */ +public class ServerEstablishmentAttachment { + private AsynchronousServerSocketChannel server; - AsynchronousServerSocketChannel server; - - public ServerAttachment(AsynchronousServerSocketChannel server) { + public ServerEstablishmentAttachment(AsynchronousServerSocketChannel server) { this.server = server; } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java similarity index 48% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java index e2ab76d..8d6792c 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java @@ -3,18 +3,22 @@ package org.jiucheng.magpiebridge.server.aio; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; -import java.util.concurrent.TimeUnit; -public class ServerCompletionHandler implements CompletionHandler { +/** + * + * @author jiucheng + * + */ +public class ServerEstablishmentCompletionHandler implements CompletionHandler { - public void completed(AsynchronousSocketChannel result, ServerAttachment attachment) { + public void completed(AsynchronousSocketChannel result, ServerEstablishmentAttachment attachment) { attachment.getServer().accept(attachment, this); ByteBuffer readBuffer = ByteBuffer.allocate(13); - result.read(readBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, new ClientAttachment(result).setReadBuffer(readBuffer), new ClientCompletionHandler()); + result.read(readBuffer, new ServerReadAttachment(result).setReadBuffer(readBuffer), new ServerReadCompletionHandler()); } - public void failed(Throwable exc, ServerAttachment attachment) { + public void failed(Throwable exc, ServerEstablishmentAttachment attachment) { exc.printStackTrace(); } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java similarity index 73% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java index 4c7ac51..e8da71f 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java @@ -9,9 +9,10 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteAttachment; /** * Client附件 @@ -19,21 +20,20 @@ import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment; * @author jiucheng * */ -public class ClientAttachment { +public class ServerReadAttachment { // 当前在写 public final AtomicBoolean writed = new AtomicBoolean(false); - // 代理服务 private List asynchronousServerSocketChannels = new ArrayList(); // 代理连接 - private final ConcurrentMap proxys = new ConcurrentHashMap(); + public static final ConcurrentMap proxys = new ConcurrentHashMap(); // Client连接 AsynchronousSocketChannel asynchronousSocketChannel; // Client读入数据 ByteBuffer readBuffer; - public ClientAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { + public ServerReadAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { this.asynchronousSocketChannel = asynchronousSocketChannel; } @@ -41,7 +41,7 @@ public class ClientAttachment { return readBuffer; } - public ClientAttachment setReadBuffer(ByteBuffer readBuffer) { + public ServerReadAttachment setReadBuffer(ByteBuffer readBuffer) { this.readBuffer = readBuffer; return this; } @@ -50,15 +50,15 @@ public class ClientAttachment { return asynchronousSocketChannel; } - public void putProxy(Integer uri, ProxyClientWriteAttachment proxy) { + public void putProxy(Integer uri, ProxyWriteAttachment proxy) { proxys.put(uri, proxy); } - public ProxyClientWriteAttachment getProxy(Integer uri) { + public ProxyWriteAttachment getProxy(Integer uri) { return proxys.get(uri); } - public ProxyClientWriteAttachment removeProxy(Integer uri) { + public ProxyWriteAttachment removeProxy(Integer uri) { return proxys.remove(uri); } @@ -69,7 +69,7 @@ public class ClientAttachment { public void close() { Set keys = proxys.keySet(); for (Integer key : keys) { - ProxyClientWriteAttachment proxyWriteAttachment = proxys.remove(key); + ProxyWriteAttachment proxyWriteAttachment = proxys.remove(key); if (proxyWriteAttachment != null && proxyWriteAttachment.getAsynchronousSocketChannel() != null) { try { proxyWriteAttachment.getAsynchronousSocketChannel().close(); @@ -95,4 +95,15 @@ public class ClientAttachment { } } } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java similarity index 64% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java index efdbdf4..f64ddd8 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ClientCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadCompletionHandler.java @@ -10,31 +10,29 @@ import java.nio.channels.CompletionHandler; import java.text.MessageFormat; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteAttachment; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyClientWriteCompletionHandler; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerAttachment; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyServerCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyReadCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentCompletionHandler; import org.jiucheng.magpiebridge.util.Cfg; +import org.jiucheng.magpiebridge.util.ThreadManager; /** - * Client数据解析处理 * * @author jiucheng * */ -public class ClientCompletionHandler implements CompletionHandler { - - private static final Logger LOGGER = Logger.getLogger(ClientCompletionHandler.class.getName()); +public class ServerReadCompletionHandler implements CompletionHandler { + private static final Logger LOGGER = Logger.getLogger(ServerReadCompletionHandler.class.getName()); // Client连接 - private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); + private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); - public void completed(Integer result, ClientAttachment attachment) { + public void completed(Integer result, ServerReadAttachment attachment) { // client关闭连接 if (result == -1) { close(attachment); @@ -43,7 +41,7 @@ public class ClientCompletionHandler implements CompletionHandler 0) { readByteBuffer = ByteBuffer.allocate(size + 13); readByteBuffer.putInt(magic); readByteBuffer.put(type); readByteBuffer.putInt(uri); readByteBuffer.putInt(size); - attachment.getAsynchronousSocketChannel().read(readByteBuffer, Server.READ_TIMEOUT, TimeUnit.SECONDS, attachment.setReadBuffer(readByteBuffer), this); + attachment.getAsynchronousSocketChannel().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this); return; } } @@ -79,31 +77,40 @@ public class ClientCompletionHandler implements CompletionHandler remote={1}", local, remote)); } proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1]))); - proxy.accept(new ProxyServerAttachment(proxy).setClientAttachment(attachment).setRemote(remote), new ProxyServerCompletionHandler()); + proxy.accept(new ProxyEstablishmentAttachment(proxy).setClientAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler()); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, MessageFormat.format("----end port={0} -> remote={1}", local, remote)); } @@ -156,11 +163,11 @@ public class ClientCompletionHandler implements CompletionHandler { +/** + * + * @author jiucheng + * + */ +public class ServerWriteCompletionHandler implements CompletionHandler { - public void completed(Integer result, ClientWriteAttachment attachment) { + public void completed(Integer result, ServerWriteAttachment attachment) { if (attachment.getWriteBuffer().hasRemaining()) { attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this); return; @@ -15,13 +20,13 @@ public class ClientWriteCompletionHandler implements CompletionHandler { - +public class ProxyEstablishmentCompletionHandler implements CompletionHandler { private static final AtomicInteger CLIENTS_ID = new AtomicInteger(); - - public void completed(AsynchronousSocketChannel result, ProxyServerAttachment attachment) { - attachment.getProxyServer().accept(attachment, this); + + public void completed(AsynchronousSocketChannel result, ProxyEstablishmentAttachment attachment) { + attachment.getProxyServer().accept(new ProxyEstablishmentAttachment(attachment.getProxyServer()).setClientAttachment(attachment.getClientAttachment()).setRemote(attachment.getRemote()), this); Message message = new Message(); message.setMagic(Message.MAGIC); @@ -25,13 +24,18 @@ public class ProxyServerCompletionHandler implements CompletionHandler { +public class ProxyReadCompletionHandler implements CompletionHandler { - public void completed(Integer result, ClientWriteAttachment attachment) { + public void completed(Integer result, ServerWriteAttachment attachment) { if (result == -1) { try { attachment.getProxyAsynchronousSocketChannel().close(); @@ -24,13 +24,15 @@ public class ProxyClientCompletionHandler implements CompletionHandler { +public class ProxyWriteCompletionHandler implements CompletionHandler { - public void completed(Integer result, ProxyClientWriteAttachment attachment) { + public void completed(Integer result, ProxyWriteAttachment attachment) { if (attachment.getWriteByteBuffer().hasRemaining()) { attachment.getAsynchronousSocketChannel().write(attachment.getWriteByteBuffer(), attachment, this); return; @@ -12,7 +12,7 @@ public class ProxyClientWriteCompletionHandler implements CompletionHandler4.0.0 org.jiucheng.magpiebridge magpiebridge - 0.0.1 + 1.0.0 pom magpiebridge-common @@ -16,7 +16,7 @@ org.jiucheng.magpiebridge magpiebridge-common - 0.0.1 + 1.0.0 junit -- Gitee From baeff6f000f6c5919787d3a6a0a28060e8a844e6 Mon Sep 17 00:00:00 2001 From: jiucheng Date: Mon, 29 Jul 2019 23:14:37 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AF=E6=8F=A1=E6=89=8B=E4=B8=AD=E7=9A=84attached=20object?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../magpiebridge/server/aio/Server.java | 2 +- .../aio/ServerEstablishmentAttachment.java | 20 ------------------- .../ServerEstablishmentCompletionHandler.java | 9 +++++---- 3 files changed, 6 insertions(+), 25 deletions(-) delete mode 100644 magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java index 0ce996c..63866c9 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/Server.java @@ -29,7 +29,7 @@ public class Server { server.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024); server.bind(new InetSocketAddress(Cfg.getServerIp(), Cfg.getServerPort())); - server.accept(new ServerEstablishmentAttachment(server), new ServerEstablishmentCompletionHandler()); + server.accept(server, new ServerEstablishmentCompletionHandler()); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java deleted file mode 100644 index c9f678a..0000000 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentAttachment.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.jiucheng.magpiebridge.server.aio; - -import java.nio.channels.AsynchronousServerSocketChannel; - -/** - * - * @author jiucheng - * - */ -public class ServerEstablishmentAttachment { - private AsynchronousServerSocketChannel server; - - public ServerEstablishmentAttachment(AsynchronousServerSocketChannel server) { - this.server = server; - } - - public AsynchronousServerSocketChannel getServer() { - return server; - } -} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java index 8d6792c..9b472c4 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java @@ -1,6 +1,7 @@ package org.jiucheng.magpiebridge.server.aio; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; @@ -9,16 +10,16 @@ import java.nio.channels.CompletionHandler; * @author jiucheng * */ -public class ServerEstablishmentCompletionHandler implements CompletionHandler { +public class ServerEstablishmentCompletionHandler implements CompletionHandler { - public void completed(AsynchronousSocketChannel result, ServerEstablishmentAttachment attachment) { - attachment.getServer().accept(attachment, this); + public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) { + attachment.accept(attachment, this); ByteBuffer readBuffer = ByteBuffer.allocate(13); result.read(readBuffer, new ServerReadAttachment(result).setReadBuffer(readBuffer), new ServerReadCompletionHandler()); } - public void failed(Throwable exc, ServerEstablishmentAttachment attachment) { + public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { exc.printStackTrace(); } } -- Gitee From f0a9ec47b045abd43c3d536b8a445ca0e69c2eae Mon Sep 17 00:00:00 2001 From: jiucheng Date: Sun, 4 Aug 2019 13:26:58 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=A4=9A=E8=BF=9E=E6=8E=A5=E5=BD=A2=E5=BC=8F=E5=92=8C?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=AB=AF=E9=80=9A=E4=BF=A1=EF=BC=88IXZBM?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 +- ...dAttachment.java => ServerAttachment.java} | 101 +++++++++++------- .../ServerEstablishmentCompletionHandler.java | 2 +- .../aio/ServerReadCompletionHandler.java | 84 ++++++++------- .../server/aio/ServerWriteAttachment.java | 5 + .../aio/ServerWriteCompletionHandler.java | 21 ++-- ...iteNoProxyClientReadCompletionHandler.java | 24 +++++ ...teAttachment.java => ProxyAttachment.java} | 83 +++++++++----- .../proxy/ProxyEstablishmentAttachment.java | 27 +++-- .../ProxyEstablishmentCompletionHandler.java | 42 ++++---- .../aio/proxy/ProxyReadCompletionHandler.java | 67 +++++++----- .../proxy/ProxyWriteCompletionHandler.java | 33 +++++- .../src/main/resources/conf/cfg.properties | 2 +- 13 files changed, 321 insertions(+), 178 deletions(-) rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/{ServerReadAttachment.java => ServerAttachment.java} (46%) create mode 100644 magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java rename magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/{ProxyWriteAttachment.java => ProxyAttachment.java} (32%) diff --git a/README.md b/README.md index 865cb6f..47ff132 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,17 @@ # 鹊桥(内网穿透) #### 介绍 -[使用java实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097) +[使用java基于aio/nio实现的内网穿透工具,致力于帮助开发者内网开发供外部调试使用。](http://blog.jiucheng.org/article/10097) #### 软件架构 ![软件架构图](https://images.gitee.com/uploads/images/2019/0616/165254_c3715fbf_120615.png "build-0.0.1.png") +#### 源码打包正式包 + +``` +mvn clean install -Dmaven.test.skip -Denv=release +``` + #### 安装教程 1. 准备公网(假设公网ip=10.1.1.22)服务器,安装openjdk1.7+ diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java similarity index 46% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java index e8da71f..1c61cda 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerReadAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerAttachment.java @@ -6,59 +6,89 @@ import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteAttachment; +import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment; /** - * Client附件 * * @author jiucheng * */ -public class ServerReadAttachment { - // 当前在写 - public final AtomicBoolean writed = new AtomicBoolean(false); +public class ServerAttachment { // 代理服务 private List asynchronousServerSocketChannels = new ArrayList(); // 代理连接 - public static final ConcurrentMap proxys = new ConcurrentHashMap(); + public static final ConcurrentMap proxys = new ConcurrentHashMap(); // Client连接 - AsynchronousSocketChannel asynchronousSocketChannel; + AsynchronousSocketChannel server; // Client读入数据 ByteBuffer readBuffer; + // Client写入数据 + ByteBuffer writeBuffer; + // 当前在写 + public final AtomicBoolean writed = new AtomicBoolean(false); - public ServerReadAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { - this.asynchronousSocketChannel = asynchronousSocketChannel; + private ProxyAttachment proxyAttachment; + private boolean mastered; + + public ServerAttachment(AsynchronousSocketChannel server) { + this.server = server; } + public boolean isMastered() { + return mastered; + } + + public ServerAttachment setMastered(boolean mastered) { + this.mastered = mastered; + return this; + } + + public ProxyAttachment getProxyAttachment() { + return proxyAttachment; + } + + public ServerAttachment setProxyAttachment(ProxyAttachment proxyAttachment) { + this.proxyAttachment = proxyAttachment; + return this; + } + + public ByteBuffer getWriteBuffer() { + return writeBuffer; + } + + public ServerAttachment setWriteBuffer(ByteBuffer writeBuffer) { + this.writeBuffer = writeBuffer; + return this; + } + public ByteBuffer getReadBuffer() { return readBuffer; } - public ServerReadAttachment setReadBuffer(ByteBuffer readBuffer) { + public ServerAttachment setReadBuffer(ByteBuffer readBuffer) { this.readBuffer = readBuffer; return this; } - public AsynchronousSocketChannel getAsynchronousSocketChannel() { - return asynchronousSocketChannel; + public AsynchronousSocketChannel getServer() { + return server; } - public void putProxy(Integer uri, ProxyWriteAttachment proxy) { + public void putProxy(Integer uri, ProxyAttachment proxy) { proxys.put(uri, proxy); } - public ProxyWriteAttachment getProxy(Integer uri) { + public ProxyAttachment getProxy(Integer uri) { return proxys.get(uri); } - public ProxyWriteAttachment removeProxy(Integer uri) { + public ProxyAttachment removeProxy(Integer uri) { return proxys.remove(uri); } @@ -67,33 +97,28 @@ public class ServerReadAttachment { } public void close() { - Set keys = proxys.keySet(); - for (Integer key : keys) { - ProxyWriteAttachment proxyWriteAttachment = proxys.remove(key); - if (proxyWriteAttachment != null && proxyWriteAttachment.getAsynchronousSocketChannel() != null) { - try { - proxyWriteAttachment.getAsynchronousSocketChannel().close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) { - if (asynchronousServerSocketChannel != null) { - try { - asynchronousServerSocketChannel.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - if (asynchronousSocketChannel != null) { + if (server != null) { try { - asynchronousSocketChannel.close(); + server.close(); } catch (IOException e) { e.printStackTrace(); } } + if (mastered) { + for (AsynchronousServerSocketChannel asynchronousServerSocketChannel : asynchronousServerSocketChannels) { + if (asynchronousServerSocketChannel != null) { + try { + asynchronousServerSocketChannel.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } else { + if (proxyAttachment != null) { + proxyAttachment.close(); + } + } } public boolean canWrited() { diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java index 9b472c4..2059fd8 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerEstablishmentCompletionHandler.java @@ -16,7 +16,7 @@ public class ServerEstablishmentCompletionHandler implements CompletionHandler { +public class ServerReadCompletionHandler implements CompletionHandler { private static final Logger LOGGER = Logger.getLogger(ServerReadCompletionHandler.class.getName()); // Client连接 - private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); + private static final ConcurrentMap CLIENTS = new ConcurrentHashMap(); - public void completed(Integer result, ServerReadAttachment attachment) { + public void completed(Integer result, ServerAttachment attachment) { // client关闭连接 if (result == -1) { close(attachment); @@ -41,7 +41,7 @@ public class ServerReadCompletionHandler implements CompletionHandler remote={1}", local, remote)); } proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1]))); - proxy.accept(new ProxyEstablishmentAttachment(proxy).setClientAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler()); + proxy.accept(new ProxyEstablishmentAttachment(proxy).setServerAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler()); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, MessageFormat.format("----end port={0} -> remote={1}", local, remote)); } @@ -163,11 +167,11 @@ public class ServerReadCompletionHandler implements CompletionHandler { +public class ServerWriteCompletionHandler implements CompletionHandler { - public void completed(Integer result, ServerWriteAttachment attachment) { + public void completed(Integer result, ServerAttachment attachment) { if (attachment.getWriteBuffer().hasRemaining()) { - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this); + attachment.getServer().write(attachment.getWriteBuffer(), attachment, this); return; } + attachment.writed.compareAndSet(true, false); - attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false); - - if (attachment.isConnected() && attachment.getProxyAsynchronousSocketChannel() != null) { - attachment.setReadByteBuffer(ByteBuffer.allocate(4 * 1024 * 1024)); - attachment.getProxyAsynchronousSocketChannel().read(attachment.getReadByteBuffer(), attachment, new ProxyReadCompletionHandler()); + ProxyAttachment proxyAttachment = attachment.getProxyAttachment(); + if (proxyAttachment != null) { + proxyAttachment.setReadBuffer(ByteBuffer.allocate(4 * 1024 * 1024)); + proxyAttachment.getAsynchronousSocketChannel().read(proxyAttachment.getReadBuffer(), proxyAttachment, new ProxyReadCompletionHandler()); } } - public void failed(Throwable exc, ServerWriteAttachment attachment) { - attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(true, false); + public void failed(Throwable exc, ServerAttachment attachment) { + attachment.writed.compareAndSet(true, false); exc.printStackTrace(); } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java new file mode 100644 index 0000000..7094d5a --- /dev/null +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/ServerWriteNoProxyClientReadCompletionHandler.java @@ -0,0 +1,24 @@ +package org.jiucheng.magpiebridge.server.aio; + +import java.nio.channels.CompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ServerWriteNoProxyClientReadCompletionHandler implements CompletionHandler { + + public void completed(Integer result, ServerAttachment attachment) { + if (attachment.getWriteBuffer().hasRemaining()) { + attachment.getServer().write(attachment.getWriteBuffer(), attachment, this); + return; + } + attachment.writed.compareAndSet(true, false); + } + + public void failed(Throwable exc, ServerAttachment attachment) { + attachment.writed.compareAndSet(true, false); + exc.printStackTrace(); + } +} diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java similarity index 32% rename from magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteAttachment.java rename to magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java index 0419ce6..1539b83 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyAttachment.java @@ -1,11 +1,12 @@ package org.jiucheng.magpiebridge.server.aio.proxy; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.jiucheng.magpiebridge.server.aio.ServerReadAttachment; -import org.jiucheng.magpiebridge.server.aio.ServerWriteAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; /** * 代理连接附件 @@ -13,27 +14,20 @@ import org.jiucheng.magpiebridge.server.aio.ServerWriteAttachment; * @author jiucheng * */ -public class ProxyWriteAttachment { +public class ProxyAttachment { // 当前在写 public final AtomicBoolean writed = new AtomicBoolean(false); // 代理连接 private AsynchronousSocketChannel asynchronousSocketChannel; // 待写入数据 - private ByteBuffer writeByteBuffer; + private ByteBuffer readBuffer; + // 待写入数据 + private ByteBuffer writeBuffer; // Client附件 - private ServerReadAttachment clientAttachment; - private ServerWriteAttachment clientWriteAttachment; - - public ServerWriteAttachment getClientWriteAttachment() { - return clientWriteAttachment; - } - - public ProxyWriteAttachment setClientWriteAttachment(ServerWriteAttachment clientWriteAttachment) { - this.clientWriteAttachment = clientWriteAttachment; - return this; - } + private ServerAttachment serverAttachment; + private int uri; - public ProxyWriteAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { + public ProxyAttachment(AsynchronousSocketChannel asynchronousSocketChannel) { this.asynchronousSocketChannel = asynchronousSocketChannel; } @@ -41,21 +35,62 @@ public class ProxyWriteAttachment { return asynchronousSocketChannel; } - public ByteBuffer getWriteByteBuffer() { - return writeByteBuffer; + public int getUri() { + return uri; + } + + public ProxyAttachment setUri(int uri) { + this.uri = uri; + return this; + } + + public ByteBuffer getReadBuffer() { + return readBuffer; + } + + public ProxyAttachment setReadBuffer(ByteBuffer readBuffer) { + this.readBuffer = readBuffer; + return this; + } + + public ByteBuffer getWriteBuffer() { + return writeBuffer; } - public ProxyWriteAttachment setWriteByteBuffer(ByteBuffer writeByteBuffer) { - this.writeByteBuffer = writeByteBuffer; + public ProxyAttachment setWriteBuffer(ByteBuffer writeBuffer) { + this.writeBuffer = writeBuffer; return this; } - public ServerReadAttachment getClientAttachment() { - return clientAttachment; + public ServerAttachment getServerAttachment() { + return serverAttachment; } - public ProxyWriteAttachment setClientAttachment(ServerReadAttachment clientAttachment) { - this.clientAttachment = clientAttachment; + public ProxyAttachment setServerAttachment(ServerAttachment serverAttachment) { + this.serverAttachment = serverAttachment; return this; } + + public boolean canWrited() { + while (!writed.compareAndSet(false, true)) { + try { + TimeUnit.MILLISECONDS.sleep(50L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + public void close() { + ServerAttachment.proxys.remove(uri); + AsynchronousSocketChannel proxy = getAsynchronousSocketChannel(); + if (proxy != null) { + try { + proxy.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java index 0a237e2..c8312e2 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentAttachment.java @@ -2,28 +2,33 @@ package org.jiucheng.magpiebridge.server.aio.proxy; import java.nio.channels.AsynchronousServerSocketChannel; -import org.jiucheng.magpiebridge.server.aio.ServerReadAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; +/** + * + * @author jiucheng + * + */ public class ProxyEstablishmentAttachment { - private AsynchronousServerSocketChannel proxyServer; - private ServerReadAttachment clientAttachment; + private AsynchronousServerSocketChannel proxy; + private ServerAttachment serverAttachment; private String remote; - public ProxyEstablishmentAttachment(AsynchronousServerSocketChannel proxyServer) { - this.proxyServer = proxyServer; + public ProxyEstablishmentAttachment(AsynchronousServerSocketChannel proxy) { + this.proxy = proxy; } - public AsynchronousServerSocketChannel getProxyServer() { - return proxyServer; + public AsynchronousServerSocketChannel getProxy() { + return proxy; } - public ServerReadAttachment getClientAttachment() { - return clientAttachment; + public ServerAttachment getServerAttachment() { + return serverAttachment; } - public ProxyEstablishmentAttachment setClientAttachment(ServerReadAttachment clientAttachment) { - this.clientAttachment = clientAttachment; + public ProxyEstablishmentAttachment setServerAttachment(ServerAttachment serverAttachment) { + this.serverAttachment = serverAttachment; return this; } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java index f74f5a9..4502098 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyEstablishmentCompletionHandler.java @@ -6,37 +6,37 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.atomic.AtomicInteger; import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.ServerWriteAttachment; -import org.jiucheng.magpiebridge.server.aio.ServerWriteCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; +/** + * + * @author jiucheng + * + */ public class ProxyEstablishmentCompletionHandler implements CompletionHandler { private static final AtomicInteger CLIENTS_ID = new AtomicInteger(); public void completed(AsynchronousSocketChannel result, ProxyEstablishmentAttachment attachment) { - attachment.getProxyServer().accept(new ProxyEstablishmentAttachment(attachment.getProxyServer()).setClientAttachment(attachment.getClientAttachment()).setRemote(attachment.getRemote()), this); + attachment.getProxy().accept(attachment, this); - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.CONNECT); - message.setUri(CLIENTS_ID.incrementAndGet()); - byte[] data = attachment.getRemote().getBytes(); - message.setData(data); - message.setSize(data.length); - - ByteBuffer writeBuffer = Message.toByteBuffer(message); - ProxyWriteAttachment proxyClientWriteAttachment = new ProxyWriteAttachment(result).setClientAttachment(attachment.getClientAttachment()); - - attachment.getClientAttachment().putProxy(message.getUri(), proxyClientWriteAttachment); - - if (attachment.getClientAttachment().canWrited()) { - ServerWriteAttachment clientWriteAttachment = new ServerWriteAttachment(writeBuffer, attachment).setProxyAsynchronousSocketChannel(result).setUri(message.getUri()); - proxyClientWriteAttachment.setClientWriteAttachment(clientWriteAttachment); - attachment.getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, clientWriteAttachment, new ServerWriteCompletionHandler()); + if (attachment.getServerAttachment().canWrited()) { + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.CONNECT); + message.setUri(CLIENTS_ID.incrementAndGet()); + byte[] data = attachment.getRemote().getBytes(); + message.setData(data); + message.setSize(data.length); + ByteBuffer writeBuffer = Message.toByteBuffer(message); + + ServerAttachment.proxys.put(message.getUri(), new ProxyAttachment(result).setUri(message.getUri()).setServerAttachment(attachment.getServerAttachment())); + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); } } public void failed(Throwable exc, ProxyEstablishmentAttachment attachment) { - attachment.getClientAttachment().writed.compareAndSet(true, false); + // attachment.getServerAttachment().writed.compareAndSet(true, false); System.out.println("proxy server failed"); } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java index 5aa9441..21232b4 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyReadCompletionHandler.java @@ -5,48 +5,63 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import org.jiucheng.magpiebridge.protocol.Message; -import org.jiucheng.magpiebridge.server.aio.ServerWriteAttachment; import org.jiucheng.magpiebridge.server.aio.ServerWriteCompletionHandler; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; -public class ProxyReadCompletionHandler implements CompletionHandler { +/** + * + * @author jiucheng + * + */ +public class ProxyReadCompletionHandler implements CompletionHandler { - public void completed(Integer result, ServerWriteAttachment attachment) { + public void completed(Integer result, ProxyAttachment attachment) { if (result == -1) { try { - attachment.getProxyAsynchronousSocketChannel().close(); + attachment.getAsynchronousSocketChannel().close(); } catch (IOException e) { e.printStackTrace(); } + if (attachment.getServerAttachment().canWrited()) { + Message proxym = new Message(); + proxym.setMagic(Message.MAGIC); + proxym.setType(Message.Type.DISCONNECT); + proxym.setUri(attachment.getUri()); + ByteBuffer writeBuffer = Message.toByteBuffer(proxym); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + return; + } + } + + if (attachment.getServerAttachment().canWrited()) { + ByteBuffer readByteBuffer = attachment.getReadBuffer(); + readByteBuffer.flip(); + Message message = new Message(); + message.setMagic(Message.MAGIC); + message.setType(Message.Type.TRANSFER); + message.setUri(attachment.getUri()); + byte[] bts = new byte[result]; + readByteBuffer.get(bts); + message.setData(bts); + message.setSize(bts.length); + ByteBuffer writeBuffer = Message.toByteBuffer(message); + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteCompletionHandler()); + } + } + + public void failed(Throwable exc, ProxyAttachment attachment) { + if (attachment.getServerAttachment().canWrited()) { Message proxym = new Message(); proxym.setMagic(Message.MAGIC); proxym.setType(Message.Type.DISCONNECT); proxym.setUri(attachment.getUri()); ByteBuffer writeBuffer = Message.toByteBuffer(proxym); - while (!attachment.getProxyAttachment().getClientAttachment().writed.compareAndSet(false, true)) {} - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer).setProxyAsynchronousSocketChannel(null), new ServerWriteCompletionHandler()); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); return; } - - ByteBuffer readByteBuffer = attachment.getReadByteBuffer(); - readByteBuffer.flip(); - - Message message = new Message(); - message.setMagic(Message.MAGIC); - message.setType(Message.Type.TRANSFER); - message.setUri(attachment.getUri()); - byte[] bts = new byte[result]; - readByteBuffer.get(bts); - message.setData(bts); - message.setSize(bts.length); - - ByteBuffer writeBuffer = Message.toByteBuffer(message); - if (attachment.getProxyAttachment().getClientAttachment().canWrited()) { - attachment.getProxyAttachment().getClientAttachment().getAsynchronousSocketChannel().write(writeBuffer, attachment.setWriteBuffer(writeBuffer), new ServerWriteCompletionHandler()); - } - } - - public void failed(Throwable exc, ServerWriteAttachment attachment) { exc.printStackTrace(); } } diff --git a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java index ddc4750..b55ea0b 100644 --- a/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java +++ b/magpiebridge-server/src/main/java/org/jiucheng/magpiebridge/server/aio/proxy/ProxyWriteCompletionHandler.java @@ -1,19 +1,42 @@ package org.jiucheng.magpiebridge.server.aio.proxy; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -public class ProxyWriteCompletionHandler implements CompletionHandler { +import org.jiucheng.magpiebridge.protocol.Message; +import org.jiucheng.magpiebridge.server.aio.ServerAttachment; +import org.jiucheng.magpiebridge.server.aio.ServerWriteNoProxyClientReadCompletionHandler; + +/** + * + * @author jiucheng + * + */ +public class ProxyWriteCompletionHandler implements CompletionHandler { - public void completed(Integer result, ProxyWriteAttachment attachment) { - if (attachment.getWriteByteBuffer().hasRemaining()) { - attachment.getAsynchronousSocketChannel().write(attachment.getWriteByteBuffer(), attachment, this); + public void completed(Integer result, ProxyAttachment attachment) { + if (attachment.getWriteBuffer().hasRemaining()) { + attachment.getAsynchronousSocketChannel().write(attachment.getWriteBuffer(), attachment, this); return; } attachment.writed.compareAndSet(true, false); } - public void failed(Throwable exc, ProxyWriteAttachment attachment) { + public void failed(Throwable exc, ProxyAttachment attachment) { attachment.writed.compareAndSet(true, false); exc.printStackTrace(); + if (attachment.getServerAttachment().canWrited()) { + Message proxym = new Message(); + proxym.setMagic(Message.MAGIC); + proxym.setType(Message.Type.DISCONNECT); + proxym.setUri(attachment.getUri()); + ByteBuffer writeBuffer = Message.toByteBuffer(proxym); + + attachment.getServerAttachment().getServer().write(writeBuffer, attachment.getServerAttachment().setWriteBuffer(writeBuffer), new ServerWriteNoProxyClientReadCompletionHandler()); + } + ServerAttachment serverAttachment = attachment.getServerAttachment(); + if (serverAttachment != null) { + serverAttachment.close(); + } } } diff --git a/magpiebridge-server/src/main/resources/conf/cfg.properties b/magpiebridge-server/src/main/resources/conf/cfg.properties index 6e1a68c..b02604f 100644 --- a/magpiebridge-server/src/main/resources/conf/cfg.properties +++ b/magpiebridge-server/src/main/resources/conf/cfg.properties @@ -2,6 +2,6 @@ server.ip=0.0.0.0 server.port=9799 -server.d563d057c1bc45f781459faf8bf5b32b.mappings=0.0.0.0:13389/192.168.1.102:3389,0.0.0.0:13306/127.0.0.1:3306,0.0.0.0:13305/192.168.1.102:3305,0.0.0.0:13388/192.168.1.102:3389 +server.d563d057c1bc45f781459faf8bf5b32b.mappings=0.0.0.0:13389/192.168.1.102:3389,0.0.0.0:13306/127.0.0.1:3306 # client -- Gitee