Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.rest;

import lombok.Data;

@Data
public class HttpBasic {
public String basicUser;
public String basicPass;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,87 @@ public String getLogPath() {
}

protected String sendGet(String urlString) {
HttpURLConnection connection = null;
try {
HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection();
connection = (HttpURLConnection) new URL(urlString).openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
connection.connect();

int code = connection.getResponseCode();
if (connection.getResponseCode() == 200) {
try (InputStream is = connection.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
return baos.toString();
}
return readAll(connection.getInputStream());
}
log.warn("GET {} -> HTTP {}", urlString, code);
} catch (IOException e) {
log.error("Send get Fail.{}", ExceptionUtils.getMessage(e));
log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e));
} finally {
if (connection != null) {
connection.disconnect();
}
}
return null;
}

/**
* Send GET request with Basic Auth
*
* @param urlString url
* @param user name
* @param pass password
* @return log
*/
protected String sendGet(String urlString, String user, String pass) {
HttpURLConnection connection = null;
try {
connection = (HttpURLConnection) new URL(urlString).openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);

// Basic Auth
if (user != null && pass != null) {
String token =
java.util.Base64.getEncoder()
.encodeToString(
(user + ":" + pass)
.getBytes(java.nio.charset.StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + token);
}

connection.connect();

int code = connection.getResponseCode();
if (connection.getResponseCode() == 200) {
return readAll(connection.getInputStream());
}
log.warn("GET {} -> HTTP {}", urlString, code);
} catch (IOException e) {
log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e));
} finally {
if (connection != null) {
connection.disconnect();
}
}
return null;
}

private String readAll(InputStream is) throws IOException {
if (is == null) {
return null;
}
try (InputStream in = is;
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
byte[] buf = new byte[4096];
int len;
while ((len = in.read(buf)) != -1) {
baos.write(buf, 0, len);
}
return baos.toString(java.nio.charset.StandardCharsets.UTF_8.name());
}
}

public String getLogParam(String uri, String contextPath) {
uri = uri.substring(uri.indexOf(contextPath) + contextPath.length());
uri = StringUtil.stripTrailingSlash(uri).substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
String contextPath = httpConfig.getContextPath();
int port = httpConfig.getPort();

// Take BasicAuth from configuration (if enabled)
HttpBasic result = getHttpBasicAuth(httpConfig);

List<Tuple3<String, String, String>> allLogNameList = new ArrayList<>();

JsonArray systemMonitoringInformationJsonValues =
Expand All @@ -70,15 +73,22 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
systemMonitoringInformation -> {
String host = systemMonitoringInformation.asObject().get("host").asString();
String url = "http://" + host + ":" + port + contextPath;
String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME);
if (StringUtils.isBlank(allName)) {
String logUrl = url + REST_URL_GET_ALL_LOG_NAME;

String allName =
httpConfig.isEnableBasicAuth()
? sendGet(logUrl, result.basicUser, result.basicPass)
: sendGet(logUrl);

if (allName == null || allName.trim().isEmpty()) {
log.warn(
"Get log file name failed: response logName is blank. url: {}, response: {}",
url + REST_URL_GET_ALL_LOG_NAME,
allName);
"GET {} returned empty body (null/empty). Skip this node.", logUrl);
return;
}
log.debug("Request: {} , Result: {}", url, allName);

if (log.isDebugEnabled()) {
log.debug("Request: {} , Result: {}", url, allName);
}
ArrayNode jsonNodes = JsonUtils.parseArray(allName);

jsonNodes.forEach(
Expand All @@ -98,6 +108,40 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
return allLogNameList;
}

private static HttpBasic getHttpBasicAuth(HttpConfig httpConfig) {
String basicUser = "";
String basicPass = "";
try {
if (httpConfig.isEnableBasicAuth()) {
basicUser = httpConfig.getBasicAuthUsername();
basicPass = httpConfig.getBasicAuthPassword();
}
} catch (Throwable ignore) {
// Compatible with older versions: If HttpConfig does not have these methods, use system
// properties or environment variables to find out
basicUser = System.getProperty("seatunnel.http.user", System.getenv("BASIC_AUTH_USER"));
basicPass = System.getProperty("seatunnel.http.pass", System.getenv("BASIC_AUTH_PASS"));
log.warn("Use system property or environment variable to set basic auth.");
}

if (StringUtils.isNotBlank(basicUser) && StringUtils.isNotBlank(basicPass)) {
httpConfig.setBasicAuthUsername(basicUser);
httpConfig.setBasicAuthPassword(basicPass);
}
return new HttpBasic(basicUser, basicPass);
}

private static class HttpBasic {

public final String basicUser;
public final String basicPass;

public HttpBasic(String basicUser, String basicPass) {
this.basicUser = basicUser;
this.basicPass = basicPass;
}
}

public JsonArray allNodeLogFormatJson(String jobId) {

return allLogNameList(jobId).stream()
Expand Down
Loading