Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

@Slf4j
public class BaseLogService extends BaseService {

public BaseLogService(NodeEngineImpl nodeEngine) {
super(nodeEngine);
}

private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String BASIC_PREFIX = "Basic ";

/** Get configuration log path */
public String getLogPath() {
try {
Expand All @@ -46,31 +52,83 @@ public String getLogPath() {
}
}

/**
* Send a simple HTTP GET request.
*
* @param urlString url
* @return the response body as a string, or {@code null} if the request failed
*/
protected String sendGet(String urlString) {
return sendGet(urlString, null, null);
}

/**
* Send GET request (optionally with Basic Auth)
*
* @param urlString url
* @param user username, nullable
* @param pass password, nullable
* @return the response body as a string, or {@code null} if the request failed
*/
protected String sendGet(String urlString, String user, String pass) {
HttpURLConnection connection = null;
try {
HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection();
connection = (HttpURLConnection) new URL(urlString).openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);

// Basic Auth
if (user != null && pass != null) {
String auth = user + ":" + pass;
String token =
Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token);
}

connection.connect();

if (connection.getResponseCode() == 200) {
try (InputStream is = connection.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
return baos.toString();
}
int code = connection.getResponseCode();
if (code == HttpURLConnection.HTTP_OK) {
return readResponseBody(connection.getInputStream());
} else {
log.warn("GET {} -> HTTP {}", urlString, code);
drainErrorStream(connection);
}
} catch (IOException e) {
log.error("Send get Fail.{}", ExceptionUtils.getMessage(e));
log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e));
} finally {
if (connection != null) {
connection.disconnect();
}
}
return null;
}

private String readResponseBody(InputStream is) throws IOException {
try (InputStream input = is;
ByteArrayOutputStream output = new ByteArrayOutputStream()) {

byte[] buf = new byte[4096];
int len;
while ((len = input.read(buf)) != -1) {
output.write(buf, 0, len);
}
return output.toString(StandardCharsets.UTF_8.name());
}
}

private void drainErrorStream(HttpURLConnection connection) throws IOException {
try (InputStream err = connection.getErrorStream()) {
if (err != null) {
byte[] buffer = new byte[1024];
while (err.read(buffer) != -1) {
// discard
}
}
}
}

public String getLogParam(String uri, String contextPath) {
uri = uri.substring(uri.indexOf(contextPath) + contextPath.length());
uri = StringUtil.stripTrailingSlash(uri).substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,25 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
systemMonitoringInformation -> {
String host = systemMonitoringInformation.asObject().get("host").asString();
String url = "http://" + host + ":" + port + contextPath;
String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME);
String logUrl = url + REST_URL_GET_ALL_LOG_NAME;

String allName =
httpConfig.isEnableBasicAuth()
? sendGet(
logUrl,
httpConfig.getBasicAuthUsername(),
httpConfig.getBasicAuthPassword())
: sendGet(logUrl);

if (StringUtils.isBlank(allName)) {
log.warn(
"Get log file name failed: response logName is blank. url: {}, response: {}",
url + REST_URL_GET_ALL_LOG_NAME,
allName);
"GET {} returned empty body (null/empty). Skip this node.", logUrl);
return;
}
log.debug("Request: {} , Result: {}", url, allName);

if (log.isDebugEnabled()) {
log.debug("Request: {} , Result: {}", url, allName);
}
ArrayNode jsonNodes = JsonUtils.parseArray(allName);

jsonNodes.forEach(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.rest;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.TestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import com.hazelcast.config.Config;
import com.hazelcast.internal.serialization.Data;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.Collections;
import java.util.stream.Collectors;

import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;

/** Test for Rest API with Basic. */
class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest {

private static final int HTTP_PORT = 18081;
private static final Long JOB_1 = System.currentTimeMillis() + 1L;
private static final String USER = "admin";
private static final String PASS = "admin";
private static final String DOMAIN = "http://localhost:" + HTTP_PORT;

private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String BASIC_PREFIX = "Basic ";

@BeforeAll
void setUp() {
String name = this.getClass().getName();
Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
hazelcastConfig.setClusterName(
TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name));
SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);

HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
httpConfig.setEnabled(Boolean.TRUE);
httpConfig.setPort(HTTP_PORT);

httpConfig.setEnableBasicAuth(Boolean.TRUE);
httpConfig.setBasicAuthUsername(USER);
httpConfig.setBasicAuthPassword(PASS);

instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
nodeEngine = instance.node.nodeEngine;
server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
}

@AfterAll
public void after() {
// Disable basic auth
// Because of the ConfigProvider.locateAndGetSeaTunnelConfig() single-case,
// if you change, other use cases will also change
// managed via org.apache.seatunnel.engine.common.config.YamlSeaTunnelDomConfigProcessor
SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
httpConfig.setEnableBasicAuth(Boolean.FALSE);
httpConfig.setBasicAuthUsername("");
httpConfig.setBasicAuthPassword("");
}

@Test
public void testRestApiOverview() throws Exception {
HttpURLConnection conn = null;
try {
URL url = new URL(DOMAIN + REST_URL_OVERVIEW);
conn = (HttpURLConnection) url.openConnection();
setBasicAuth(conn);

Assertions.assertEquals(200, conn.getResponseCode());
Assertions.assertTrue(
conn.getHeaderFields()
.get("Content-Type")
.toString()
.contains("charset=utf-8"));
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

@Test
void testLogRestApiResponseFailure() throws IOException {
startJob();
HttpURLConnection conn = null;
try {
URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=JSON");
conn = (HttpURLConnection) url.openConnection();

Assertions.assertEquals(401, conn.getResponseCode());
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

@Test
void testLogRestApiResponseSuccess() throws IOException {
startJob();
testLogRestApiResponse("JSON");
}

public void setBasicAuth(HttpURLConnection connection) {
// Basic Auth
Encoder encoder = Base64.getEncoder();
String auth = USER + ":" + PASS;
String token = encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + token);
}

public void testLogRestApiResponse(String format) throws IOException {
HttpURLConnection conn = null;
try {
URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=" + format);
conn = (HttpURLConnection) url.openConnection();
setBasicAuth(conn);

Assertions.assertEquals(200, conn.getResponseCode());
Assertions.assertTrue(
conn.getHeaderFields()
.get("Content-Type")
.toString()
.contains("charset=utf-8"));

try (BufferedReader in =
new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
// [ {
// "node" : "localhost:18080",
// "logLink" : "http://localhost:18080/logs/job-1760939539658.log",
// "logName" : "job-1760939539658.log"
// }, {
// "node" : "localhost:18080",
// "logLink" : "http://localhost:18080/logs/job-${ctx:ST-JID}.log",
// "logName" : "job-${ctx:ST-JID}.log"
// } ]
String response = in.lines().collect(Collectors.joining());
Assertions.assertFalse(StringUtils.isBlank(response));
}

} finally {
if (conn != null) {
conn.disconnect();
}
}
}

private void startJob() {
LogicalDag testLogicalDag =
TestUtils.createTestLogicalPlan(
"fake_to_console.conf",
RestApiHttpBasicTest.JOB_1.toString(),
RestApiHttpBasicTest.JOB_1);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
RestApiHttpBasicTest.JOB_1,
"Test",
nodeEngine.getSerializationService(),
testLogicalDag,
Collections.emptyList(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService()
.submitJob(
RestApiHttpBasicTest.JOB_1,
data,
jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}