From 0057b66815d134cb7b13bd99dd5f8c15beee0f8d Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Mon, 20 Oct 2025 14:14:15 +0800 Subject: [PATCH 01/15] # 9755 Fix logs Basic issue --- .../engine/server/rest/HttpBasic.java | 26 +++ .../server/rest/service/BaseLogService.java | 75 ++++++-- .../server/rest/service/LogService.java | 52 ++++- .../server/rest/RestApiHttpBasicTest.java | 178 ++++++++++++++++++ 4 files changed, 318 insertions(+), 13 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java new file mode 100644 index 00000000000..e18251478e2 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest; + +import lombok.Data; + +@Data +public class HttpBasic { + public String basicUser; + public String basicPass; +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index a1e84f9ec76..989412cc9bc 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -47,30 +47,83 @@ public String getLogPath() { } protected String sendGet(String urlString) { + HttpURLConnection connection = null; try { - HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection = (HttpURLConnection) new URL(urlString).openConnection(); connection.setRequestMethod("GET"); connection.setConnectTimeout(5000); connection.setReadTimeout(5000); connection.connect(); + int code = connection.getResponseCode(); if (connection.getResponseCode() == 200) { - try (InputStream is = connection.getInputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - byte[] buffer = new byte[1024]; - int len; - while ((len = is.read(buffer)) != -1) { - baos.write(buffer, 0, len); - } - return baos.toString(); - } + return readAll(connection.getInputStream()); } + log.warn("GET {} -> HTTP {}", urlString, code); } catch (IOException e) { - log.error("Send get Fail.{}", ExceptionUtils.getMessage(e)); + log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e)); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + return null; + } + + /** + * Send GET request with Basic Auth + * + * @param urlString url + * @param user name + * @param pass password + * @return log + */ + protected String sendGet(String urlString, String user, String pass) { + HttpURLConnection connection = null; + try { + connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection.setRequestMethod("GET"); + connection.setConnectTimeout(5000); + connection.setReadTimeout(5000); + + // Basic Auth + if (user != null && pass != null) { + String token = java.util.Base64.getEncoder() + .encodeToString((user + ":" + pass).getBytes(java.nio.charset.StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + token); + } + + connection.connect(); + + int code = connection.getResponseCode(); + if (connection.getResponseCode() == 200) { + return readAll(connection.getInputStream()); + } + log.warn("GET {} -> HTTP {}", urlString, code); + } catch (IOException e) { + log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e)); + } finally { + if (connection != null) { + connection.disconnect(); + } } return null; } + private String readAll(InputStream is) throws IOException { + if (is == null) { + return null; + } + try (InputStream in = is; ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + byte[] buf = new byte[4096]; + int len; + while ((len = in.read(buf)) != -1) { + baos.write(buf, 0, len); + } + return baos.toString(java.nio.charset.StandardCharsets.UTF_8.name()); + } + } + public String getLogParam(String uri, String contextPath) { uri = uri.substring(uri.indexOf(contextPath) + contextPath.length()); uri = StringUtil.stripTrailingSlash(uri).substring(1); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java index 304265f2fe8..08caa969d47 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java @@ -62,6 +62,9 @@ public List> allLogNameList(String jobId) { String contextPath = httpConfig.getContextPath(); int port = httpConfig.getPort(); + // Take BasicAuth from configuration (if enabled) + HttpBasic result = getHttpBasicAuth(httpConfig); + List> allLogNameList = new ArrayList<>(); JsonArray systemMonitoringInformationJsonValues = @@ -70,8 +73,20 @@ public List> allLogNameList(String jobId) { systemMonitoringInformation -> { String host = systemMonitoringInformation.asObject().get("host").asString(); String url = "http://" + host + ":" + port + contextPath; - String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME); - log.debug(String.format("Request: %s , Result: %s", url, allName)); + String logUrl = url + REST_URL_GET_ALL_LOG_NAME; + + String allName = httpConfig.isEnableBasicAuth() + ? sendGet(logUrl, result.basicUser, result.basicPass) + : sendGet(logUrl); + + if (allName == null || allName.trim().isEmpty()) { + log.warn("GET {} returned empty body (null/empty). Skip this node.", logUrl); + return; + } + + if (log.isDebugEnabled()) { + log.debug("Request: {} , Result: {}", url, allName); + } ArrayNode jsonNodes = JsonUtils.parseArray(allName); jsonNodes.forEach( @@ -91,6 +106,39 @@ public List> allLogNameList(String jobId) { return allLogNameList; } + private static HttpBasic getHttpBasicAuth(HttpConfig httpConfig) { + String basicUser = ""; + String basicPass = ""; + try { + if (httpConfig.isEnableBasicAuth()) { + basicUser = httpConfig.getBasicAuthUsername(); + basicPass = httpConfig.getBasicAuthPassword(); + } + } catch (Throwable ignore) { + // Compatible with older versions: If HttpConfig does not have these methods, use system properties or environment variables to find out + basicUser = System.getProperty("seatunnel.http.user", System.getenv("BASIC_AUTH_USER")); + basicPass = System.getProperty("seatunnel.http.pass", System.getenv("BASIC_AUTH_PASS")); + log.warn("Use system property or environment variable to set basic auth."); + } + + if (StringUtils.isNotBlank(basicUser) && StringUtils.isNotBlank(basicPass)) { + httpConfig.setBasicAuthUsername(basicUser); + httpConfig.setBasicAuthPassword(basicPass); + } + return new HttpBasic(basicUser, basicPass); + } + + private static class HttpBasic { + + public final String basicUser; + public final String basicPass; + + public HttpBasic(String basicUser, String basicPass) { + this.basicUser = basicUser; + this.basicPass = basicPass; + } + } + public JsonArray allNodeLogFormatJson(String jobId) { return allLogNameList(jobId).stream() diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java new file mode 100644 index 00000000000..8a476645c55 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.stream.Collectors; + +import com.hazelcast.config.Config; +import com.hazelcast.internal.serialization.Data; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; +import org.apache.seatunnel.engine.common.config.server.HttpConfig; +import org.apache.seatunnel.engine.common.runtime.ExecutionMode; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +/** Test for Rest API with Basic. */ +@DisabledOnOs(OS.MAC) +public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { + + private static final int HTTP_PORT = 18080; + private static final Long JOB_1 = System.currentTimeMillis() + 1L; + public static final String USER = "admin"; + public static final String PASS = "admin"; + + @BeforeAll + void setUp() { + String name = this.getClass().getName(); + Config hazelcastConfig = Config.loadFromString(getHazelcastConfig()); + hazelcastConfig.setClusterName(TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name)); + SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); + seaTunnelConfig.setHazelcastConfig(hazelcastConfig); + seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL); + + HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); + httpConfig.setEnabled(true); + httpConfig.setPort(HTTP_PORT); + + instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + nodeEngine = instance.node.nodeEngine; + server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); + LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class); + } + + @Test + public void testRestApiHttp() throws Exception { + HttpURLConnection conn = + (HttpURLConnection) + new java.net.URL("http://localhost:" + HTTP_PORT + "/overview") + .openConnection(); + setBasicAuth(conn); + try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + + Assertions.assertEquals(200, conn.getResponseCode()); + String response = in.lines().collect(Collectors.joining()); + Assertions.assertTrue(response.contains("projectVersion")); + } finally { + conn.disconnect(); + } + } + + @Override + public SeaTunnelConfig loadSeaTunnelConfig() { + SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig(); + + HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); + httpConfig.setEnabled(Boolean.TRUE); + httpConfig.setEnableBasicAuth(Boolean.TRUE); + httpConfig.setBasicAuthUsername("admin"); + httpConfig.setBasicAuthPassword("admin"); + httpConfig.setPort(HTTP_PORT); + + CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig(); + + checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE); + seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig); + return seaTunnelConfig; + } + + @Test + void testWriteJsonWithObject() throws IOException { + startJob(); + testLogRestApiResponse("JSON"); + } + + public void setBasicAuth(HttpURLConnection connection){ + // Basic Auth + String token = java.util.Base64.getEncoder() + .encodeToString((USER + ":" + PASS).getBytes(java.nio.charset.StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + token); + } + + public void testLogRestApiResponse(String format) throws IOException { + HttpURLConnection conn = null; + try { + java.net.URL url = + new java.net.URL("http://localhost:" + HTTP_PORT + "/logs?format=" + format); + conn = (HttpURLConnection) url.openConnection(); + setBasicAuth(conn); + + try (BufferedReader in = + new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + // [ { + // "node" : "localhost:18080", + // "logLink" : "http://localhost:18080/logs/job-1760939539658.log", + // "logName" : "job-1760939539658.log" + //}, { + // "node" : "localhost:18080", + // "logLink" : "http://localhost:18080/logs/job-${ctx:ST-JID}.log", + // "logName" : "job-${ctx:ST-JID}.log" + //} ] + String response = in.lines().collect(Collectors.joining()); + Assertions.assertNotNull(response); + } + + Assertions.assertEquals(200, conn.getResponseCode()); + Assertions.assertTrue( + conn.getHeaderFields() + .get("Content-Type") + .toString() + .contains("charset=utf-8")); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + private void startJob() { + LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan("fake_to_console.conf", RestApiHttpBasicTest.JOB_1.toString(), + RestApiHttpBasicTest.JOB_1); + + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + RestApiHttpBasicTest.JOB_1, + "Test", + nodeEngine.getSerializationService(), + testLogicalDag, + Collections.emptyList(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService() + .submitJob(RestApiHttpBasicTest.JOB_1, data, jobImmutableInformation.isStartWithSavePoint()); + voidPassiveCompletableFuture.join(); + } +} From 50a94bed55d39a38e7b61893a9f7845615096c55 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Mon, 20 Oct 2025 14:29:35 +0800 Subject: [PATCH 02/15] Trigger workflow From d3175f1f00e3ecd83ef99b3a09e11c5550508fb6 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Mon, 20 Oct 2025 16:04:45 +0800 Subject: [PATCH 03/15] Trigger workflow From 4160872313c0a99c2eb7f205d789dddc676926f1 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Mon, 20 Oct 2025 16:55:26 +0800 Subject: [PATCH 04/15] # 9755 Modify code style --- .../server/rest/service/BaseLogService.java | 14 ++++-- .../server/rest/service/LogService.java | 13 +++-- .../server/rest/RestApiHttpBasicTest.java | 48 ++++++++++++------- 3 files changed, 47 insertions(+), 28 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index 989412cc9bc..833a789f7ee 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -74,8 +74,8 @@ protected String sendGet(String urlString) { * Send GET request with Basic Auth * * @param urlString url - * @param user name - * @param pass password + * @param user name + * @param pass password * @return log */ protected String sendGet(String urlString, String user, String pass) { @@ -88,8 +88,11 @@ protected String sendGet(String urlString, String user, String pass) { // Basic Auth if (user != null && pass != null) { - String token = java.util.Base64.getEncoder() - .encodeToString((user + ":" + pass).getBytes(java.nio.charset.StandardCharsets.UTF_8)); + String token = + java.util.Base64.getEncoder() + .encodeToString( + (user + ":" + pass) + .getBytes(java.nio.charset.StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + token); } @@ -114,7 +117,8 @@ private String readAll(InputStream is) throws IOException { if (is == null) { return null; } - try (InputStream in = is; ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (InputStream in = is; + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { byte[] buf = new byte[4096]; int len; while ((len = in.read(buf)) != -1) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java index 08caa969d47..016fb2ace42 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java @@ -75,12 +75,14 @@ public List> allLogNameList(String jobId) { String url = "http://" + host + ":" + port + contextPath; String logUrl = url + REST_URL_GET_ALL_LOG_NAME; - String allName = httpConfig.isEnableBasicAuth() - ? sendGet(logUrl, result.basicUser, result.basicPass) - : sendGet(logUrl); + String allName = + httpConfig.isEnableBasicAuth() + ? sendGet(logUrl, result.basicUser, result.basicPass) + : sendGet(logUrl); if (allName == null || allName.trim().isEmpty()) { - log.warn("GET {} returned empty body (null/empty). Skip this node.", logUrl); + log.warn( + "GET {} returned empty body (null/empty). Skip this node.", logUrl); return; } @@ -115,7 +117,8 @@ private static HttpBasic getHttpBasicAuth(HttpConfig httpConfig) { basicPass = httpConfig.getBasicAuthPassword(); } } catch (Throwable ignore) { - // Compatible with older versions: If HttpConfig does not have these methods, use system properties or environment variables to find out + // Compatible with older versions: If HttpConfig does not have these methods, use system + // properties or environment variables to find out basicUser = System.getProperty("seatunnel.http.user", System.getenv("BASIC_AUTH_USER")); basicPass = System.getProperty("seatunnel.http.pass", System.getenv("BASIC_AUTH_PASS")); log.warn("Use system property or environment variable to set basic auth."); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index 8a476645c55..6999a80d897 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -17,15 +17,6 @@ package org.apache.seatunnel.engine.server.rest; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.util.Collections; -import java.util.stream.Collectors; - -import com.hazelcast.config.Config; -import com.hazelcast.internal.serialization.Data; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; @@ -37,12 +28,23 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.TestUtils; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import com.hazelcast.config.Config; +import com.hazelcast.internal.serialization.Data; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.stream.Collectors; + /** Test for Rest API with Basic. */ @DisabledOnOs(OS.MAC) public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { @@ -56,7 +58,8 @@ public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { void setUp() { String name = this.getClass().getName(); Config hazelcastConfig = Config.loadFromString(getHazelcastConfig()); - hazelcastConfig.setClusterName(TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name)); + hazelcastConfig.setClusterName( + TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name)); SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); seaTunnelConfig.setHazelcastConfig(hazelcastConfig); seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL); @@ -112,10 +115,13 @@ void testWriteJsonWithObject() throws IOException { testLogRestApiResponse("JSON"); } - public void setBasicAuth(HttpURLConnection connection){ + public void setBasicAuth(HttpURLConnection connection) { // Basic Auth - String token = java.util.Base64.getEncoder() - .encodeToString((USER + ":" + PASS).getBytes(java.nio.charset.StandardCharsets.UTF_8)); + String token = + java.util.Base64.getEncoder() + .encodeToString( + (USER + ":" + PASS) + .getBytes(java.nio.charset.StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + token); } @@ -133,11 +139,11 @@ public void testLogRestApiResponse(String format) throws IOException { // "node" : "localhost:18080", // "logLink" : "http://localhost:18080/logs/job-1760939539658.log", // "logName" : "job-1760939539658.log" - //}, { + // }, { // "node" : "localhost:18080", // "logLink" : "http://localhost:18080/logs/job-${ctx:ST-JID}.log", // "logName" : "job-${ctx:ST-JID}.log" - //} ] + // } ] String response = in.lines().collect(Collectors.joining()); Assertions.assertNotNull(response); } @@ -156,8 +162,11 @@ public void testLogRestApiResponse(String format) throws IOException { } private void startJob() { - LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan("fake_to_console.conf", RestApiHttpBasicTest.JOB_1.toString(), - RestApiHttpBasicTest.JOB_1); + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan( + "fake_to_console.conf", + RestApiHttpBasicTest.JOB_1.toString(), + RestApiHttpBasicTest.JOB_1); JobImmutableInformation jobImmutableInformation = new JobImmutableInformation( @@ -172,7 +181,10 @@ private void startJob() { PassiveCompletableFuture voidPassiveCompletableFuture = server.getCoordinatorService() - .submitJob(RestApiHttpBasicTest.JOB_1, data, jobImmutableInformation.isStartWithSavePoint()); + .submitJob( + RestApiHttpBasicTest.JOB_1, + data, + jobImmutableInformation.isStartWithSavePoint()); voidPassiveCompletableFuture.join(); } } From e84a1e31a6eac64ae8004da7a030a32fff78e11b Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Tue, 28 Oct 2025 12:03:17 +0800 Subject: [PATCH 05/15] # 9755 Modify DisabledOnOs OS.WINDOWS --- .../seatunnel/engine/server/rest/RestApiHttpBasicTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index 6999a80d897..5f575140089 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -46,7 +46,7 @@ import java.util.stream.Collectors; /** Test for Rest API with Basic. */ -@DisabledOnOs(OS.MAC) +@DisabledOnOs(OS.WINDOWS) public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { private static final int HTTP_PORT = 18080; From 01e3f2166b33508493e1b5a27572ac5d9e083ae1 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Tue, 28 Oct 2025 17:22:02 +0800 Subject: [PATCH 06/15] # 9755 Closure processing --- .../server/rest/RestApiHttpBasicTest.java | 69 ++++++++----------- 1 file changed, 28 insertions(+), 41 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index 5f575140089..c9950ba515d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.engine.server.rest; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; -import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; import org.apache.seatunnel.engine.common.runtime.ExecutionMode; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; @@ -32,8 +31,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; import com.hazelcast.config.Config; import com.hazelcast.internal.serialization.Data; @@ -46,10 +43,9 @@ import java.util.stream.Collectors; /** Test for Rest API with Basic. */ -@DisabledOnOs(OS.WINDOWS) -public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { +class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { - private static final int HTTP_PORT = 18080; + private static final int HTTP_PORT = 18081; private static final Long JOB_1 = System.currentTimeMillis() + 1L; public static final String USER = "admin"; public static final String PASS = "admin"; @@ -65,9 +61,13 @@ void setUp() { seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL); HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); - httpConfig.setEnabled(true); + httpConfig.setEnabled(Boolean.TRUE); httpConfig.setPort(HTTP_PORT); + httpConfig.setEnableBasicAuth(Boolean.TRUE); + httpConfig.setBasicAuthUsername(USER); + httpConfig.setBasicAuthPassword(PASS); + instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); nodeEngine = instance.node.nodeEngine; server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); @@ -75,40 +75,26 @@ void setUp() { } @Test - public void testRestApiHttp() throws Exception { - HttpURLConnection conn = - (HttpURLConnection) - new java.net.URL("http://localhost:" + HTTP_PORT + "/overview") - .openConnection(); - setBasicAuth(conn); - try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + public void testRestApiOverview() throws Exception { + HttpURLConnection conn = null; + try { + java.net.URL url = new java.net.URL("http://localhost:" + HTTP_PORT + "/overview"); + conn = (HttpURLConnection) url.openConnection(); + setBasicAuth(conn); Assertions.assertEquals(200, conn.getResponseCode()); - String response = in.lines().collect(Collectors.joining()); - Assertions.assertTrue(response.contains("projectVersion")); + Assertions.assertTrue( + conn.getHeaderFields() + .get("Content-Type") + .toString() + .contains("charset=utf-8")); } finally { - conn.disconnect(); + if (conn != null) { + conn.disconnect(); + } } } - @Override - public SeaTunnelConfig loadSeaTunnelConfig() { - SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig(); - - HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); - httpConfig.setEnabled(Boolean.TRUE); - httpConfig.setEnableBasicAuth(Boolean.TRUE); - httpConfig.setBasicAuthUsername("admin"); - httpConfig.setBasicAuthPassword("admin"); - httpConfig.setPort(HTTP_PORT); - - CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig(); - - checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE); - seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig); - return seaTunnelConfig; - } - @Test void testWriteJsonWithObject() throws IOException { startJob(); @@ -133,6 +119,13 @@ public void testLogRestApiResponse(String format) throws IOException { conn = (HttpURLConnection) url.openConnection(); setBasicAuth(conn); + Assertions.assertEquals(200, conn.getResponseCode()); + Assertions.assertTrue( + conn.getHeaderFields() + .get("Content-Type") + .toString() + .contains("charset=utf-8")); + try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { // [ { @@ -148,12 +141,6 @@ public void testLogRestApiResponse(String format) throws IOException { Assertions.assertNotNull(response); } - Assertions.assertEquals(200, conn.getResponseCode()); - Assertions.assertTrue( - conn.getHeaderFields() - .get("Content-Type") - .toString() - .contains("charset=utf-8")); } finally { if (conn != null) { conn.disconnect(); From 704d697c71d54abf01cda957c39b07cc4ddcb686 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 30 Oct 2025 12:49:45 +0800 Subject: [PATCH 07/15] Trigger workflow From d62105c3eb2a4a24f9b7677c4209ceddf7b427b3 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 30 Oct 2025 23:36:53 +0800 Subject: [PATCH 08/15] # 9755 ConfigProvider.locateAndGetSeaTunnelConfig() single-case , Use case is reset later --- .../server/rest/RestApiHttpBasicTest.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index c9950ba515d..b6172614ecb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.TestUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -74,6 +75,19 @@ void setUp() { LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class); } + @AfterAll + public void after() { + // Disable basic auth + // Because of the ConfigProvider.locateAndGetSeaTunnelConfig() single-case, + // if you change, other use cases will also change + // managed via org.apache.seatunnel.engine.common.config.YamlSeaTunnelDomConfigProcessor + SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); + HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); + httpConfig.setEnableBasicAuth(Boolean.FALSE); + httpConfig.setBasicAuthUsername(""); + httpConfig.setBasicAuthPassword(""); + } + @Test public void testRestApiOverview() throws Exception { HttpURLConnection conn = null; @@ -96,7 +110,24 @@ public void testRestApiOverview() throws Exception { } @Test - void testWriteJsonWithObject() throws IOException { + void testLogRestApiResponseFailure() throws IOException { + startJob(); + HttpURLConnection conn = null; + try { + java.net.URL url = + new java.net.URL("http://localhost:" + HTTP_PORT + "/logs?format=JSON"); + conn = (HttpURLConnection) url.openConnection(); + + Assertions.assertEquals(401, conn.getResponseCode()); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + @Test + void testLogRestApiResponseSuccess() throws IOException { startJob(); testLogRestApiResponse("JSON"); } From 1fa40631ffb70f60afe28481aef98ea5ca5def5a Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 11:31:20 +0800 Subject: [PATCH 09/15] Code optimization # 9755 --- .../engine/server/rest/HttpBasic.java | 26 ------------- .../server/rest/service/BaseLogService.java | 3 +- .../server/rest/service/LogService.java | 39 ++++--------------- .../server/rest/RestApiHttpBasicTest.java | 33 +++++++++------- 4 files changed, 30 insertions(+), 71 deletions(-) delete mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java deleted file mode 100644 index e18251478e2..00000000000 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/HttpBasic.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.engine.server.rest; - -import lombok.Data; - -@Data -public class HttpBasic { - public String basicUser; - public String basicPass; -} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index 833a789f7ee..bac443cec9b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -29,6 +29,7 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; @Slf4j public class BaseLogService extends BaseService { @@ -124,7 +125,7 @@ private String readAll(InputStream is) throws IOException { while ((len = in.read(buf)) != -1) { baos.write(buf, 0, len); } - return baos.toString(java.nio.charset.StandardCharsets.UTF_8.name()); + return baos.toString(StandardCharsets.UTF_8.name()); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java index 7c14283ecce..73a5b9648f9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java @@ -62,9 +62,6 @@ public List> allLogNameList(String jobId) { String contextPath = httpConfig.getContextPath(); int port = httpConfig.getPort(); - // Take BasicAuth from configuration (if enabled) - HttpBasic result = getHttpBasicAuth(httpConfig); - List> allLogNameList = new ArrayList<>(); JsonArray systemMonitoringInformationJsonValues = @@ -77,10 +74,13 @@ public List> allLogNameList(String jobId) { String allName = httpConfig.isEnableBasicAuth() - ? sendGet(logUrl, result.basicUser, result.basicPass) + ? sendGet( + logUrl, + httpConfig.getBasicAuthUsername(), + httpConfig.getBasicAuthPassword()) : sendGet(logUrl); - if (allName == null || allName.trim().isEmpty()) { + if (StringUtils.isBlank(allName)) { log.warn( "GET {} returned empty body (null/empty). Skip this node.", logUrl); return; @@ -108,38 +108,15 @@ public List> allLogNameList(String jobId) { return allLogNameList; } - private static HttpBasic getHttpBasicAuth(HttpConfig httpConfig) { - String basicUser = ""; - String basicPass = ""; + private void getHttpBasicAuth(HttpConfig httpConfig) { try { if (httpConfig.isEnableBasicAuth()) { - basicUser = httpConfig.getBasicAuthUsername(); - basicPass = httpConfig.getBasicAuthPassword(); + httpConfig.setBasicAuthUsername(httpConfig.getBasicAuthUsername()); + httpConfig.setBasicAuthPassword(httpConfig.getBasicAuthPassword()); } } catch (Throwable ignore) { - // Compatible with older versions: If HttpConfig does not have these methods, use system - // properties or environment variables to find out - basicUser = System.getProperty("seatunnel.http.user", System.getenv("BASIC_AUTH_USER")); - basicPass = System.getProperty("seatunnel.http.pass", System.getenv("BASIC_AUTH_PASS")); log.warn("Use system property or environment variable to set basic auth."); } - - if (StringUtils.isNotBlank(basicUser) && StringUtils.isNotBlank(basicPass)) { - httpConfig.setBasicAuthUsername(basicUser); - httpConfig.setBasicAuthPassword(basicPass); - } - return new HttpBasic(basicUser, basicPass); - } - - private static class HttpBasic { - - public final String basicUser; - public final String basicPass; - - public HttpBasic(String basicUser, String basicPass) { - this.basicUser = basicUser; - this.basicPass = basicPass; - } } public JsonArray allNodeLogFormatJson(String jobId) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index b6172614ecb..eb3cf8e08f4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -28,6 +28,8 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.TestUtils; +import org.apache.http.HttpHeaders; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -40,16 +42,25 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Base64.Encoder; import java.util.Collections; import java.util.stream.Collectors; +import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS; +import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW; + /** Test for Rest API with Basic. */ class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { private static final int HTTP_PORT = 18081; private static final Long JOB_1 = System.currentTimeMillis() + 1L; - public static final String USER = "admin"; - public static final String PASS = "admin"; + private static final String USER = "admin"; + private static final String PASS = "admin"; + private static final String DOMAIN = "http://localhost:" + HTTP_PORT; + private static final String BASIC_PREFIX = "Basic "; @BeforeAll void setUp() { @@ -92,7 +103,7 @@ public void after() { public void testRestApiOverview() throws Exception { HttpURLConnection conn = null; try { - java.net.URL url = new java.net.URL("http://localhost:" + HTTP_PORT + "/overview"); + URL url = new URL(DOMAIN + REST_URL_OVERVIEW); conn = (HttpURLConnection) url.openConnection(); setBasicAuth(conn); @@ -114,8 +125,7 @@ void testLogRestApiResponseFailure() throws IOException { startJob(); HttpURLConnection conn = null; try { - java.net.URL url = - new java.net.URL("http://localhost:" + HTTP_PORT + "/logs?format=JSON"); + URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=JSON"); conn = (HttpURLConnection) url.openConnection(); Assertions.assertEquals(401, conn.getResponseCode()); @@ -134,19 +144,16 @@ void testLogRestApiResponseSuccess() throws IOException { public void setBasicAuth(HttpURLConnection connection) { // Basic Auth - String token = - java.util.Base64.getEncoder() - .encodeToString( - (USER + ":" + PASS) - .getBytes(java.nio.charset.StandardCharsets.UTF_8)); - connection.setRequestProperty("Authorization", "Basic " + token); + Encoder encoder = Base64.getEncoder(); + String auth = USER + ":" + PASS; + String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty(HttpHeaders.AUTHORIZATION, BASIC_PREFIX + token); } public void testLogRestApiResponse(String format) throws IOException { HttpURLConnection conn = null; try { - java.net.URL url = - new java.net.URL("http://localhost:" + HTTP_PORT + "/logs?format=" + format); + URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=" + format); conn = (HttpURLConnection) url.openConnection(); setBasicAuth(conn); From 013e0ac72faa6bcd9311c7e39df1ff2a7a1f8895 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 14:53:08 +0800 Subject: [PATCH 10/15] Code optimization # 9755 --- .../engine/server/rest/service/LogService.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java index 73a5b9648f9..013b08fdec2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java @@ -108,17 +108,6 @@ public List> allLogNameList(String jobId) { return allLogNameList; } - private void getHttpBasicAuth(HttpConfig httpConfig) { - try { - if (httpConfig.isEnableBasicAuth()) { - httpConfig.setBasicAuthUsername(httpConfig.getBasicAuthUsername()); - httpConfig.setBasicAuthPassword(httpConfig.getBasicAuthPassword()); - } - } catch (Throwable ignore) { - log.warn("Use system property or environment variable to set basic auth."); - } - } - public JsonArray allNodeLogFormatJson(String jobId) { return allLogNameList(jobId).stream() From 63ea9887662d6d6ea317833e6805904494402c9a Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 15:42:41 +0800 Subject: [PATCH 11/15] Code optimization # 9755 --- .../server/rest/service/BaseLogService.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index bac443cec9b..161adfb9e76 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.common.utils.LogUtil; +import org.apache.http.HttpHeaders; + import com.hazelcast.internal.util.StringUtil; import com.hazelcast.spi.impl.NodeEngineImpl; import lombok.extern.slf4j.Slf4j; @@ -30,6 +32,8 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Base64.Encoder; @Slf4j public class BaseLogService extends BaseService { @@ -37,6 +41,8 @@ public BaseLogService(NodeEngineImpl nodeEngine) { super(nodeEngine); } + private static final String BASIC = "Basic "; + /** Get configuration log path */ public String getLogPath() { try { @@ -89,12 +95,12 @@ protected String sendGet(String urlString, String user, String pass) { // Basic Auth if (user != null && pass != null) { - String token = - java.util.Base64.getEncoder() - .encodeToString( - (user + ":" + pass) - .getBytes(java.nio.charset.StandardCharsets.UTF_8)); - connection.setRequestProperty("Authorization", "Basic " + token); + + Encoder encoder = Base64.getEncoder(); + String auth = user + ":" + pass; + String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); + + connection.setRequestProperty(HttpHeaders.AUTHORIZATION, BASIC + token); } connection.connect(); From 84e26e901978a84b7e4323603bc367f841331c04 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 15:47:18 +0800 Subject: [PATCH 12/15] Code optimization # 9755 --- .../engine/server/rest/service/BaseLogService.java | 7 +++---- .../seatunnel/engine/server/rest/RestApiHttpBasicTest.java | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index 161adfb9e76..0a2202cce0f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -20,8 +20,6 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.common.utils.LogUtil; -import org.apache.http.HttpHeaders; - import com.hazelcast.internal.util.StringUtil; import com.hazelcast.spi.impl.NodeEngineImpl; import lombok.extern.slf4j.Slf4j; @@ -41,7 +39,8 @@ public BaseLogService(NodeEngineImpl nodeEngine) { super(nodeEngine); } - private static final String BASIC = "Basic "; + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String BASIC_PREFIX = "Basic "; /** Get configuration log path */ public String getLogPath() { @@ -100,7 +99,7 @@ protected String sendGet(String urlString, String user, String pass) { String auth = user + ":" + pass; String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, BASIC + token); + connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token); } connection.connect(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index eb3cf8e08f4..685b07d9120 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -28,8 +28,6 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.TestUtils; -import org.apache.http.HttpHeaders; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -60,6 +58,8 @@ class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { private static final String USER = "admin"; private static final String PASS = "admin"; private static final String DOMAIN = "http://localhost:" + HTTP_PORT; + + private static final String AUTHORIZATION_HEADER = "Authorization"; private static final String BASIC_PREFIX = "Basic "; @BeforeAll @@ -147,7 +147,7 @@ public void setBasicAuth(HttpURLConnection connection) { Encoder encoder = Base64.getEncoder(); String auth = USER + ":" + PASS; String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, BASIC_PREFIX + token); + connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token); } public void testLogRestApiResponse(String format) throws IOException { From 22cb2be9dad3f483abb4c990f72e079784b8e09d Mon Sep 17 00:00:00 2001 From: dotfive-star Date: Thu, 6 Nov 2025 17:20:03 +0800 Subject: [PATCH 13/15] Update seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java Co-authored-by: dy102 <132787602+dybyte@users.noreply.github.com> --- .../seatunnel/engine/server/rest/RestApiHttpBasicTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index 685b07d9120..716a541b9d2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -176,7 +176,7 @@ public void testLogRestApiResponse(String format) throws IOException { // "logName" : "job-${ctx:ST-JID}.log" // } ] String response = in.lines().collect(Collectors.joining()); - Assertions.assertNotNull(response); + Assertions.assertFalse(StringUtils.isBlank(response)); } } finally { From 9e6c2ec4fc9ed48d73392f380e59f20f7cc0293d Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 18:08:02 +0800 Subject: [PATCH 14/15] import StringUtils # 9755 --- .../seatunnel/engine/server/rest/RestApiHttpBasicTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java index 716a541b9d2..0117325913b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.server.rest; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; import org.apache.seatunnel.engine.common.runtime.ExecutionMode; From a01750a3023aa9bb3a9eb810e309fd09f7142a92 Mon Sep 17 00:00:00 2001 From: dayan1852 Date: Thu, 6 Nov 2025 22:31:23 +0800 Subject: [PATCH 15/15] Optimize sendGet method # 9755 --- .../server/rest/service/BaseLogService.java | 79 +++++++++---------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index 0a2202cce0f..ce959652509 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -31,10 +31,10 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Base64; -import java.util.Base64.Encoder; @Slf4j public class BaseLogService extends BaseService { + public BaseLogService(NodeEngineImpl nodeEngine) { super(nodeEngine); } @@ -52,37 +52,23 @@ public String getLogPath() { } } + /** + * Send a simple HTTP GET request. + * + * @param urlString url + * @return the response body as a string, or {@code null} if the request failed + */ protected String sendGet(String urlString) { - HttpURLConnection connection = null; - try { - connection = (HttpURLConnection) new URL(urlString).openConnection(); - connection.setRequestMethod("GET"); - connection.setConnectTimeout(5000); - connection.setReadTimeout(5000); - connection.connect(); - - int code = connection.getResponseCode(); - if (connection.getResponseCode() == 200) { - return readAll(connection.getInputStream()); - } - log.warn("GET {} -> HTTP {}", urlString, code); - } catch (IOException e) { - log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e)); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - return null; + return sendGet(urlString, null, null); } /** - * Send GET request with Basic Auth + * Send GET request (optionally with Basic Auth) * * @param urlString url - * @param user name - * @param pass password - * @return log + * @param user username, nullable + * @param pass password, nullable + * @return the response body as a string, or {@code null} if the request failed */ protected String sendGet(String urlString, String user, String pass) { HttpURLConnection connection = null; @@ -94,21 +80,21 @@ protected String sendGet(String urlString, String user, String pass) { // Basic Auth if (user != null && pass != null) { - - Encoder encoder = Base64.getEncoder(); String auth = user + ":" + pass; - String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); - + String token = + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token); } connection.connect(); int code = connection.getResponseCode(); - if (connection.getResponseCode() == 200) { - return readAll(connection.getInputStream()); + if (code == HttpURLConnection.HTTP_OK) { + return readResponseBody(connection.getInputStream()); + } else { + log.warn("GET {} -> HTTP {}", urlString, code); + drainErrorStream(connection); } - log.warn("GET {} -> HTTP {}", urlString, code); } catch (IOException e) { log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e)); } finally { @@ -119,18 +105,27 @@ protected String sendGet(String urlString, String user, String pass) { return null; } - private String readAll(InputStream is) throws IOException { - if (is == null) { - return null; - } - try (InputStream in = is; - ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + private String readResponseBody(InputStream is) throws IOException { + try (InputStream input = is; + ByteArrayOutputStream output = new ByteArrayOutputStream()) { + byte[] buf = new byte[4096]; int len; - while ((len = in.read(buf)) != -1) { - baos.write(buf, 0, len); + while ((len = input.read(buf)) != -1) { + output.write(buf, 0, len); + } + return output.toString(StandardCharsets.UTF_8.name()); + } + } + + private void drainErrorStream(HttpURLConnection connection) throws IOException { + try (InputStream err = connection.getErrorStream()) { + if (err != null) { + byte[] buffer = new byte[1024]; + while (err.read(buffer) != -1) { + // discard + } } - return baos.toString(StandardCharsets.UTF_8.name()); } }