diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index a7260df48c0e..c565381dc926 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -225,7 +225,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.42
+  dockerImageTag: 0.3.43
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
   resourceRequirements:
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 9a1353e3c128..5a418f83e059 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -3622,7 +3622,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.42"
+- dockerImage: "airbyte/destination-redshift:0.3.43"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java
index 3cbda6b7be68..416f33553e54 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java
@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.jdbc;
 
 import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
-import static java.util.stream.Collectors.toSet;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
@@ -104,7 +103,7 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
   /**
    * Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default
    * schema. This namespace is source-provided, and can be potentially empty.
-   *
+   * <p>
    * The logic here matches the logic in the catalog_process.py for Normalization. Any modifications
    * need to be reflected there and vice versa.
    */
@@ -159,7 +158,7 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
       // copy data
       if (!hasFailed) {
         final List<String> queryList = new ArrayList<>();
-        sqlOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(toSet()));
+        sqlOperations.onDestinationCloseOperations(database, writeConfigs);
         LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
         for (final WriteConfig writeConfig : writeConfigs) {
           final String schemaName = writeConfig.getOutputSchemaName();
@@ -193,7 +192,9 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
           sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
         }
         LOGGER.info("Cleaning tmp tables in destination completed.");
-      };
+      }
+
+      ;
     }
 
   private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java
index 77be0d088239..07104b5b9f66 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java
@@ -8,7 +8,6 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.util.List;
-import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,10 +129,10 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
    * Redshift destination:
    *
    * @param database - Database that the connector is interacting with
-   * @param schemaNames - schemas will be discovered
+   * @param writeConfigs - write configs from which the schemas and tables (streams) to discover are taken
    * @see io.airbyte.integrations.destination.redshift.RedshiftSqlOperations#onDestinationCloseOperations
    */
-  default void onDestinationCloseOperations(JdbcDatabase database, Set<String> schemaNames) {
+  default void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
    // do nothing
    LOGGER.info("No onDestinationCloseOperations required for this destination.");
  }
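For reviewers, a minimal sketch of what an implementor now receives through the widened hook. `ExampleCloseOperations` is a made-up illustration, not part of this PR; the real override is `RedshiftSqlOperations#onDestinationCloseOperations` further down in this diff.

```java
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical implementor, for illustration only.
public class ExampleCloseOperations {

  private static final Logger LOGGER = LoggerFactory.getLogger(ExampleCloseOperations.class);

  // Same shape as the new default hook: instead of a flat Set<String> of schema
  // names, the hook now sees each stream's schema AND output table, so a
  // destination can post-process exactly the tables written by this sync.
  static void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
    for (final WriteConfig writeConfig : writeConfigs) {
      LOGGER.info("Post-processing {}.{}", writeConfig.getOutputSchemaName(), writeConfig.getOutputTableName());
    }
  }
}
```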
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
index 12bdfa66ae49..6d8783ec12bf 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java
@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.jdbc.copy;
 
 import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
-import static java.util.stream.Collectors.toSet;
 
 import io.airbyte.db.factory.DataSourceFactory;
 import io.airbyte.db.jdbc.JdbcDatabase;
@@ -162,8 +161,6 @@ private static void closeAsOneTransaction(final Map
outputRecordCollector) {
-    final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
     final EncryptionConfig encryptionConfig = config.has("uploading_method") ? EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
+    final JsonNode s3Options = findS3Options(config);
+    final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     return new StagingConsumerFactory().create(
         outputRecordCollector,
         getDatabase(getDataSource(config)),
@@ -135,7 +136,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
         config,
         catalog,
-        isPurgeStagingData(config));
+        isPurgeStagingData(s3Options));
   }
 
   private boolean isPurgeStagingData(final JsonNode config) {
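A hedged sketch of why the `isPurgeStagingData` call site in `RedshiftStagingS3Destination` changed: with the `uploading_method` oneOf, the purge flag lives inside the S3 staging block rather than at the top level of the config. The method body and the `purge_staging_data` field name are assumptions based on the connector spec; this diff only shows the call-site fix.

```java
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

// Hedged sketch, not the connector's actual class.
public class PurgeFlagExample {

  // Assumed to mirror RedshiftStagingS3Destination#isPurgeStagingData: default to
  // purging unless the flag is present and false.
  static boolean isPurgeStagingData(final JsonNode config) {
    return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
  }

  public static void main(final String[] args) {
    final JsonNode config = Jsons.deserialize(
        """
        {"uploading_method": {"method": "S3 Staging", "purge_staging_data": false}}
        """);
    // Against the root config the nested flag is invisible, so it defaults to true:
    System.out.println(isPurgeStagingData(config));                          // true (old behavior)
    // Against the uploading_method subtree the user's choice is honored:
    System.out.println(isPurgeStagingData(config.get("uploading_method")));  // false (fixed behavior)
  }
}
```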
"column" in ('%2$s', '%3$s', '%4$s') + group by tablename + having count(*) = 3) + and schemaname = '%1$s' + and type <> 'super' + and "column" = '_airbyte_data' """; private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE = """ @@ -51,8 +59,8 @@ having count(*) = 3) ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP; UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s); UPDATE %1$s SET %3$s_reserve = %3$s; - ALTER TABLE %1$s DROP COLUMN %2$s; - ALTER TABLE %1$s DROP COLUMN %3$s; + ALTER TABLE %1$s DROP COLUMN %2$s CASCADE; + ALTER TABLE %1$s DROP COLUMN %3$s CASCADE; ALTER TABLE %1$s RENAME %2$s_super to %2$s; ALTER TABLE %1$s RENAME %3$s_reserve to %3$s; """; @@ -104,8 +112,8 @@ public boolean isValidData(final JsonNode data) { // check VARCHAR limits for VARCHAR fields within the SUPER object, if overall object is valid if (isValid) { - Map dataMap = Jsons.flatten(data); - for (Object value : dataMap.values()) { + final Map dataMap = Jsons.flatten(data); + for (final Object value : dataMap.values()) { if (value instanceof String stringValue) { final int stringDataSize = stringValue.getBytes(StandardCharsets.UTF_8).length; isValid = stringDataSize <= REDSHIFT_VARCHAR_MAX_BYTE_SIZE; @@ -123,29 +131,60 @@ public boolean isValidData(final JsonNode data) { * SUPER type. This would be done once. * * @param database - Database object for interacting with a JDBC connection. - * @param writeConfigSet - list of write configs. + * @param writeConfigs - list of write configs. */ @Override - public void onDestinationCloseOperations(final JdbcDatabase database, final Set writeConfigSet) { + public void onDestinationCloseOperations(final JdbcDatabase database, final List writeConfigs) { LOGGER.info("Executing operations for Redshift Destination DB engine..."); - List schemaAndTableWithNotSuperType = writeConfigSet + if (writeConfigs.isEmpty()) { + LOGGER.warn("Write config list is EMPTY."); + return; + } + final Map> schemaTableMap = getTheSchemaAndRelatedStreamsMap(writeConfigs); + final List schemaAndTableWithNotSuperType = schemaTableMap + .entrySet() .stream() - .flatMap(schemaName -> discoverNotSuperTables(database, schemaName).stream()) - .toList(); + // String.join() we use to concat tables from list, in query, as follows: SELECT * FROM some_table + // WHERE smt_column IN ('test1', 'test2', etc) + .map(e -> discoverNotSuperTables(database, e.getKey(), join("', '", e.getValue()))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + if (!schemaAndTableWithNotSuperType.isEmpty()) { updateVarcharDataColumnToSuperDataColumn(database, schemaAndTableWithNotSuperType); } LOGGER.info("Executing operations for Redshift Destination DB engine completed."); } + /** + * The method is responsible for building the map which consists from: Keys - Schema names, Values - + * List of related tables (Streams) + * + * @param writeConfigs - write configs from which schema-related tables map will be built + * @return map with Schemas as Keys and with Tables (Streams) as values + */ + private Map> getTheSchemaAndRelatedStreamsMap(final List writeConfigs) { + final Map> schemaTableMap = new HashMap<>(); + for (final WriteConfig writeConfig : writeConfigs) { + if (schemaTableMap.containsKey(writeConfig.getOutputSchemaName())) { + schemaTableMap.get(writeConfig.getOutputSchemaName()).add(writeConfig.getOutputTableName()); + } else { + schemaTableMap.put(writeConfig.getOutputSchemaName(), new 
@@ -154,7 +193,8 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
               schemaName,
               JavaBaseConstants.COLUMN_NAME_DATA,
               JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
-              JavaBaseConstants.COLUMN_NAME_AB_ID)),
+              JavaBaseConstants.COLUMN_NAME_AB_ID,
+              tableName)),
           getDefaultSourceOperations()::rowToJson);
       if (tablesNameWithoutSuperDatatype.isEmpty()) {
         return Collections.emptyList();
@@ -163,7 +203,7 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
             .forEach(e -> schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue()));
         return schemaAndTableWithNotSuperType;
       }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       LOGGER.error("Error during discoverNotSuperTables() appears: ", e);
       throw new RuntimeException(e);
     }
@@ -177,7 +217,7 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
    */
   private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase database, final List<String> schemaAndTableWithNotSuperType) {
     LOGGER.info("Updating VARCHAR data column to SUPER...");
-    StringBuilder finalSqlStatement = new StringBuilder();
+    final StringBuilder finalSqlStatement = new StringBuilder();
     // To keep the previous data, we need to add next columns: _airbyte_data, _airbyte_emitted_at
     // We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at
     // column recreated to keep
@@ -191,7 +231,7 @@ private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase databas
     });
     try {
       database.execute(finalSqlStatement.toString());
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       LOGGER.error("Error during updateVarcharDataColumnToSuperDataColumn() appears: ", e);
       throw new RuntimeException(e);
     }
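For one discovered table, the migration template renders roughly as below (a hedged illustration: `public._airbyte_raw_users` is a made-up name, and the template's opening `ADD COLUMN %2$s_super` line sits above this hunk, so only the quoted part is rendered here).

```java
// Illustration only: rendering the quoted part of the migration template.
// %1$s = schema-qualified table, %2$s = _airbyte_data, %3$s = _airbyte_emitted_at.
public class RenderMigrationExample {

  public static void main(final String[] args) {
    final String quotedTemplatePart = """
        ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP;
        UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s);
        UPDATE %1$s SET %3$s_reserve = %3$s;
        ALTER TABLE %1$s DROP COLUMN %2$s CASCADE;
        ALTER TABLE %1$s DROP COLUMN %3$s CASCADE;
        ALTER TABLE %1$s RENAME %2$s_super to %2$s;
        ALTER TABLE %1$s RENAME %3$s_reserve to %3$s;
        """;
    // The CASCADE keywords are the fix in this hunk: without them, DROP COLUMN
    // fails on raw tables that other objects (for example, normalization views)
    // depend on.
    System.out.println(String.format(quotedTemplatePart,
        "public._airbyte_raw_users", "_airbyte_data", "_airbyte_emitted_at"));
  }
}
```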
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.java
similarity index 94%
rename from airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java
rename to airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.java
index 78d7c5d81be0..1e4186f67d01 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/validator/RedshiftUtil.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.java
@@ -2,7 +2,7 @@
  * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.redshift.validator;
+package io.airbyte.integrations.destination.redshift.util;
 
 import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
 
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java
index 80a53948a483..4e2268540f53 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java
@@ -46,6 +46,7 @@ class RedshiftInsertDestinationAcceptanceTest extends RedshiftStagingS3Destinati
   private static final Instant NOW = Instant.now();
   private static final String USERS_STREAM_NAME = "users_" + RandomStringUtils.randomAlphabetic(5);
+  private static final String BOOKS_STREAM_NAME = "books_" + RandomStringUtils.randomAlphabetic(5);
   private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
       .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
@@ -77,14 +78,17 @@ void setup() {
         .withDestinationSyncMode(DestinationSyncMode.APPEND)));
   }
 
+  @Test
   void testIfSuperTmpTableWasCreatedAfterVarcharTmpTable() throws Exception {
     setup();
-    Database database = getDatabase();
-    String rawTableName = this.getNamingResolver().getRawTableName(USERS_STREAM_NAME);
-    createTmpTableWithVarchar(database, rawTableName);
+    final Database database = getDatabase();
+    final String usersStream = getNamingResolver().getRawTableName(USERS_STREAM_NAME);
+    final String booksStream = getNamingResolver().getRawTableName(BOOKS_STREAM_NAME);
+    createTmpTableWithVarchar(database, usersStream);
+    createTmpTableWithVarchar(database, booksStream);
 
-    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, rawTableName, "character varying"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, usersStream, "character varying"));
 
     final Destination destination = new RedshiftDestination();
     final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);
@@ -94,7 +98,8 @@ void testIfSuperTmpTableWasCreatedAfterVarcharTmpTable() throws Exception {
     consumer.accept(MESSAGE_STATE);
     consumer.close();
 
-    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, rawTableName, "super"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, usersStream, "super"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, booksStream, "character varying"));
 
     final List<JsonNode> usersActual = retrieveRecords(testDestinationEnv, USERS_STREAM_NAME, DATASET_ID, config);
     final List<JsonNode> expectedUsersJson = Lists.newArrayList(
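The assertions above lean on a type-check helper from the shared test superclass that this diff does not show. A hedged sketch of one plausible implementation, mirroring the `pg_table_def` lookup the connector itself uses (the class, method name, and the jOOQ-based `Database#query` usage are assumptions):

```java
import io.airbyte.db.Database;

// Hypothetical helper, for illustration only; the real
// isTmpTableDataColumnInExpectedType lives in the shared test superclass.
public class ColumnTypeCheckExample {

  static boolean isDataColumnOfType(final Database database,
                                    final String schemaName,
                                    final String tableName,
                                    final String expectedType)
      throws Exception {
    return database.query(ctx -> {
      // pg_table_def only lists tables on the search_path, which is why the
      // connector itself runs "set search_path to <schema>" before discovery.
      ctx.execute(String.format("set search_path to %s;", schemaName));
      return ctx.fetch(String.format(
          "select type from pg_table_def where schemaname = '%s' and tablename = '%s' and \"column\" = '_airbyte_data';",
          schemaName, tableName));
    }).stream().anyMatch(row -> expectedType.equals(row.get("type", String.class)));
  }
}
```

The books-stream assertion is the interesting one: a raw table whose stream is not in the catalog must now stay `character varying`, proving discovery is scoped to the synced streams.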
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java
index 41589fd55cd6..2c35d769f2b7 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java
@@ -40,6 +40,8 @@ public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftSt
   private ConfiguredAirbyteCatalog catalog;
 
   private static final Instant NOW = Instant.now();
+
+  private static final String USERS_STREAM_NAME = "users_" + RandomStringUtils.randomAlphabetic(5);
   private static final String BOOKS_STREAM_NAME = "books_" + RandomStringUtils.randomAlphabetic(5);
   private static final AirbyteMessage MESSAGE_BOOKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
@@ -75,11 +77,13 @@ void setup() {
 
   @Test
   void testIfSuperTmpTableWasCreatedAfterVarcharTmpTableDuringS3Staging() throws Exception {
     setup();
-    Database database = getDatabase();
-    String rawTableName = this.getNamingResolver().getRawTableName(BOOKS_STREAM_NAME);
-    createTmpTableWithVarchar(database, rawTableName);
+    final Database database = getDatabase();
+    final String booksStream = getNamingResolver().getRawTableName(BOOKS_STREAM_NAME);
+    final String usersStream = getNamingResolver().getRawTableName(USERS_STREAM_NAME);
+    createTmpTableWithVarchar(database, usersStream);
+    createTmpTableWithVarchar(database, booksStream);
 
-    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, rawTableName, "character varying"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, booksStream, "character varying"));
 
     final Destination destination = new RedshiftDestination();
     final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);
@@ -89,7 +93,8 @@ void testIfSuperTmpTableWasCreatedAfterVarcharTmpTableDuringS3Staging() throws E
     consumer.accept(MESSAGE_STATE);
     consumer.close();
 
-    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, rawTableName, "super"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, booksStream, "super"));
+    assertTrue(isTmpTableDataColumnInExpectedType(database, DATASET_ID, usersStream, "character varying"));
 
     final List<JsonNode> booksActual = retrieveRecords(testDestinationEnv, BOOKS_STREAM_NAME, DATASET_ID, config);
     final List<JsonNode> expectedUsersJson = Lists.newArrayList(
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtilTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtilTest.java
new file mode 100644
index 000000000000..f5e167cc7668
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/util/RedshiftUtilTest.java
@@ -0,0 +1,67 @@
+package io.airbyte.integrations.destination.redshift.util;
+
+import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+public class RedshiftUtilTest {
+
+  @Test
+  @DisplayName("Should return the config when the config has uploading method")
+  public void testFindS3OptionsWhenConfigHasUploadingMethod() {
+    JsonNode config = mock(JsonNode.class);
+    JsonNode uploadingMethod = mock(JsonNode.class);
+    when(config.has(UPLOADING_METHOD)).thenReturn(true);
+    when(config.get(UPLOADING_METHOD)).thenReturn(uploadingMethod);
+
+    JsonNode result = RedshiftUtil.findS3Options(config);
+
+    assertEquals(uploadingMethod, result);
+  }
+
+  @Test
+  @DisplayName("Should return the config when the config does not have uploading method")
+  public void testFindS3OptionsWhenConfigDoesNotHaveUploadingMethod() {
+    JsonNode config = mock(JsonNode.class);
+    when(config.has(UPLOADING_METHOD)).thenReturn(false);
+
+    JsonNode result = RedshiftUtil.findS3Options(config);
+
+    assertEquals(config, result);
+  }
+
+  @Test
+  @DisplayName("Should return true when all of the fields are null or empty")
+  public void testAnyOfS3FieldsAreNullOrEmptyWhenAllOfTheFieldsAreNullOrEmptyThenReturnTrue() {
+    JsonNode jsonNode = mock(JsonNode.class);
+    when(jsonNode.get("s3_bucket_name")).thenReturn(null);
+    when(jsonNode.get("s3_bucket_region")).thenReturn(null);
+    when(jsonNode.get("access_key_id")).thenReturn(null);
+    when(jsonNode.get("secret_access_key")).thenReturn(null);
+
+    assertTrue(RedshiftUtil.anyOfS3FieldsAreNullOrEmpty(jsonNode));
+  }
+
+  @Test
+  @DisplayName("Should return false when all S3 required fields are not null or empty")
+  public void testAllS3RequiredAreNotNullOrEmptyThenReturnFalse() {
+    JsonNode jsonNode = mock(JsonNode.class);
+    when(jsonNode.get("s3_bucket_name")).thenReturn(mock(JsonNode.class));
+    when(jsonNode.get("s3_bucket_name").asText()).thenReturn("test");
+    when(jsonNode.get("s3_bucket_region")).thenReturn(mock(JsonNode.class));
+    when(jsonNode.get("s3_bucket_region").asText()).thenReturn("test");
+    when(jsonNode.get("access_key_id")).thenReturn(mock(JsonNode.class));
+    when(jsonNode.get("access_key_id").asText()).thenReturn("test");
+    when(jsonNode.get("secret_access_key")).thenReturn(mock(JsonNode.class));
+    when(jsonNode.get("secret_access_key").asText()).thenReturn("test");
+
+    assertFalse(RedshiftUtil.anyOfS3FieldsAreNullOrEmpty(jsonNode));
+  }
+}
\ No newline at end of file
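These tests pin down the two helpers of the renamed `RedshiftUtil`. A reconstruction sketch of the behavior they encode, for reviewers who do not have the (moved but otherwise unchanged) implementation in front of them; the real class may differ in detail.

```java
import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;

import com.fasterxml.jackson.databind.JsonNode;

// Hedged reconstruction inferred from RedshiftUtilTest, not the actual source.
public final class RedshiftUtilSketch {

  private RedshiftUtilSketch() {}

  // The "uploading_method" envelope is optional: when absent, the S3 options are
  // read from the top-level config (backward compatibility with older configs).
  public static JsonNode findS3Options(final JsonNode config) {
    return config.has(UPLOADING_METHOD) ? config.get(UPLOADING_METHOD) : config;
  }

  public static boolean anyOfS3FieldsAreNullOrEmpty(final JsonNode jsonNode) {
    return isNullOrEmpty(jsonNode.get("s3_bucket_name"))
        || isNullOrEmpty(jsonNode.get("s3_bucket_region"))
        || isNullOrEmpty(jsonNode.get("access_key_id"))
        || isNullOrEmpty(jsonNode.get("secret_access_key"));
  }

  private static boolean isNullOrEmpty(final JsonNode jsonNode) {
    return jsonNode == null || jsonNode.asText().isEmpty();
  }
}
```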
diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md
index 2dbae028e0d3..75c2c860150a 100644
--- a/docs/integrations/destinations/redshift.md
+++ b/docs/integrations/destinations/redshift.md
@@ -136,33 +136,34 @@ Each stream will be output into its own raw table in Redshift. Each table will c
 
 ## Changelog
 
-| Version | Date | Pull Request | Subject |
-|:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination |
-| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
-| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
-| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
-| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
-| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
-| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
-| 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging |
-| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config |
-| 0.3.31 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in _airbyte_raw_** table |
-| 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name | |
-| 0.3.28 | 2022-03-18 | [\#11254](https://github.com/airbytehq/airbyte/pull/11254) | Fixed missing records during S3 staging |
-| 0.3.27 | 2022-02-25 | [10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
-| 0.3.25 | 2022-02-14 | [#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to an staging file. |
-| 0.3.24 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
-| 0.3.23 | 2021-12-16 | [\#8855](https://github.com/airbytehq/airbyte/pull/8855) | Add `purgeStagingData` option to enable/disable deleting the staging data |
-| 0.3.22 | 2021-12-15 | [#8607](https://github.com/airbytehq/airbyte/pull/8607) | Accept a path for the staging data |
-| 0.3.21 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management |
-| 0.3.20 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
-| 0.3.19 | 2021-10-21 | [7234](https://github.com/airbytehq/airbyte/pull/7234) | Allow SSL traffic only |
-| 0.3.17 | 2021-10-12 | [6965](https://github.com/airbytehq/airbyte/pull/6965) | Added SSL Support |
-| 0.3.16 | 2021-10-11 | [6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
-| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
-| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
-| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
-| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |
+| Version | Date | Pull Request | Subject |
+|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.3.43 | 2022-06-24 | [\#13690](https://github.com/airbytehq/airbyte/pull/13690) | Improved discovery of raw tables with a non-SUPER `_airbyte_data` column |
+| 0.3.42 | 2022-06-21 | [\#14013](https://github.com/airbytehq/airbyte/pull/14013) | Add an option to use encryption with staging in Redshift Destination |
+| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
+| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
+| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
+| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
+| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
+| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
+| 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging |
+| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config |
+| 0.3.31 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in `_airbyte_raw_*` tables |
+| 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name |
+| 0.3.28 | 2022-03-18 | [\#11254](https://github.com/airbytehq/airbyte/pull/11254) | Fixed missing records during S3 staging |
+| 0.3.27 | 2022-02-25 | [10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
+| 0.3.25 | 2022-02-14 | [#9920](https://github.com/airbytehq/airbyte/pull/9920) | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to a staging file. |
+| 0.3.24 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
+| 0.3.23 | 2021-12-16 | [\#8855](https://github.com/airbytehq/airbyte/pull/8855) | Add `purgeStagingData` option to enable/disable deleting the staging data |
+| 0.3.22 | 2021-12-15 | [#8607](https://github.com/airbytehq/airbyte/pull/8607) | Accept a path for the staging data |
+| 0.3.21 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management |
+| 0.3.20 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
+| 0.3.19 | 2021-10-21 | [7234](https://github.com/airbytehq/airbyte/pull/7234) | Allow SSL traffic only |
+| 0.3.17 | 2021-10-12 | [6965](https://github.com/airbytehq/airbyte/pull/6965) | Added SSL support |
+| 0.3.16 | 2021-10-11 | [6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
+| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 staging COPY writing records from different streams into the same raw table |
+| 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
+| 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
+| 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | Allow `additionalProperties` in connector spec |