diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index bdf29edb9beb..1643b32da320 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -309,7 +309,7 @@ jobs: - name: run updated modules integration test (part-1) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 5 0` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 0` ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci env: MAVEN_OPTS: -Xmx2048m @@ -334,7 +334,7 @@ jobs: - name: run updated modules integration test (part-2) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 5 1` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 1` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -363,7 +363,7 @@ jobs: - name: run updated modules integration test (part-3) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 5 2` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 2` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -392,7 +392,7 @@ jobs: - name: run updated modules integration test (part-4) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 5 3` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 3` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -401,33 +401,89 @@ jobs: env: MAVEN_OPTS: -Xmx2048m updated-modules-integration-test-part-5: - needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - runs-on: ${{ matrix.os }} - strategy: - matrix: - java: [ '8', '11' ] - os: [ 'ubuntu-latest' ] - timeout-minutes: 90 - steps: - - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 - with: - java-version: ${{ matrix.java }} - distribution: 'temurin' - cache: 'maven' - - name: run updated modules integration test (part-5) - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 5 4` - if [ ! -z $sub_modules ]; then - ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci - else - echo "sub modules is empty, skipping" - fi - env: - MAVEN_OPTS: -Xmx2048m + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: run updated modules integration test (part-5) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 4` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m + updated-modules-integration-test-part-6: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: run updated modules integration test (part-6) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 5` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m + updated-modules-integration-test-part-7: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: run updated modules integration test (part-7) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 6` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m engine-v2-it: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java index c8666cd9a55e..5868fba91276 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException; import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule; import org.apache.commons.lang3.StringUtils; @@ -27,6 +29,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -108,33 +111,8 @@ private boolean pass(Object value, AssertFieldRule.AssertRule valueRule) { return ((Number) value).doubleValue() >= valueRule.getRuleValue(); } if (valueRule.getEqualTo() != null) { - if (value instanceof String) { - return value.equals(valueRule.getEqualTo()); - } - if (value instanceof Number) { - return ((Number) value).doubleValue() == Double.parseDouble(valueRule.getEqualTo()); - } - if (value instanceof Boolean) { - return value.equals(Boolean.parseBoolean(valueRule.getEqualTo())); - } - if (value instanceof LocalDateTime) { - TemporalAccessor parsedTimestamp = - DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(valueRule.getEqualTo()); - LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); - LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); - return ((LocalDateTime) value).isEqual(LocalDateTime.of(localDate, localTime)); - } - if (value instanceof LocalDate) { - DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - return ((LocalDate) value).isEqual(LocalDate.parse(valueRule.getEqualTo(), fmt)); - } - if (value instanceof LocalTime) { - DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss"); - return value.equals(LocalTime.parse(valueRule.getEqualTo(), fmt)); - } - return false; + return compareValue(value, valueRule); } - String valueStr = Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value); if (AssertFieldRule.AssertRuleType.MAX_LENGTH.equals(valueRule.getRuleType())) { return valueStr.length() <= valueRule.getRuleValue(); @@ -146,6 +124,44 @@ private boolean pass(Object value, AssertFieldRule.AssertRule valueRule) { return Boolean.TRUE; } + private boolean compareValue(Object value, AssertFieldRule.AssertRule valueRule) { + if (value instanceof String) { + return value.equals(valueRule.getEqualTo()); + } else if (value instanceof Integer) { + return value.equals(Integer.parseInt(valueRule.getEqualTo())); + } else if (value instanceof Long) { + return value.equals(Long.parseLong(valueRule.getEqualTo())); + } else if (value instanceof Short) { + return value.equals(Short.parseShort(valueRule.getEqualTo())); + } else if (value instanceof Float) { + return value.equals((Float.parseFloat(valueRule.getEqualTo()))); + } else if (value instanceof Byte) { + return value.equals((Byte.parseByte(valueRule.getEqualTo()))); + } else if (value instanceof Double) { + return value.equals(Double.parseDouble(valueRule.getEqualTo())); + } else if (value instanceof BigDecimal) { + return value.equals(new BigDecimal(valueRule.getEqualTo())); + } else if (value instanceof Boolean) { + return value.equals(Boolean.parseBoolean(valueRule.getEqualTo())); + } else if (value instanceof LocalDateTime) { + TemporalAccessor parsedTimestamp = + DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(valueRule.getEqualTo()); + LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); + return ((LocalDateTime) value).isEqual(LocalDateTime.of(localDate, localTime)); + } else if (value instanceof LocalDate) { + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + return ((LocalDate) value).isEqual(LocalDate.parse(valueRule.getEqualTo(), fmt)); + } else if (value instanceof LocalTime) { + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss"); + return value.equals(LocalTime.parse(valueRule.getEqualTo(), fmt)); + } else { + throw new AssertConnectorException( + AssertConnectorErrorCode.TYPES_NOT_SUPPORTED_FAILED, + String.format(" %s types not supported yet", value.getClass().getSimpleName())); + } + } + private Boolean checkType(Object value, SeaTunnelDataType fieldType) { return value.getClass().equals(fieldType.getTypeClass()); } diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/exception/AssertConnectorErrorCode.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/exception/AssertConnectorErrorCode.java index abb085e2837e..16ae8aed1c7d 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/exception/AssertConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/exception/AssertConnectorErrorCode.java @@ -20,7 +20,8 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum AssertConnectorErrorCode implements SeaTunnelErrorCode { - RULE_VALIDATION_FAILED("ASSERT-01", "Rule validate failed"); + RULE_VALIDATION_FAILED("ASSERT-01", "Rule validate failed"), + TYPES_NOT_SUPPORTED_FAILED("ASSERT-02", "Types not supported"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java index f479dfa5c99b..eccf2c684505 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -105,5 +106,6 @@ private SeaTunnelDataType getFieldType(String fieldTypeStr) { TYPES.put("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE); TYPES.put("date", LocalTimeType.LOCAL_DATE_TYPE); TYPES.put("time", LocalTimeType.LOCAL_TIME_TYPE); + TYPES.put("decimal", new DecimalType(38, 18)); } }