diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index a1e84f9ec76..ce959652509 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -29,13 +29,19 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; @Slf4j public class BaseLogService extends BaseService { + public BaseLogService(NodeEngineImpl nodeEngine) { super(nodeEngine); } + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String BASIC_PREFIX = "Basic "; + /** Get configuration log path */ public String getLogPath() { try { @@ -46,31 +52,83 @@ public String getLogPath() { } } + /** + * Send a simple HTTP GET request. + * + * @param urlString url + * @return the response body as a string, or {@code null} if the request failed + */ protected String sendGet(String urlString) { + return sendGet(urlString, null, null); + } + + /** + * Send GET request (optionally with Basic Auth) + * + * @param urlString url + * @param user username, nullable + * @param pass password, nullable + * @return the response body as a string, or {@code null} if the request failed + */ + protected String sendGet(String urlString, String user, String pass) { + HttpURLConnection connection = null; try { - HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection = (HttpURLConnection) new URL(urlString).openConnection(); connection.setRequestMethod("GET"); connection.setConnectTimeout(5000); connection.setReadTimeout(5000); + + // Basic Auth + if (user != null && pass != null) { + String auth = user + ":" + pass; + String token = + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token); + } + connection.connect(); - if (connection.getResponseCode() == 200) { - try (InputStream is = connection.getInputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - byte[] buffer = new byte[1024]; - int len; - while ((len = is.read(buffer)) != -1) { - baos.write(buffer, 0, len); - } - return baos.toString(); - } + int code = connection.getResponseCode(); + if (code == HttpURLConnection.HTTP_OK) { + return readResponseBody(connection.getInputStream()); + } else { + log.warn("GET {} -> HTTP {}", urlString, code); + drainErrorStream(connection); } } catch (IOException e) { - log.error("Send get Fail.{}", ExceptionUtils.getMessage(e)); + log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e)); + } finally { + if (connection != null) { + connection.disconnect(); + } } return null; } + private String readResponseBody(InputStream is) throws IOException { + try (InputStream input = is; + ByteArrayOutputStream output = new ByteArrayOutputStream()) { + + byte[] buf = new byte[4096]; + int len; + while ((len = input.read(buf)) != -1) { + output.write(buf, 0, len); + } + return output.toString(StandardCharsets.UTF_8.name()); + } + } + + private void drainErrorStream(HttpURLConnection connection) throws IOException { + try (InputStream err = connection.getErrorStream()) { + if (err != null) { + byte[] buffer = new byte[1024]; + while (err.read(buffer) != -1) { + // discard + } + } + } + } + public String getLogParam(String uri, String contextPath) { uri = uri.substring(uri.indexOf(contextPath) + contextPath.length()); uri = StringUtil.stripTrailingSlash(uri).substring(1); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java index d112ae069bb..013b08fdec2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java @@ -70,15 +70,25 @@ public List> allLogNameList(String jobId) { systemMonitoringInformation -> { String host = systemMonitoringInformation.asObject().get("host").asString(); String url = "http://" + host + ":" + port + contextPath; - String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME); + String logUrl = url + REST_URL_GET_ALL_LOG_NAME; + + String allName = + httpConfig.isEnableBasicAuth() + ? sendGet( + logUrl, + httpConfig.getBasicAuthUsername(), + httpConfig.getBasicAuthPassword()) + : sendGet(logUrl); + if (StringUtils.isBlank(allName)) { log.warn( - "Get log file name failed: response logName is blank. url: {}, response: {}", - url + REST_URL_GET_ALL_LOG_NAME, - allName); + "GET {} returned empty body (null/empty). Skip this node.", logUrl); return; } - log.debug("Request: {} , Result: {}", url, allName); + + if (log.isDebugEnabled()) { + log.debug("Request: {} , Result: {}", url, allName); + } ArrayNode jsonNodes = JsonUtils.parseArray(allName); jsonNodes.forEach( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java new file mode 100644 index 00000000000..0117325913b --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest; + +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.config.server.HttpConfig; +import org.apache.seatunnel.engine.common.runtime.ExecutionMode; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.hazelcast.config.Config; +import com.hazelcast.internal.serialization.Data; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Base64.Encoder; +import java.util.Collections; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS; +import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW; + +/** Test for Rest API with Basic. */ +class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest { + + private static final int HTTP_PORT = 18081; + private static final Long JOB_1 = System.currentTimeMillis() + 1L; + private static final String USER = "admin"; + private static final String PASS = "admin"; + private static final String DOMAIN = "http://localhost:" + HTTP_PORT; + + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String BASIC_PREFIX = "Basic "; + + @BeforeAll + void setUp() { + String name = this.getClass().getName(); + Config hazelcastConfig = Config.loadFromString(getHazelcastConfig()); + hazelcastConfig.setClusterName( + TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name)); + SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); + seaTunnelConfig.setHazelcastConfig(hazelcastConfig); + seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL); + + HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); + httpConfig.setEnabled(Boolean.TRUE); + httpConfig.setPort(HTTP_PORT); + + httpConfig.setEnableBasicAuth(Boolean.TRUE); + httpConfig.setBasicAuthUsername(USER); + httpConfig.setBasicAuthPassword(PASS); + + instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + nodeEngine = instance.node.nodeEngine; + server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); + LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class); + } + + @AfterAll + public void after() { + // Disable basic auth + // Because of the ConfigProvider.locateAndGetSeaTunnelConfig() single-case, + // if you change, other use cases will also change + // managed via org.apache.seatunnel.engine.common.config.YamlSeaTunnelDomConfigProcessor + SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); + HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig(); + httpConfig.setEnableBasicAuth(Boolean.FALSE); + httpConfig.setBasicAuthUsername(""); + httpConfig.setBasicAuthPassword(""); + } + + @Test + public void testRestApiOverview() throws Exception { + HttpURLConnection conn = null; + try { + URL url = new URL(DOMAIN + REST_URL_OVERVIEW); + conn = (HttpURLConnection) url.openConnection(); + setBasicAuth(conn); + + Assertions.assertEquals(200, conn.getResponseCode()); + Assertions.assertTrue( + conn.getHeaderFields() + .get("Content-Type") + .toString() + .contains("charset=utf-8")); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + @Test + void testLogRestApiResponseFailure() throws IOException { + startJob(); + HttpURLConnection conn = null; + try { + URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=JSON"); + conn = (HttpURLConnection) url.openConnection(); + + Assertions.assertEquals(401, conn.getResponseCode()); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + @Test + void testLogRestApiResponseSuccess() throws IOException { + startJob(); + testLogRestApiResponse("JSON"); + } + + public void setBasicAuth(HttpURLConnection connection) { + // Basic Auth + Encoder encoder = Base64.getEncoder(); + String auth = USER + ":" + PASS; + String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token); + } + + public void testLogRestApiResponse(String format) throws IOException { + HttpURLConnection conn = null; + try { + URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=" + format); + conn = (HttpURLConnection) url.openConnection(); + setBasicAuth(conn); + + Assertions.assertEquals(200, conn.getResponseCode()); + Assertions.assertTrue( + conn.getHeaderFields() + .get("Content-Type") + .toString() + .contains("charset=utf-8")); + + try (BufferedReader in = + new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + // [ { + // "node" : "localhost:18080", + // "logLink" : "http://localhost:18080/logs/job-1760939539658.log", + // "logName" : "job-1760939539658.log" + // }, { + // "node" : "localhost:18080", + // "logLink" : "http://localhost:18080/logs/job-${ctx:ST-JID}.log", + // "logName" : "job-${ctx:ST-JID}.log" + // } ] + String response = in.lines().collect(Collectors.joining()); + Assertions.assertFalse(StringUtils.isBlank(response)); + } + + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + private void startJob() { + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan( + "fake_to_console.conf", + RestApiHttpBasicTest.JOB_1.toString(), + RestApiHttpBasicTest.JOB_1); + + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + RestApiHttpBasicTest.JOB_1, + "Test", + nodeEngine.getSerializationService(), + testLogicalDag, + Collections.emptyList(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService() + .submitJob( + RestApiHttpBasicTest.JOB_1, + data, + jobImmutableInformation.isStartWithSavePoint()); + voidPassiveCompletableFuture.join(); + } +}