diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html index ff19479042e88..85109758ab99c 100644 --- a/docs/layouts/shortcodes/generated/security_configuration.html +++ b/docs/layouts/shortcodes/generated/security_configuration.html @@ -244,7 +244,7 @@
security.ssl.verify-hostname
- true + false Boolean Flag to enable peer’s hostname verification during ssl handshake. diff --git a/docs/layouts/shortcodes/generated/security_ssl_section.html b/docs/layouts/shortcodes/generated/security_ssl_section.html index ad5c72b3cf232..8a4f869e3085d 100644 --- a/docs/layouts/shortcodes/generated/security_ssl_section.html +++ b/docs/layouts/shortcodes/generated/security_ssl_section.html @@ -136,7 +136,7 @@
security.ssl.verify-hostname
- true + false Boolean Flag to enable peer’s hostname verification during ssl handshake. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 24ecf2f6eaf1e..d4ff443133572 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -519,7 +519,7 @@ public static Configuration forProvider(Configuration configuration, String prov public static final ConfigOption SSL_VERIFY_HOSTNAME = key("security.ssl.verify-hostname") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription( "Flag to enable peer’s hostname verification during ssl handshake."); diff --git a/flink-kubernetes/pom.xml b/flink-kubernetes/pom.xml index 1bd47c6f4fa72..2e6ec979718dc 100644 --- a/flink-kubernetes/pom.xml +++ b/flink-kubernetes/pom.xml @@ -32,7 +32,6 @@ under the License. 7.3.1 - 4.1.108.Final --add-opens=java.base/java.util=ALL-UNNAMED @@ -109,22 +108,6 @@ under the License. - - - - - io.netty - netty-codec-http - ${netty.override.version} - test - - - - diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 411fa96d63ca9..bd0f7d620fc49 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -418,6 +418,14 @@ under the License. 2.5.1 runtime + + + io.netty + netty-bom + 4.1.100.Final + pom + import + diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE index 569a191bcb845..a7af47112a60f 100644 --- a/flink-python/src/main/resources/META-INF/NOTICE +++ b/flink-python/src/main/resources/META-INF/NOTICE @@ -58,17 +58,7 @@ The bundled Apache Beam dependencies bundle the following dependencies under the - io.grpc:grpc-stub:1.59.1 - io.grpc:grpc-testing:1.59.1 - io.netty:netty-buffer:4.1.100.Final -- io.netty:netty-codec:4.1.100.Final -- io.netty:netty-codec-http:4.1.100.Final -- io.netty:netty-codec-http2:4.1.100.Final -- io.netty:netty-codec-socks:4.1.100.Final - io.netty:netty-common:4.1.100.Final -- io.netty:netty-handler:4.1.100.Final -- io.netty:netty-handler-proxy:4.1.100.Final -- io.netty:netty-resolver:4.1.100.Final -- io.netty:netty-transport:4.1.100.Final -- io.netty:netty-transport-native-epoll:4.1.100.Final:linux-x86_64 -- io.netty:netty-transport-native-unix-common:4.1.100.Final - io.opencensus:opencensus-api:0.31.0 - io.opencensus:opencensus-contrib-grpc-metrics:0.31.0 - io.perfmark:perfmark-api:0.26.0 diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index a048d67762527..1ebf06708cda7 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -31,7 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -236,8 +237,9 @@ private boolean attemptToBind(final int port) throws Throwable { .setNameFormat("Flink " + serverName + " EventLoop Thread %d") .build(); - final NioEventLoopGroup nioGroup = - new NioEventLoopGroup(numEventLoopThreads, threadFactory); + final MultiThreadIoEventLoopGroup nioGroup = + new MultiThreadIoEventLoopGroup( + numEventLoopThreads, threadFactory, NioIoHandler.newFactory()); this.bootstrap = new ServerBootstrap() diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index f3ced1904e773..2d3e2f6862376 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -33,7 +33,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -110,7 +111,10 @@ public Client( .setNameFormat("Flink " + clientName + " Event Loop Thread %d") .build(); - final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); + final MultiThreadIoEventLoopGroup nioGroup = + new MultiThreadIoEventLoopGroup( + numEventLoopThreads, threadFactory, NioIoHandler.newFactory()); + final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads); this.bootstrap = diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 6ba325fef0941..b74e8b6de9422 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -55,7 +55,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -93,11 +94,11 @@ class ClientTest { private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); // Thread pool for client bootstrap (shared between tests) - private NioEventLoopGroup nioGroup; + private MultiThreadIoEventLoopGroup nioGroup; @BeforeEach void setUp() { - nioGroup = new NioEventLoopGroup(); + nioGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); } @AfterEach diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java index a5e91fb89396c..dd83662d7ef05 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -53,7 +53,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -74,7 +75,8 @@ class KvStateServerTest { // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + private static final MultiThreadIoEventLoopGroup NIO_GROUP = + new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); private static final int TIMEOUT_MILLIS = 10000; diff --git a/flink-rpc/flink-rpc-akka/pom.xml b/flink-rpc/flink-rpc-akka/pom.xml index 795064056e9ca..72b4f08adc3f0 100644 --- a/flink-rpc/flink-rpc-akka/pom.xml +++ b/flink-rpc/flink-rpc-akka/pom.xml @@ -37,7 +37,7 @@ under the License. - 1.1.2 + 1.4.0 @@ -180,12 +180,7 @@ under the License. io.netty:* - - META-INF/license/** - - META-INF/NOTICE.txt + **/* diff --git a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE index e0fe50ed48c22..7c5085f590734 100644 --- a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE +++ b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE @@ -9,14 +9,39 @@ This project bundles the following dependencies under the Apache Software Licens - com.hierynomus:asn-one:0.6.0 - com.typesafe:config:1.4.2 - com.typesafe:ssl-config-core_2.12:0.6.1 -- io.netty:netty-all:4.1.100.Final - org.agrona:agrona:1.22.0 -- org.apache.pekko:pekko-actor_2.12:1.1.2 -- org.apache.pekko:pekko-remote_2.12:1.1.2 -- org.apache.pekko:pekko-pki_2.12:1.1.2 -- org.apache.pekko:pekko-protobuf-v3_2.12:1.1.2 -- org.apache.pekko:pekko-slf4j_2.12:1.1.2 -- org.apache.pekko:pekko-stream_2.12:1.1.2 +- org.apache.pekko:pekko-actor_2.12:1.4.0 +- org.apache.pekko:pekko-remote_2.12:1.4.0 +- org.apache.pekko:pekko-pki_2.12:1.4.0 +- org.apache.pekko:pekko-protobuf-v3_2.12:1.4.0 +- org.apache.pekko:pekko-slf4j_2.12:1.4.0 +- org.apache.pekko:pekko-stream_2.12:1.4.0 +- com.typesafe:ssl-config-core_2.12:0.7.1 +- io.netty:netty-buffer:4.2.6.Final +- io.netty:netty-transport-native-unix-common:4.2.6.Final +- io.netty:netty-common:4.2.6.Final +- io.netty:netty-transport:4.2.6.Final +- io.netty:netty-transport-classes-epoll:4.2.6.Final +- io.netty:netty-transport-classes-kqueue:4.2.6.Final +- io.netty:netty-resolver-dns-classes-macos:4.2.6.Final +- io.netty:netty-transport-classes-io_uring:4.2.6.Final +- io.netty:netty-codec-classes-quic:4.2.6.Final +- io.netty:netty-transport-native-epoll:linux-x86_64:4.2.6.Final +- io.netty:netty-transport-native-epoll:linux-aarch_64:4.2.6.Final +- io.netty:netty-transport-native-epoll:linux-riscv64:4.2.6.Final +- io.netty:netty-transport-native-io_uring:linux-x86_64:4.2.6.Final +- io.netty:netty-transport-native-io_uring:linux-aarch_64:4.2.6.Final +- io.netty:netty-transport-native-io_uring:linux-riscv64:4.2.6.Final +- io.netty:netty-transport-native-kqueue:osx-x86_64:4.2.6.Final +- io.netty:netty-transport-native-kqueue:osx-aarch_64:4.2.6.Final +- io.netty:netty-resolver-dns-native-macos:osx-x86_64:4.2.6.Final +- io.netty:netty-resolver-dns-native-macos:osx-aarch_64:4.2.6.Final +- io.netty:netty-codec-native-quic:linux-x86_64:4.2.6.Final +- io.netty:netty-codec-native-quic:linux-aarch_64:4.2.6.Final +- io.netty:netty-codec-native-quic:osx-x86_64:4.2.6.Final +- io.netty:netty-codec-native-quic:osx-aarch_64:4.2.6.Final +- io.netty:netty-codec-native-quic:windows-x86_64:4.2.6.Final + The following dependencies all share the same BSD license which you find under licenses/LICENSE.scala. diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java index 78095b8453cc5..773cfae0b7ff5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -36,7 +36,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec; @@ -147,8 +148,10 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { } }; - NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); - NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + MultiThreadIoEventLoopGroup bossGroup = + new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); + MultiThreadIoEventLoopGroup workerGroup = + new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); this.bootstrap = new ServerBootstrap(); this.bootstrap diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index 0741e3b15de59..3da3f3c16f86c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -26,8 +26,9 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -102,7 +103,7 @@ public HttpTestClient(String host, int port) { this.host = host; this.port = port; - this.group = new NioEventLoopGroup(); + this.group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); this.bootstrap = new Bootstrap(); this.bootstrap diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 406ebe85ef005..89f7339b38b42 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1469,7 +1469,7 @@ "properties" : { "checkpointed_size" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto", "properties" : { "min" : { "type" : "integer" @@ -1499,23 +1499,23 @@ }, "state_size" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "end_to_end_duration" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "alignment_buffered" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "processed_data" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "persisted_data" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" } } }, @@ -2072,7 +2072,7 @@ "properties" : { "checkpointed_size" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto", "properties" : { "min" : { "type" : "integer" @@ -2102,11 +2102,11 @@ }, "state_size" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "end_to_end_duration" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "checkpoint_duration" : { "type" : "object", @@ -2114,11 +2114,11 @@ "properties" : { "sync" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "async" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" } } }, @@ -2128,25 +2128,25 @@ "properties" : { "buffered" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "processed" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "persisted" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" }, "duration" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" } } }, "start_delay" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util.stats:StatsSummaryDto" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto" } } }, @@ -2737,7 +2737,30 @@ }, "response" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody" + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody", + "properties" : { + "jobVertexResourceRequirements" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexResourceRequirements", + "properties" : { + "parallelism" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexResourceRequirements:Parallelism", + "properties" : { + "lowerBound" : { + "type" : "integer" + }, + "upperBound" : { + "type" : "integer" + } + } + } + } + } + } + } } }, { "url" : "/jobs/:jobid/resource-requirements", @@ -2754,7 +2777,30 @@ }, "request" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody" + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody", + "properties" : { + "jobVertexResourceRequirements" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexResourceRequirements", + "properties" : { + "parallelism" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexResourceRequirements:Parallelism", + "properties" : { + "lowerBound" : { + "type" : "integer" + }, + "upperBound" : { + "type" : "integer" + } + } + } + } + } + } + } }, "response" : { "type" : "object", @@ -4358,12 +4404,12 @@ "jobId" : { "type" : "any" }, - "assignedTasks" : { - "type" : "integer" - }, "resource" : { "type" : "object", "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo" + }, + "assignedTasks" : { + "type" : "integer" } } } @@ -4551,4 +4597,4 @@ } } } ] -} +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java index d235f7ea81206..51cfb155fb24a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java @@ -25,11 +25,12 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll; import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; @@ -149,9 +150,11 @@ private void initNioBootstrap() { String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")"; - NioEventLoopGroup nioGroup = - new NioEventLoopGroup( - config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name)); + MultiThreadIoEventLoopGroup nioGroup = + new MultiThreadIoEventLoopGroup( + config.getClientNumThreads(), + NettyServer.getNamedThreadFactory(name), + NioIoHandler.newFactory()); bootstrap.group(nioGroup).channel(NioSocketChannel.class); config.getTcpKeepIdleInSeconds() @@ -182,9 +185,11 @@ private void initEpollBootstrap() { String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")"; - EpollEventLoopGroup epollGroup = - new EpollEventLoopGroup( - config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name)); + MultiThreadIoEventLoopGroup epollGroup = + new MultiThreadIoEventLoopGroup( + config.getClientNumThreads(), + NettyServer.getNamedThreadFactory(name), + EpollIoHandler.newFactory()); bootstrap.group(epollGroup).channel(EpollSocketChannel.class); config.getTcpKeepIdleInSeconds() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 5d9f4756755cf..18ecb2bfb352b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -26,10 +26,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll; -import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors; @@ -211,8 +212,11 @@ private void initNioBootstrap() { String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")"; - NioEventLoopGroup nioGroup = - new NioEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name)); + MultiThreadIoEventLoopGroup nioGroup = + new MultiThreadIoEventLoopGroup( + config.getServerNumThreads(), + getNamedThreadFactory(name), + NioIoHandler.newFactory()); bootstrap.group(nioGroup).channel(NioServerSocketChannel.class); } @@ -222,8 +226,11 @@ private void initEpollBootstrap() { String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")"; - EpollEventLoopGroup epollGroup = - new EpollEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name)); + MultiThreadIoEventLoopGroup epollGroup = + new MultiThreadIoEventLoopGroup( + config.getServerNumThreads(), + getNamedThreadFactory(name), + EpollIoHandler.newFactory()); bootstrap.group(epollGroup).channel(EpollServerSocketChannel.class); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 0660873b1cb88..898511b95567d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -367,6 +367,8 @@ private static SslContext createInternalNettySSLContext( sslContextBuilder = SslContextBuilder.forServer(kmf); } + setHostnameVerification(sslContextBuilder, config, clientMode); + Optional tmf = getTrustManagerFactory(config, true); tmf.map(sslContextBuilder::trustManager); @@ -433,6 +435,8 @@ public static SslContext createRestNettySSLContext( sslContextBuilder = SslContextBuilder.forServer(kmf); } + setHostnameVerification(sslContextBuilder, config, clientMode); + if (clientMode || clientAuth != ClientAuth.NONE) { Optional tmf = getTrustManagerFactory(config, false); tmf.map( @@ -453,6 +457,18 @@ public static SslContext createRestNettySSLContext( // Utilities // ------------------------------------------------------------------------ + /** + * Set hostname verification. By default, Netty will enable hostname verification since 4.2.x + * for client-mode connections. + */ + private static void setHostnameVerification( + SslContextBuilder sslContextBuilder, Configuration config, boolean clientMode) { + if (clientMode) { + sslContextBuilder.endpointIdentificationAlgorithm( + config.get(SecurityOptions.SSL_VERIFY_HOSTNAME) ? "HTTPS" : null); + } + } + private static String getAndCheckOption( Configuration config, ConfigOption primaryOption, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 7da3afa63da46..bbe2d0cf3df23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -60,9 +60,10 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException; @@ -281,16 +282,15 @@ protected void initChannel(SocketChannel socketChannel) { }; if (group == null) { - // No NioEventLoopGroup constructor available that allows passing nThreads, - // threadFactory, - // and selectStrategyFactory without also passing a SelectorProvider, so mimicking its + // No NioIoHandler constructor available that allows only passing selectStrategyFactory + // without also passing a SelectorProvider, so mimicking its // default value seen in other constructors group = - new NioEventLoopGroup( + new MultiThreadIoEventLoopGroup( 1, new ExecutorThreadFactory("flink-rest-client-netty"), - SelectorProvider.provider(), - selectStrategyFactory); + NioIoHandler.newFactory( + SelectorProvider.provider(), selectStrategyFactory)); useInternalEventLoopGroup = true; } else { Preconditions.checkArgument( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index bd521227b8c11..acf626ff1974e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -48,7 +48,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec; @@ -242,12 +243,16 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { } }; - NioEventLoopGroup bossGroup = - new NioEventLoopGroup( - 1, new ExecutorThreadFactory("flink-rest-server-netty-boss")); - NioEventLoopGroup workerGroup = - new NioEventLoopGroup( - 0, new ExecutorThreadFactory("flink-rest-server-netty-worker")); + MultiThreadIoEventLoopGroup bossGroup = + new MultiThreadIoEventLoopGroup( + 1, + new ExecutorThreadFactory("flink-rest-server-netty-boss"), + NioIoHandler.newFactory()); + MultiThreadIoEventLoopGroup workerGroup = + new MultiThreadIoEventLoopGroup( + 0, + new ExecutorThreadFactory("flink-rest-server-netty-worker"), + NioIoHandler.newFactory()); bootstrap = new ServerBootstrap(); bootstrap diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 2ecbe743cdaa7..aab65977635f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -85,14 +85,15 @@ protected CompletableFuture respondToRequest( response = FutureUtils.completedExceptionally(e); } - return response.thenAccept( + return response.thenAcceptAsync( resp -> HandlerUtils.sendResponse( ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), - responseHeaders)); + responseHeaders), + ctx.executor()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index c20087155ec22..1b4d8db7815e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.utils.ExecutionUtils; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; @@ -74,7 +75,11 @@ class ExecutionPartitionLifecycleTest { @RegisterExtension static final TestExecutorExtension EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorExtension(); + TestingUtils.jmAsyncThreadExecutorExtension(); + + @RegisterExtension + static final TestExecutorExtension JM_MAIN_THREAD_EXECUTOR_RESOURCE = + TestingUtils.jmMainThreadExecutorExtension(); private Execution execution; private ResultPartitionDeploymentDescriptor descriptor; @@ -118,11 +123,14 @@ private void testPartitionReleaseOnStateTransitionsAfterRunning( taskManagerGateway, testingShuffleMaster); - stateTransition1.accept(execution); - assertThat(releasePartitionsCallFuture).isNotDone(); + runInMainThread( + () -> { + stateTransition1.accept(execution); + assertThat(releasePartitionsCallFuture).isNotDone(); - stateTransition2.accept(execution); - assertThat(releasePartitionsCallFuture).isDone(); + stateTransition2.accept(execution); + assertThat(releasePartitionsCallFuture).isDone(); + }); final Tuple2> releasePartitionsCall = releasePartitionsCallFuture.get(); @@ -221,7 +229,10 @@ private void testPartitionTrackingForStateTransition( assertThat(startTrackingCall.f0).isEqualTo(taskExecutorResourceId); assertThat(startTrackingCall.f1).isEqualTo(descriptor); - stateTransition.accept(execution); + runInMainThread( + () -> { + stateTransition.accept(execution); + }); switch (partitionReleaseResult) { case NONE: @@ -279,7 +290,8 @@ private void setupExecutionGraphAndStartRunningJob( final SchedulerBase scheduler = new DefaultSchedulerBuilder( jobGraph, - ComponentMainThreadExecutorServiceAdapter.forMainThread(), + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor()), EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( @@ -289,15 +301,16 @@ private void setupExecutionGraphAndStartRunningJob( .build(); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; execution = executionVertex.getCurrentExecutionAttempt(); - scheduler.startScheduling(); - execution.switchToInitializing(); - execution.switchToRunning(); + runInMainThread(scheduler::startScheduling); + ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(executionVertex); + + assertThat(execution.switchToInitializing()).isTrue(); + assertThat(execution.switchToRunning()).isTrue(); final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex.getProducedDataSets()[0].getPartitions()[0].getPartitionId(); @@ -319,6 +332,10 @@ private JobVertex createNoOpJobVertex() { return jobVertex; } + private static void runInMainThread(final Runnable runnable) { + CompletableFuture.runAsync(runnable, JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor()).join(); + } + private static class TestingShuffleMaster implements ShuffleMaster { final Queue externallyReleasedPartitions = new ArrayBlockingQueue<>(4); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java index 27566b2e3bae4..9fcb4c0abd817 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -36,6 +36,8 @@ /** Simple netty connection manager test. */ class NettyConnectionManagerTest { + private static final String EVENT_EXECUTOR_GROUP_FIELD_CHILDREN = "children"; + /** * Tests that the number of arenas and number of threads of the client and server are set to the * same number, that is the number of configured task slots. @@ -68,7 +70,7 @@ void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { Bootstrap boostrap = connectionManager.getClient().getBootstrap(); EventLoopGroup group = boostrap.config().group(); - Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + Field f = getInheritedField(group.getClass()); f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); @@ -80,7 +82,7 @@ void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); EventLoopGroup group = bootstrap.config().group(); - Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + Field f = getInheritedField(group.getClass()); f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); @@ -92,7 +94,7 @@ void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); EventLoopGroup group = bootstrap.childGroup(); - Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + Field f = getInheritedField(group.getClass()); f.setAccessible(true); Object[] eventExecutors = (Object[]) f.get(group); @@ -104,4 +106,18 @@ private NettyConnectionManager createNettyConnectionManager(NettyConfig config) return new NettyConnectionManager( new ResultPartitionManager(), new TaskEventDispatcher(), config, true); } + + private static Field getInheritedField(Class clazz) { + while (clazz != null) { + try { + Field field = clazz.getDeclaredField(EVENT_EXECUTOR_GROUP_FIELD_CHILDREN); + field.setAccessible(true); + return field; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new IllegalArgumentException( + "Field " + EVENT_EXECUTOR_GROUP_FIELD_CHILDREN + " not found in hierarchy"); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index 25d685e3189b6..3b2f96d7912c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -39,10 +39,10 @@ import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.MultiThreadIoEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategy; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioIoHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.assertj.core.api.InstanceOfAssertFactories; @@ -121,9 +121,11 @@ void testConnectionTimeout() throws Exception { @Test void testExternalEventGroup() throws Exception { - EventLoopGroup externalGroup = - new NioEventLoopGroup( - 1, new ExecutorThreadFactory("flink-rest-client-netty-external")); + MultiThreadIoEventLoopGroup externalGroup = + new MultiThreadIoEventLoopGroup( + 1, + new ExecutorThreadFactory("flink-rest-client-netty-external"), + NioIoHandler.newFactory()); final RestClient restClient = new RestClient( diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java index a3613e38a06ae..c2e849ff0b622 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java @@ -88,14 +88,15 @@ protected CompletableFuture respondToRequest( response = FutureUtils.completedExceptionally(e); } - return response.thenAccept( + return response.thenAcceptAsync( resp -> HandlerUtils.sendResponse( ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), - responseHeaders)); + responseHeaders), + ctx.executor()); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java index 92d9f8fbd959c..26a56ad19680d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java @@ -92,7 +92,7 @@ public abstract class ExecNodeBase implements ExecNode { * Retrieves the default context from the {@link ExecNodeMetadata} annotation to be serialized * into the JSON plan. */ - @JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.READ_ONLY, index = 1) + @JsonProperty(value = FIELD_NAME_TYPE, index = 1) protected final ExecNodeContext getContextFromAnnotation() { return isCompiled ? context : ExecNodeContext.newContext(this.getClass()).withId(getId()); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java index b86dfa67c103b..b70f5e45c693a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java @@ -97,8 +97,10 @@ public class UnalignedCheckpointFailureHandlingITCase { @Test public void testCheckpointSuccessAfterFailure() throws Exception { + SharedReference failOnCloseRef = sharedObjects.add(new AtomicBoolean(true)); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - TestCheckpointStorageFactory.failOnCloseRef = sharedObjects.add(new AtomicBoolean(true)); + TestCheckpointStorageFactory.failOnCloseRef = failOnCloseRef; TestCheckpointStorageFactory.tempFolderRef = sharedObjects.add(temporaryFolder); configure( @@ -118,7 +120,7 @@ public void testCheckpointSuccessAfterFailure() throws Exception { waitForJobStatus(jobClient, singletonList(RUNNING)); waitForAllTaskRunning(miniCluster, jobID, false); - triggerFailingCheckpoint(jobID, TestException.class, miniCluster); + triggerFailingCheckpoint(jobID, TestException.class, failOnCloseRef, miniCluster); miniCluster.triggerCheckpoint(jobID).get(); } @@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) { .sinkTo(new DiscardingSink<>()); } + /** + * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link + * CheckpointStateOutputStream#close()} which should get called by the channel state writer + * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. If + * there are no records in the state, only {@link CheckpointStateOutputStream#close()} will be + * called, so `failOnClose` has to be checked here. + */ private void triggerFailingCheckpoint( - JobID jobID, Class expectedException, MiniCluster miniCluster) + JobID jobID, + Class expectedException, + SharedReference failOnCloseRef, + MiniCluster miniCluster) throws InterruptedException, ExecutionException { - while (true) { + boolean foundCheckpointFailure = false; + do { Optional cpFailure = miniCluster .triggerCheckpoint(jobID) .thenApply(ign -> Optional.empty()) .handle((ign, err) -> Optional.ofNullable(err)) .get(); - if (!cpFailure.isPresent()) { - Thread.sleep(50); // trigger again - in case of no channel data was written - } else if (isCausedBy(cpFailure.get(), expectedException)) { - return; - } else { - rethrow(cpFailure.get()); + + if (cpFailure.isPresent()) { + if (isCausedBy(cpFailure.get(), expectedException)) { + foundCheckpointFailure = true; + } } - } + } while (!foundCheckpointFailure || failOnCloseRef.get().get()); } private boolean isCausedBy(Throwable t, Class expectedException) { diff --git a/pom.xml b/pom.xml index 2029f1cf0752c..3da940de6ffed 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,12 @@ under the License. + + temporary-shaded-rc3 + https://repository.apache.org/content/repositories/orgapacheflink-1878/ + + + redhat @@ -127,9 +133,9 @@ under the License. 4 true -XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config} - 20.0 - 2.18.2 - 2.9.0 + 21.0 + 2.20.1 + 2.10.0 true 11 17 @@ -355,13 +361,13 @@ under the License. org.apache.flink flink-shaded-asm-9 - 9.6-${flink.shaded.version} + 9.9.1-${flink.shaded.version} org.apache.flink flink-shaded-guava - 33.4.0-jre-${flink.shaded.version} + 33.5.0-jre-${flink.shaded.version} @@ -379,13 +385,13 @@ under the License. org.apache.flink flink-shaded-netty - 4.1.100.Final-${flink.shaded.version} + 4.2.6.Final-${flink.shaded.version} org.apache.flink flink-shaded-netty-tcnative-dynamic - 2.0.62.Final-${flink.shaded.version} + 2.0.74.Final-${flink.shaded.version} test @@ -914,7 +920,7 @@ under the License. io.netty netty-bom - 4.1.100.Final + 4.2.6.Final pom import