代码拉取完成,页面将自动刷新
From 955428278ccd9bfa0f15e21a8d3040c5213358bd Mon Sep 17 00:00:00 2001
From: Dongyuan Pan <dongyuanpan0@gmail.com>
Date: Tue, 4 Jul 2023 18:01:48 +0800
Subject: [PATCH 1/5] [ISSUE #6991] Delete rocketmq.client.logUseSlf4j=true in
JAVA_OPT
---
distribution/bin/runbroker.cmd | 1 -
distribution/bin/runbroker.sh | 1 -
2 files changed, 2 deletions(-)
diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd
index 15f676aa8..77a0d1ff8 100644
--- a/distribution/bin/runbroker.cmd
+++ b/distribution/bin/runbroker.cmd
@@ -36,7 +36,6 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch"
set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking"
-set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true"
set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%"
"%JAVA%" %JAVA_OPT% %*
\ No newline at end of file
diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh
index a081df79e..e6e2132ab 100644
--- a/distribution/bin/runbroker.sh
+++ b/distribution/bin/runbroker.sh
@@ -106,7 +106,6 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
-JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
--
2.32.0.windows.2
From 00fc42b8be848fc3f5c550cbab007b92f128dc38 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Tue, 4 Jul 2023 18:02:16 +0800
Subject: [PATCH 2/5] [ISSUE #6957] Support Proxy Protocol for gRPC and
Remoting Server (#6958)
---
WORKSPACE | 1 +
.../common/constant/HAProxyConstants.java | 28 ++++
pom.xml | 5 +
proxy/BUILD.bazel | 2 +
proxy/pom.xml | 4 +
.../proxy/grpc/GrpcServerBuilder.java | 2 +-
...ava => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++--
.../proxy/grpc/constant/AttributeKeys.java | 44 ++++++
.../grpc/interceptor/HeaderInterceptor.java | 32 +++-
.../remoting/MultiProtocolRemotingServer.java | 5 +-
.../remoting/common/RemotingHelper.java | 42 ++++--
.../remoting/netty/AttributeKeys.java | 45 ++++++
.../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++--
.../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++
.../org/apache/rocketmq/remoting/TlsTest.java | 28 ++--
15 files changed, 563 insertions(+), 59 deletions(-)
create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
rename proxy/src/main/java/org/apache/rocketmq/proxy/grpc/{OptionalSSLProtocolNegotiator.java => ProxyAndTlsProtocolNegotiator.java} (51%)
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
diff --git a/WORKSPACE b/WORKSPACE
index fbb694efe..e3a8f37dc 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -104,6 +104,7 @@ maven_install(
"software.amazon.awssdk:s3:2.20.29",
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
+ "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
],
fetch_sources = True,
repositories = [
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
new file mode 100644
index 000000000..c1ae0cca1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.constant;
+
+public class HAProxyConstants {
+
+ public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_";
+ public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + "addr";
+ public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + "port";
+ public static final String PROXY_PROTOCOL_SERVER_ADDR = PROXY_PROTOCOL_PREFIX + "server_addr";
+ public static final String PROXY_PROTOCOL_SERVER_PORT = PROXY_PROTOCOL_PREFIX + "server_port";
+ public static final String PROXY_PROTOCOL_TLV_PREFIX = PROXY_PROTOCOL_PREFIX + "tlv_0x";
+}
diff --git a/pom.xml b/pom.xml
index a3b474602..12bc2dbd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,6 +888,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.github.aliyunmq</groupId>
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index fcb85e46f..b4f3c16e2 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -46,6 +46,7 @@ java_library(
"@maven//:io_grpc_grpc_services",
"@maven//:io_grpc_grpc_stub",
"@maven//:io_netty_netty_all",
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
"@maven//:io_openmessaging_storage_dledger",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
@@ -94,6 +95,7 @@ java_library(
"@maven//:io_grpc_grpc_netty_shaded",
"@maven//:io_grpc_grpc_stub",
"@maven//:io_netty_netty_all",
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
diff --git a/proxy/pom.xml b/proxy/pom.xml
index f14155737..3fbea107a 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -75,6 +75,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.github.aliyunmq</groupId>
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 0ca6a1fcb..437b9216b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -50,7 +50,7 @@ public class GrpcServerBuilder {
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);
- serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
+ serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
// build server
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
similarity index 51%
rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index 670e1c1a2..ceb9becc0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -16,36 +16,53 @@
*/
package org.apache.rocketmq.proxy.grpc;
+import io.grpc.Attributes;
import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
+import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
+import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult;
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
+import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final String HA_PROXY_DECODER = "HAProxyDecoder";
+ private static final String HA_PROXY_HANDLER = "HAProxyHandler";
+ private static final String TLS_MODE_HANDLER = "TlsModeHandler";
/**
* the length of the ssl record header (in bytes)
*/
@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
private static SslContext sslContext;
- public OptionalSSLProtocolNegotiator() {
+ public ProxyAndTlsProtocolNegotiator() {
sslContext = loadSslContext();
}
@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
- return new PortUnificationServerHandler(grpcHandler);
+ return new ProxyAndTlsProtocolHandler(grpcHandler);
}
@Override
- public void close() {}
+ public void close() {
+ }
private static SslContext loadSslContext() {
try {
@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(
Paths.get(tlsKeyPath));
- InputStream serverCertificateStream = Files.newInputStream(
- Paths.get(tlsCertPath))) {
+ InputStream serverCertificateStream = Files.newInputStream(
+ Paths.get(tlsCertPath))) {
SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
}
}
- public static class PortUnificationServerHandler extends ByteToMessageDecoder {
+ private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder {
+
+ private final GrpcHttp2ConnectionHandler grpcHandler;
+
+ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+ this.grpcHandler = grpcHandler;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
+ try {
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(
+ in);
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ return;
+ }
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
+ ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
+ } else {
+ ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
+ }
+
+ ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+ ctx.pipeline().remove(this);
+ } catch (Exception e) {
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+ }
+ }
+ }
+
+ private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
+
+ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof HAProxyMessage) {
+ replaceEventWithMessage((HAProxyMessage) msg);
+ ctx.fireUserEventTriggered(pne);
+ } else {
+ super.channelRead(ctx, msg);
+ }
+ ctx.pipeline().remove(this);
+ }
+
+ /**
+ * The definition of key refers to the implementation of nginx
+ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
+ *
+ * @param msg
+ */
+ private void replaceEventWithMessage(HAProxyMessage msg) {
+ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ Attributes.Key<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ builder.set(key, value);
+ });
+ }
+ pne = InternalProtocolNegotiationEvent
+ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+ }
+ }
+
+ private static class TlsModeHandler extends ByteToMessageDecoder {
+
+ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
private final ChannelHandler ssl;
private final ChannelHandler plaintext;
- public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+ public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
.newHandler(grpcHandler);
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
}
@Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
- throws Exception {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (TlsMode.ENFORCING.equals(tlsMode)) {
@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
}
}
- ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+ ctx.fireUserEventTriggered(pne);
ctx.pipeline().remove(this);
} catch (Exception e) {
log.error("process ssl protocol negotiator failed.", e);
throw e;
}
}
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof ProtocolNegotiationEvent) {
+ pne = (ProtocolNegotiationEvent) evt;
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
new file mode 100644
index 000000000..096a5ba3d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc.constant;
+
+import io.grpc.Attributes;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AttributeKeys {
+
+ public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR =
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+ public static final Attributes.Key<String> PROXY_PROTOCOL_PORT =
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR =
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT =
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+ private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>();
+
+ public static Attributes.Key<String> valueOf(String name) {
+ return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> Attributes.Key.create(name));
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1cbb00361..13893e5ed 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -18,11 +18,16 @@
package org.apache.rocketmq.proxy.grpc.interceptor;
import com.google.common.net.HostAndPort;
+import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor {
Metadata headers,
ServerCallHandler<R, W> next
) {
- SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
- String remoteAddress = parseSocketAddress(remoteSocketAddress);
+ String remoteAddress = getProxyProtocolAddress(call.getAttributes());
+ if (StringUtils.isBlank(remoteAddress)) {
+ SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ remoteAddress = parseSocketAddress(remoteSocketAddress);
+ }
headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress);
SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
String localAddress = parseSocketAddress(localSocketAddress);
headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress);
+
+ for (Attributes.Key<?> key : call.getAttributes().keys()) {
+ if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+ continue;
+ }
+ Metadata.Key<String> headerKey
+ = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER);
+ String headerValue = String.valueOf(call.getAttributes().get(key));
+ headers.put(headerKey, headerValue);
+ }
+
return next.startCall(call, headers);
}
@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor {
return "";
}
+
+ private String getProxyProtocolAddress(Attributes attributes) {
+ String proxyProtocolAddr = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR);
+ String proxyProtocolPort = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT);
+ if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) {
+ return null;
+ }
+ return proxyProtocolAddr + ":" + proxyProtocolPort;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 1142132b7..858b1f022 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
-import java.io.IOException;
-import java.security.cert.CertificateException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+
/**
* support remoting and http2 protocol at one port
*/
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 75e25a83a..d0750b678 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
public class RemotingHelper {
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
@@ -50,6 +53,9 @@ public class RemotingHelper {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
+ private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+ private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
@@ -203,12 +209,16 @@ public class RemotingHelper {
if (null == channel) {
return "";
}
+ String addr = getProxyProtocolAddress(channel);
+ if (StringUtils.isNotBlank(addr)) {
+ return addr;
+ }
Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
if (att == null) {
// mocked in unit test
return parseChannelRemoteAddr0(channel);
}
- String addr = att.get();
+ addr = att.get();
if (addr == null) {
addr = parseChannelRemoteAddr0(channel);
att.set(addr);
@@ -216,6 +226,18 @@ public class RemotingHelper {
return addr;
}
+ private static String getProxyProtocolAddress(Channel channel) {
+ if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+ return null;
+ }
+ String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
+ String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
+ if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
+ return null;
+ }
+ return proxyProtocolAddr + ":" + proxyProtocolPort;
+ }
+
private static String parseChannelRemoteAddr0(final Channel channel) {
SocketAddress remote = channel.remoteAddress();
final String addr = remote != null ? remote.toString() : "";
@@ -255,7 +277,7 @@ public class RemotingHelper {
return "";
}
- public static int parseSocketAddressPort(SocketAddress socketAddress) {
+ public static Integer parseSocketAddressPort(SocketAddress socketAddress) {
if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getPort();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
new file mode 100644
index 000000000..4e69ab82d
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+
+import io.netty.util.AttributeKey;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AttributeKeys {
+
+ public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+ public static final AttributeKey<String> PROXY_PROTOCOL_PORT =
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+
+ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR =
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
+
+ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT =
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
+
+ private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
+
+ public static AttributeKey<String> valueOf(String name) {
+ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 9f39d672e..94ffd8d07 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.CharsetUtil;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -71,6 +70,19 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
@SuppressWarnings("NullableProblems")
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
@@ -96,6 +108,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>();
public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ public static final String HA_PROXY_DECODER = "HAProxyDecoder";
+ public static final String HA_PROXY_HANDLER = "HAProxyHandler";
+ public static final String TLS_MODE_HANDLER = "TlsModeHandler";
public static final String TLS_HANDLER_NAME = "sslHandler";
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
@@ -387,7 +402,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
private void prepareSharableHandlers() {
- handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
+ handshakeHandler = new HandshakeHandler();
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
@@ -437,11 +452,51 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@ChannelHandler.Sharable
public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private final TlsModeHandler tlsModeHandler;
+
+ public HandshakeHandler() {
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
+ try {
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ return;
+ }
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
+ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+ } else {
+ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
+ }
+
+ try {
+ // Remove this handler
+ ctx.pipeline().remove(this);
+ } catch (NoSuchElementException e) {
+ log.error("Error while removing HandshakeHandler", e);
+ }
+
+ // Hand over this message to the next .
+ ctx.fireChannelRead(in.retain());
+ } catch (Exception e) {
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+ }
+ }
+ }
+
+ @ChannelHandler.Sharable
+ public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
private final TlsMode tlsMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
- HandshakeHandler(TlsMode tlsMode) {
+ TlsModeHandler(TlsMode tlsMode) {
this.tlsMode = tlsMode;
}
@@ -461,7 +516,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
- .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
@@ -483,7 +538,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
- log.error("Error while removing HandshakeHandler", e);
+ log.error("Error while removing TlsModeHandler", e);
}
// Hand over this message to the next .
@@ -706,4 +761,46 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return NettyRemotingServer.this.getCallbackExecutor();
}
}
+
+ public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof HAProxyMessage) {
+ fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+ } else {
+ super.channelRead(ctx, msg);
+ }
+ ctx.pipeline().remove(this);
+ }
+
+ /**
+ * The definition of key refers to the implementation of nginx
+ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
+ * @param msg
+ * @param channel
+ */
+ private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ AttributeKey<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ channel.attr(key).set(value);
+ });
+ }
+ }
+ }
}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
new file mode 100644
index 000000000..c39fd2132
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyProtocolTest {
+
+ private RemotingServer remotingServer;
+ private RemotingClient remotingClient;
+
+ @Before
+ public void setUp() throws Exception {
+ NettyClientConfig clientConfig = new NettyClientConfig();
+ clientConfig.setUseTLS(false);
+
+ remotingServer = RemotingServerTest.createRemotingServer();
+ remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
+
+ await().pollDelay(Duration.ofMillis(10))
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress()));
+ }
+
+ @Test
+ public void testProxyProtocol() throws Exception {
+ sendHAProxyMessage(remotingClient);
+ requestThenAssertResponse(remotingClient);
+ }
+
+ private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception {
+ RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3);
+ assertNotNull(response);
+ assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(response.getExtFields()).hasSize(2);
+ assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
+ }
+
+ private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception {
+ Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class);
+ getAndCreateChannel.setAccessible(true);
+ NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient;
+ Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress());
+ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
+ HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000);
+
+ ByteBuf byteBuf = Unpooled.directBuffer();
+ Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class);
+ encode.setAccessible(true);
+ encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf);
+ channel.writeAndFlush(byteBuf).sync();
+ }
+
+ private static RemotingCommand createRequest() {
+ RequestHeader requestHeader = new RequestHeader();
+ requestHeader.setCount(1);
+ requestHeader.setMessageTitle("Welcome");
+ return RemotingCommand.createRequestCommand(0, requestHeader);
+ }
+
+
+ private String getServerAddress() {
+ return "localhost:" + remotingServer.localListenPort();
+ }
+
+ private boolean isHostConnectable(String addr) {
+ try (Socket socket = new Socket()) {
+ socket.connect(NetworkUtil.string2SocketAddress(addr));
+ return true;
+ } catch (IOException ignored) {
+ }
+ return false;
+ }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 3da7abf57..de7edbbfb 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -17,19 +17,6 @@
package org.apache.rocketmq.remoting;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -47,6 +34,20 @@ import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
@@ -234,6 +235,7 @@ public class TlsTest {
@Test
public void serverAcceptsUntrustedClientCert() throws Exception {
requestThenAssertResponse();
+// Thread.sleep(1000000L);
}
/**
--
2.32.0.windows.2
From 4f840afcb04f5cc328795896198c6fba96ff37ec Mon Sep 17 00:00:00 2001
From: mxsm <ljbmxsm@gmail.com>
Date: Wed, 5 Jul 2023 11:03:52 +0800
Subject: [PATCH 3/5] [ISSUE #6960] Added Slot formatting sketch comments
(#6961)
---
.../java/org/apache/rocketmq/store/timer/Slot.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
index b91193b94..2da846cee 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
@@ -16,9 +16,17 @@
*/
package org.apache.rocketmq.store.timer;
+/**
+ * Represents a slot of timing wheel. Format:
+ * ┌────────────┬───────────┬───────────┬───────────┬───────────┐
+ * │delayed time│ first pos │ last pos │ num │ magic │
+ * ├────────────┼───────────┼───────────┼───────────┼───────────┤
+ * │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │
+ * └────────────┴───────────┴───────────┴───────────┴───────────┘
+ */
public class Slot {
public static final short SIZE = 32;
- public final long timeMs;
+ public final long timeMs; //delayed time
public final long firstPos;
public final long lastPos;
public final int num;
--
2.32.0.windows.2
From 58550f074ec101c0a158ede0df1839950e08837a Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Mon, 10 Jul 2023 14:13:18 +0800
Subject: [PATCH 4/5] [ISSUE #7008] Fix the issue of protocol parsing failure
when using haproxy and tls together (#7009)
---
.../remoting/netty/NettyRemotingServer.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 94ffd8d07..445f06cc6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -459,13 +459,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
try {
- ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
- if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
+ if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
return;
}
- if (ha.state() == ProtocolDetectionState.DETECTED) {
+ if (detectionResult.state() == ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
.addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
.addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
@@ -481,7 +481,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
// Hand over this message to the next .
- ctx.fireChannelRead(in.retain());
+ ctx.fireChannelRead(byteBuf.retain());
} catch (Exception e) {
log.error("process proxy protocol negotiator failed.", e);
throw e;
@@ -503,8 +503,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- // Peek the first byte to determine if the content is starting with TLS handshake
- byte b = msg.getByte(0);
+ // Peek the current read index byte to determine if the content is starting with TLS handshake
+ byte b = msg.getByte(msg.readerIndex());
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
--
2.32.0.windows.2
From 8e6b5e62bd4da78c0a7d265891c52685fcffd08a Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Mon, 10 Jul 2023 20:14:17 +0800
Subject: [PATCH 5/5] [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
* Add interface ReceiptHandleManager
* fix unit test
* fix
---
.../processor/ReceiptHandleProcessor.java | 10 +-
.../receipt/DefaultReceiptHandleManager.java | 282 ++++++++++++++++++
.../service/receipt/ReceiptHandleManager.java | 260 +---------------
...a => DefaultReceiptHandleManagerTest.java} | 34 +--
4 files changed, 307 insertions(+), 279 deletions(-)
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
rename proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/{ReceiptHandleManagerTest.java => DefaultReceiptHandleManagerTest.java} (93%)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9c7e8dea9..fc49e7622 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
import org.apache.rocketmq.proxy.service.ServiceManager;
public class ReceiptHandleProcessor extends AbstractProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected ReceiptHandleManager receiptHandleManager;
+ protected DefaultReceiptHandleManager receiptHandleManager;
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
event.getFuture().complete(v);
});
};
- this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
+ this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
}
protected ProxyContext createContext(String actionName) {
@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
}
public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
- receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle);
}
public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) {
- return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle);
+ return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle);
}
public static class ReceiptHandleGroupKey {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
new file mode 100644
index 000000000..c7633d658
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.receipt;
+
+import com.google.common.base.Stopwatch;
+import io.netty.channel.Channel;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.common.state.StateEventListener;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.RenewEvent;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager {
+ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ protected final MetadataService metadataService;
+ protected final ConsumerManager consumerManager;
+ protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
+ protected final StateEventListener<RenewEvent> eventListener;
+ protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
+ protected final ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
+ protected final ThreadPoolExecutor renewalWorkerService;
+
+ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
+ this.metadataService = metadataService;
+ this.consumerManager = consumerManager;
+ this.eventListener = eventListener;
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
+ proxyConfig.getRenewThreadPoolNums(),
+ proxyConfig.getRenewMaxThreadPoolNums(),
+ 1, TimeUnit.MINUTES,
+ "RenewalWorkerThread",
+ proxyConfig.getRenewThreadPoolQueueCapacity()
+ );
+ consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+ @Override
+ public void handle(ConsumerGroupEvent event, String group, Object... args) {
+ if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
+ if (args == null || args.length < 1) {
+ return;
+ }
+ if (args[0] instanceof ClientChannelInfo) {
+ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+ // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
+ return;
+ }
+ clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+ });
+ this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+ this.appendStartAndShutdown(new StartAndShutdown() {
+ @Override
+ public void start() throws Exception {
+ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
+ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ scheduledExecutorService.shutdown();
+ clearAllHandle();
+ }
+ });
+ }
+
+ public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
+ k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
+ }
+
+ public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) {
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
+ if (handleGroup == null) {
+ return null;
+ }
+ return handleGroup.remove(msgID, receiptHandle);
+ }
+
+ protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
+ return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
+ }
+
+ protected void scheduleRenewTask() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+ ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
+ if (clientIsOffline(key)) {
+ clearGroup(key);
+ continue;
+ }
+
+ ReceiptHandleGroup group = entry.getValue();
+ group.scan((msgID, handleStr, v) -> {
+ long current = System.currentTimeMillis();
+ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
+ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
+ return;
+ }
+ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
+ });
+ }
+ } catch (Exception e) {
+ log.error("unexpect error when schedule renew task", e);
+ }
+
+ log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
+ }
+
+ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
+ try {
+ group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
+ } catch (Exception e) {
+ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
+ }
+ }
+
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
+ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ long current = System.currentTimeMillis();
+ try {
+ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
+ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
+ return CompletableFuture.completedFuture(null);
+ }
+ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
+ if (renewExceptionNeedRetry(throwable)) {
+ messageReceiptHandle.incrementAndGetRenewRetryTimes();
+ resFuture.complete(messageReceiptHandle);
+ } else {
+ resFuture.complete(null);
+ }
+ } else if (AckStatus.OK.equals(ackResult.getStatus())) {
+ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
+ messageReceiptHandle.resetRenewRetryTimes();
+ messageReceiptHandle.incrementRenewTimes();
+ resFuture.complete(messageReceiptHandle);
+ } else {
+ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
+ resFuture.complete(null);
+ }
+ });
+ } else {
+ ProxyContext context = createContext("RenewMessage");
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
+ if (subscriptionGroupConfig == null) {
+ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
+ return CompletableFuture.completedFuture(null);
+ }
+ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
+ }
+ resFuture.complete(null);
+ });
+ }
+ } catch (Throwable t) {
+ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
+ resFuture.complete(null);
+ }
+ return resFuture;
+ }
+
+ protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
+ if (key == null) {
+ return;
+ }
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
+ if (handleGroup == null) {
+ return;
+ }
+ handleGroup.scan((msgID, handle, v) -> {
+ try {
+ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
+ return CompletableFuture.completedFuture(null);
+ });
+ } catch (Exception e) {
+ log.error("error when clear handle for group. key:{}", key, e);
+ }
+ });
+ }
+
+ protected void clearAllHandle() {
+ log.info("start clear all handle in receiptHandleProcessor");
+ Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
+ for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
+ clearGroup(key);
+ }
+ log.info("clear all handle in receiptHandleProcessor done");
+ }
+
+ protected boolean renewExceptionNeedRetry(Throwable t) {
+ t = ExceptionUtils.getRealException(t);
+ if (t instanceof ProxyException) {
+ ProxyException proxyException = (ProxyException) t;
+ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
+ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected ProxyContext createContext(String actionName) {
+ return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
index f3b805624..6a8888e97 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
@@ -17,266 +17,12 @@
package org.apache.rocketmq.proxy.service.receipt;
-import com.google.common.base.Stopwatch;
import io.netty.channel.Channel;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.client.consumer.AckResult;
-import org.apache.rocketmq.client.consumer.AckStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.state.StateEventListener;
-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
-import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
-import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
- protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected final MetadataService metadataService;
- protected final ConsumerManager consumerManager;
- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
- protected final StateEventListener<RenewEvent> eventListener;
- protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
- protected final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
- protected final ThreadPoolExecutor renewalWorkerService;
+public interface ReceiptHandleManager {
+ void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle);
- public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
- this.metadataService = metadataService;
- this.consumerManager = consumerManager;
- this.eventListener = eventListener;
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
- proxyConfig.getRenewThreadPoolNums(),
- proxyConfig.getRenewMaxThreadPoolNums(),
- 1, TimeUnit.MINUTES,
- "RenewalWorkerThread",
- proxyConfig.getRenewThreadPoolQueueCapacity()
- );
- consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
- @Override
- public void handle(ConsumerGroupEvent event, String group, Object... args) {
- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
- if (args == null || args.length < 1) {
- return;
- }
- if (args[0] instanceof ClientChannelInfo) {
- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
- if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
- // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
- return;
- }
- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
- log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
- }
- }
- }
-
- @Override
- public void shutdown() {
-
- }
- });
- this.receiptHandleGroupMap = new ConcurrentHashMap<>();
- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
- this.appendStartAndShutdown(new StartAndShutdown() {
- @Override
- public void start() throws Exception {
- scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void shutdown() throws Exception {
- scheduledExecutorService.shutdown();
- clearAllHandle();
- }
- });
- }
-
- public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
- }
-
- public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) {
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
- if (handleGroup == null) {
- return null;
- }
- return handleGroup.remove(msgID, receiptHandle);
- }
-
- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
- return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
- }
-
- public void scheduleRenewTask() {
- Stopwatch stopwatch = Stopwatch.createStarted();
- try {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
- if (clientIsOffline(key)) {
- clearGroup(key);
- continue;
- }
-
- ReceiptHandleGroup group = entry.getValue();
- group.scan((msgID, handleStr, v) -> {
- long current = System.currentTimeMillis();
- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
- if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
- return;
- }
- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
- });
- }
- } catch (Exception e) {
- log.error("unexpect error when schedule renew task", e);
- }
-
- log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
- }
-
- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
- try {
- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
- } catch (Exception e) {
- log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
- }
- }
-
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
- CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- long current = System.currentTimeMillis();
- try {
- if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
- log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
- return CompletableFuture.completedFuture(null);
- }
- if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
- CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
- future.whenComplete((ackResult, throwable) -> {
- if (throwable != null) {
- log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
- if (renewExceptionNeedRetry(throwable)) {
- messageReceiptHandle.incrementAndGetRenewRetryTimes();
- resFuture.complete(messageReceiptHandle);
- } else {
- resFuture.complete(null);
- }
- } else if (AckStatus.OK.equals(ackResult.getStatus())) {
- messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
- messageReceiptHandle.resetRenewRetryTimes();
- messageReceiptHandle.incrementRenewTimes();
- resFuture.complete(messageReceiptHandle);
- } else {
- log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
- resFuture.complete(null);
- }
- });
- } else {
- ProxyContext context = createContext("RenewMessage");
- SubscriptionGroupConfig subscriptionGroupConfig =
- metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
- if (subscriptionGroupConfig == null) {
- log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
- return CompletableFuture.completedFuture(null);
- }
- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
- CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
- future.whenComplete((ackResult, throwable) -> {
- if (throwable != null) {
- log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
- }
- resFuture.complete(null);
- });
- }
- } catch (Throwable t) {
- log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
- resFuture.complete(null);
- }
- return resFuture;
- }
-
- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
- if (key == null) {
- return;
- }
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
- if (handleGroup == null) {
- return;
- }
- handleGroup.scan((msgID, handle, v) -> {
- try {
- handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
- CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
- return CompletableFuture.completedFuture(null);
- });
- } catch (Exception e) {
- log.error("error when clear handle for group. key:{}", key, e);
- }
- });
- }
-
- public void clearAllHandle() {
- log.info("start clear all handle in receiptHandleProcessor");
- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
- clearGroup(key);
- }
- log.info("clear all handle in receiptHandleProcessor done");
- }
-
- protected boolean renewExceptionNeedRetry(Throwable t) {
- t = ExceptionUtils.getRealException(t);
- if (t instanceof ProxyException) {
- ProxyException proxyException = (ProxyException) t;
- if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
- ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
- return false;
- }
- }
- return true;
- }
-
- protected ProxyContext createContext(String actionName) {
- return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
- }
+ MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle);
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
similarity index 93%
rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 877c9fd6f..7c6943e44 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class ReceiptHandleManagerTest extends BaseServiceTest {
- private ReceiptHandleManager receiptHandleManager;
+public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
+ private DefaultReceiptHandleManager receiptHandleManager;
@Mock
protected MessagingProcessor messagingProcessor;
@Mock
@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
@Before
public void setup() {
- receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
+ receiptHandleManager = new DefaultReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
@Override
public void fireEvent(RenewEvent event) {
MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
@Test
public void testAddReceiptHandle() {
Channel channel = new LocalChannel();
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
.build().encode();
MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
}
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
public void testRenewReceiptHandle() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
public void testRenewExceedMaxRenewTimes() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
public void testRenewWithInvalidHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
AtomicInteger count = new AtomicInteger(0);
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
@Test
public void testRemoveReceiptHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
- receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleManager.scheduleRenewTask();
@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
@Test
public void testClearGroup() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
}
--
2.32.0.windows.2
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。