From 8737e5de452c4dec90472d48ffce23b7571d320b Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:33:24 +0800 Subject: [PATCH 1/7] ok --- README.md | 1 - src/main/java/com/example/ConsumerHead.java | 36 ----- src/main/java/com/example/EmitLogDirect.java | 28 ---- src/main/java/com/example/ProducerHead.java | 61 -------- .../java/com/example/PublishConfirmDemo.java | 135 ------------------ src/main/java/com/example/RPCServer.java | 72 ---------- .../example/utils/ConnectionFactoryUtils.java | 2 +- 7 files changed, 1 insertion(+), 334 deletions(-) delete mode 100644 src/main/java/com/example/ConsumerHead.java delete mode 100644 src/main/java/com/example/EmitLogDirect.java delete mode 100644 src/main/java/com/example/ProducerHead.java delete mode 100644 src/main/java/com/example/PublishConfirmDemo.java delete mode 100644 src/main/java/com/example/RPCServer.java diff --git a/README.md b/README.md index b77fa95..c64883c 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,3 @@ ### int i = 1/2 -### test223267 \ No newline at end of file diff --git a/src/main/java/com/example/ConsumerHead.java b/src/main/java/com/example/ConsumerHead.java deleted file mode 100644 index 09d7074..0000000 --- a/src/main/java/com/example/ConsumerHead.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.example; - -import com.example.utils.ConnectionFactoryUtils; -import com.rabbitmq.client.*; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -/** - * @author wangxiyuan - * @description - * @createDate 2021/3/25 17:39 - **/ -public class ConsumerHead { - public static void main(String[] args) throws IOException, TimeoutException { - // 创建连接和频道 - ConnectionFactory factory = ConnectionFactoryUtils.getConnectionFactory(); - Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); - channel.queueDeclare("test_header_queue", false, false, false, null); - System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); - channel.basicConsume("test_header_queue", true, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - - System.out.println("body:" + new String(body)); - } - }); - /*DeliverCallback deliverCallback = (consumerTag, delivery) -> { - String message = new String(delivery.getBody(), "UTF-8"); - System.out.println(" [x] Received '" + message + "'"); - //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); - }; - channel.basicConsume("test_header_queue", true, deliverCallback, consumerTag -> { });*/ - } -} diff --git a/src/main/java/com/example/EmitLogDirect.java b/src/main/java/com/example/EmitLogDirect.java deleted file mode 100644 index 5cab28f..0000000 --- a/src/main/java/com/example/EmitLogDirect.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.example; - -import com.example.utils.ConnectionFactoryUtils; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -/** - * @author wangxiyuan - * @description - * @createDate 2021/3/24 18:34 - **/ -public class EmitLogDirect { - private static final String EXCHANGE_NAME = "direct_logs"; - - public static void main(String[] argv) throws Exception { - ConnectionFactory factory = ConnectionFactoryUtils.getConnectionFactory(); - try (Connection connection = factory.newConnection(); - Channel channel = connection.createChannel()) { - channel.exchangeDeclare(EXCHANGE_NAME, "direct"); - - String message = "测试消息"; - - channel.basicPublish(EXCHANGE_NAME, argv[0], null, message.getBytes("UTF-8")); - System.out.println(" [x] Sent '" + argv[0] + "':'" + message + "'"); - } - } -} diff --git a/src/main/java/com/example/ProducerHead.java b/src/main/java/com/example/ProducerHead.java deleted file mode 100644 index 91355d5..0000000 --- a/src/main/java/com/example/ProducerHead.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.example; - -import com.example.utils.ConnectionFactoryUtils; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.BuiltinExchangeType; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -/** - * @author wangxiyuan - * @description - * @createDate 2021/3/25 17:33 - **/ -public class ProducerHead { - public static void main(String[] args) throws IOException, TimeoutException { - String exchangeName = "test_header"; - try (Connection connection = ConnectionFactoryUtils.getConnection(); - Channel channel = connection.createChannel()) { - // 声明交换机和类型headers - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, false, false, false, null); - - String queue = "test_header_queue"; - // 定义队列 - channel.queueDeclare(queue, false, false, false, null); - - // 声明header - Map bindingHeaders = new HashMap<>(); - /* - all:代表发送消息时必须匹配header中的所有key/val对 - any:代表发送消息时只需要匹配header中的任意一个key/val对 - */ - bindingHeaders.put("x-match", "any"); - bindingHeaders.put("key1", "147"); - bindingHeaders.put("key2", "258"); - bindingHeaders.put("key3", "369"); - - // 将交换机绑定到队列,并指定header信息 - channel.queueBind(queue, exchangeName, "", bindingHeaders); - - String body = "header...."; - - // 发送信息携带的header - Map requestHeader = new HashMap<>(); - requestHeader.put("key1", "147"); -// requestHeader.put("key2", "258"); -// requestHeader.put("key3", "369"); - - // 消息配置对象 - AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); - properties.headers(requestHeader); - - // 将消息直接发送给交换机(携带指定的参数header),让交换机自己根据header条件去转发到指定queue - channel.basicPublish(exchangeName, "", properties.build(), body.getBytes()); - } - } -} diff --git a/src/main/java/com/example/PublishConfirmDemo.java b/src/main/java/com/example/PublishConfirmDemo.java deleted file mode 100644 index 98f213a..0000000 --- a/src/main/java/com/example/PublishConfirmDemo.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.example; - -import com.example.utils.ConnectionFactoryUtils; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.ConfirmCallback; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.BooleanSupplier; - -/** - * @author wangxiyuan - * @description - * @createDate 2021/3/25 9:21 - **/ -public class PublishConfirmDemo { - static final int MESSAGE_COUNT = 50_000; - - public static void main(String[] args) throws Exception { - publishMessagesIndividually(); - publishMessagesInBatch(); - handlePublishConfirmsAsynchronously(); - } - - static void publishMessagesIndividually() throws Exception { - try (Connection connection = ConnectionFactoryUtils.getConnection()) { - Channel ch = connection.createChannel(); - String queue = UUID.randomUUID().toString(); - ch.queueDeclare(queue, false, false, true, null); - - ch.confirmSelect(); - long start = System.nanoTime(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - String body = String.valueOf(i); - ch.basicPublish("", queue, null, body.getBytes()); - ch.waitForConfirmsOrDie(5_000); - } - long end = System.nanoTime(); - System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); - } - } - - static void publishMessagesInBatch() throws Exception { - try (Connection connection = ConnectionFactoryUtils.getConnection()) { - Channel ch = connection.createChannel(); - - String queue = UUID.randomUUID().toString(); - ch.queueDeclare(queue, false, false, true, null); - - ch.confirmSelect(); - - int batchSize = 100; - int outstandingMessageCount = 0; - - long start = System.nanoTime(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - String body = String.valueOf(i); - ch.basicPublish("", queue, null, body.getBytes()); - outstandingMessageCount++; - - if (outstandingMessageCount == batchSize) { - ch.waitForConfirmsOrDie(5_000); - outstandingMessageCount = 0; - } - } - - if (outstandingMessageCount > 0) { - ch.waitForConfirmsOrDie(5_000); - } - long end = System.nanoTime(); - System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); - } - } - - static void handlePublishConfirmsAsynchronously() throws Exception { - try (Connection connection = ConnectionFactoryUtils.getConnection()) { - Channel ch = connection.createChannel(); - - String queue = UUID.randomUUID().toString(); - ch.queueDeclare(queue, false, false, true, null); - - ch.confirmSelect(); - - ConcurrentNavigableMap outstandingConfirms = new ConcurrentSkipListMap<>(); - - ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { - if (multiple) { - ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( - sequenceNumber, true - ); - confirmed.clear(); - } else { - outstandingConfirms.remove(sequenceNumber); - } - }; - - ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { - String body = outstandingConfirms.get(sequenceNumber); - System.err.format( - "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", - body, sequenceNumber, multiple - ); - cleanOutstandingConfirms.handle(sequenceNumber, multiple); - }); - - long start = System.nanoTime(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - String body = String.valueOf(i); - outstandingConfirms.put(ch.getNextPublishSeqNo(), body); - ch.basicPublish("", queue, null, body.getBytes()); - } - - if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) { - throw new IllegalStateException("All messages could not be confirmed in 60 seconds"); - } - - long end = System.nanoTime(); - System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); - } - } - - static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException { - int waited = 0; - while (!condition.getAsBoolean() && waited < timeout.toMillis()) { - Thread.sleep(100L); - waited += 100; - } - return condition.getAsBoolean(); - } -} diff --git a/src/main/java/com/example/RPCServer.java b/src/main/java/com/example/RPCServer.java deleted file mode 100644 index 778515e..0000000 --- a/src/main/java/com/example/RPCServer.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.example; - -import com.example.utils.ConnectionFactoryUtils; -import com.rabbitmq.client.*; - -/** - * @author wangxiyuan - * @description - * @createDate 2021/3/24 19:25 - **/ -public class RPCServer { - private static final String RPC_QUEUE_NAME = "rpc_queue"; - - private static int fib(int n) { - if (n == 0) return 0; - if (n == 1) return 1; - return fib(n - 1) + fib(n - 2); - } - - public static void main(String[] argv) throws Exception { - ConnectionFactory factory = ConnectionFactoryUtils.getConnectionFactory(); - try (Connection connection = factory.newConnection(); - Channel channel = connection.createChannel()) { - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); - // 清空指定队列 - channel.queuePurge(RPC_QUEUE_NAME); - - channel.basicQos(1); - - System.out.println(" [x] Awaiting RPC requests"); - - Object monitor = new Object(); - DeliverCallback deliverCallback = (consumerTag, delivery) -> { - AMQP.BasicProperties replyProps = new AMQP.BasicProperties - .Builder() - .correlationId(delivery.getProperties().getCorrelationId()) - .build(); - - String response = ""; - - try { - String message = new String(delivery.getBody(), "UTF-8"); - int n = Integer.parseInt(message); - - System.out.println(" [.] fib(" + message + ")"); - response += fib(n); - } catch (RuntimeException e) { - System.out.println(" [.] " + e.toString()); - } finally { - channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); - // RabbitMq consumer worker thread notifies the RPC server owner thread - synchronized (monitor) { - monitor.notify(); - } - } - }; - - channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); - // Wait and be prepared to consume the message from RPC client. - while (true) { - synchronized (monitor) { - try { - monitor.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - } -} diff --git a/src/main/java/com/example/utils/ConnectionFactoryUtils.java b/src/main/java/com/example/utils/ConnectionFactoryUtils.java index 674d0b6..cb67db7 100644 --- a/src/main/java/com/example/utils/ConnectionFactoryUtils.java +++ b/src/main/java/com/example/utils/ConnectionFactoryUtils.java @@ -9,7 +9,7 @@ import java.util.concurrent.TimeoutException; /** * @author wangxiyuan * @description - * @createDate 2021/3/24 18:41 + * @createDate 2021/4/15 14:28 **/ public class ConnectionFactoryUtils { public static ConnectionFactory getConnectionFactory(){ -- Gitee From a3e6f4f795de1c7371493b8efe0b3e872b50d3d9 Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:42:40 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c64883c..d48582d 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,4 @@ ### git reset ### int i = 1/2 - +1 \ No newline at end of file -- Gitee From e1f96ac36575c62d6d045f6c05380c63ec360fc6 Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:42:50 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d48582d..cd64088 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,4 @@ ### git reset ### int i = 1/2 -1 \ No newline at end of file +12 \ No newline at end of file -- Gitee From 54caff27d432b898b778779cf240dfd69767beaa Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:42:58 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cd64088..396e964 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,4 @@ ### git reset ### int i = 1/2 -12 \ No newline at end of file +1222 \ No newline at end of file -- Gitee From c3ef794604be403c05587a761f82cafa0cb9a391 Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:43:12 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 396e964..dbb0d4d 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,4 @@ ### git reset ### int i = 1/2 -1222 \ No newline at end of file +1222555 \ No newline at end of file -- Gitee From 68e6c2c616500891db85bd82b0861143260ddf6e Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Thu, 15 Apr 2021 14:43:30 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d48582d..dbb0d4d 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,4 @@ ### git reset ### int i = 1/2 -1 \ No newline at end of file +1222555 \ No newline at end of file -- Gitee From caab212c6cf312b1b990cc4ec63a6f3400233821 Mon Sep 17 00:00:00 2001 From: yuanzhishang <18355256200@163.com> Date: Fri, 16 Apr 2021 09:44:52 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dbb0d4d..d4021f0 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,5 @@ ### git reset ### int i = 1/2 -1222555 \ No newline at end of file +1222555 +测试创建新的分支 提交代码 \ No newline at end of file -- Gitee