From 9d161d24ea2fed6ed7392deda6342fd068df3b6a Mon Sep 17 00:00:00 2001
From: lizhenglei <127465317+jackyyyyyssss@users.noreply.github.com>
Date: Mon, 12 Aug 2024 14:03:21 +0800
Subject: [PATCH] [Fix] Fix http e2e case (#7356)
* 1
* fix
* fix
* fix
* fix
* fix
---------
Co-authored-by: lizhenglei <673421862@qq.com>
---
.../connector-http-e2e/pom.xml | 7 ++
.../seatunnel/e2e/connector/http/HttpIT.java | 115 +++++++++++++++++-
2 files changed, 119 insertions(+), 3 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
index 84b73a5998b..69b776da5f0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
@@ -98,6 +98,13 @@
${project.version}
test
+
+ org.mock-server
+ mockserver-netty-no-dependencies
+ 5.14.0
+ test
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index f53d8c1d458..af1d7125eae 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.e2e.connector.http;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -27,6 +31,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.Format;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -36,22 +42,35 @@
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
+import com.google.common.collect.Lists;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.mockserver.model.HttpRequest.request;
+
public class HttpIT extends TestSuiteBase implements TestResource {
private static final String TMP_DIR = "/tmp";
- private static final String successCount = "Total Write Count : 2";
-
private static final String IMAGE = "mockserver/mockserver:5.14.0";
private GenericContainer> mockserverContainer;
+ private static final List records = new ArrayList<>();
+
+ private MockServerClient mockServerClient;
+
@BeforeAll
@Override
public void startUp() {
@@ -78,7 +97,48 @@ public void startUp() {
.withEnv("MOCKSERVER_LOG_LEVEL", "WARN")
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
.waitingFor(new HttpWaitStrategy().forPath("/").forStatusCode(404));
+ mockserverContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 1080, 1080)));
Startables.deepStart(Stream.of(mockserverContainer)).join();
+ mockServerClient = new MockServerClient("127.0.0.1", 1080);
+ fillMockRecords();
+ }
+
+ private static void fillMockRecords() {
+ Record recordFirst = new Record();
+ RequestBody requestBodyFirst = new RequestBody();
+ JsonBody jsonBodyFirst = new JsonBody();
+ jsonBodyFirst.setId(1);
+ jsonBodyFirst.setVal_bool(true);
+ jsonBodyFirst.setVal_int8(new Byte("1"));
+ jsonBodyFirst.setVal_int16((short) 2);
+ jsonBodyFirst.setVal_int32(3);
+ jsonBodyFirst.setVal_int64(4);
+ jsonBodyFirst.setVal_float(4.3F);
+ jsonBodyFirst.setVal_double(5.3);
+ jsonBodyFirst.setVal_decimal(BigDecimal.valueOf(6.3));
+ jsonBodyFirst.setVal_string("NEW");
+ jsonBodyFirst.setVal_unixtime_micros("2020-02-02T02:02:02");
+ requestBodyFirst.setJson(jsonBodyFirst);
+ recordFirst.setBody(requestBodyFirst);
+
+ Record recordSec = new Record();
+ RequestBody requestBodySec = new RequestBody();
+ JsonBody jsonBodySec = new JsonBody();
+ jsonBodySec.setId(2);
+ jsonBodySec.setVal_bool(true);
+ jsonBodySec.setVal_int8(new Byte("1"));
+ jsonBodySec.setVal_int16((short) 2);
+ jsonBodySec.setVal_int32(3);
+ jsonBodySec.setVal_int64(4);
+ jsonBodySec.setVal_float(4.3F);
+ jsonBodySec.setVal_double(5.3);
+ jsonBodySec.setVal_decimal(BigDecimal.valueOf(6.3));
+ jsonBodySec.setVal_string("NEW");
+ jsonBodySec.setVal_unixtime_micros("2020-02-02T02:02:02");
+ requestBodySec.setJson(jsonBodySec);
+ recordSec.setBody(requestBodySec);
+ records.add(recordFirst);
+ records.add(recordSec);
}
@AfterAll
@@ -87,6 +147,9 @@ public void tearDown() {
if (mockserverContainer != null) {
mockserverContainer.stop();
}
+ if (mockServerClient != null) {
+ mockServerClient.close();
+ }
}
@TestTemplate
@@ -176,7 +239,53 @@ public void testMultiTableHttp(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/fake_to_multitable.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertTrue(execResult.getStdout().contains(successCount));
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ String mockResponse =
+ mockServerClient.retrieveRecordedRequests(
+ request().withPath("/example/httpMultiTableContentSink").withMethod("POST"),
+ Format.JSON);
+ List recordResponse =
+ objectMapper.readValue(mockResponse, new TypeReference>() {});
+ recordResponse =
+ recordResponse.stream()
+ .sorted(
+ (r1, r2) ->
+ r1.getBody().getJson().getId()
+ - r2.getBody().getJson().getId())
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(records, recordResponse);
+ }
+
+ @Getter
+ @Setter
+ @EqualsAndHashCode
+ static class Record {
+ private RequestBody body;
+ }
+
+ @Getter
+ @Setter
+ @EqualsAndHashCode
+ static class RequestBody {
+ private JsonBody json;
+ }
+
+ @Getter
+ @Setter
+ @EqualsAndHashCode
+ static class JsonBody {
+ private int id;
+ private boolean val_bool;
+ private byte val_int8;
+ private short val_int16;
+ private int val_int32;
+ private long val_int64;
+ private float val_float;
+ private double val_double;
+ private BigDecimal val_decimal;
+ private String val_string;
+ private String val_unixtime_micros;
}
public String getMockServerConfig() {