Merged
17 commits
9a3d6b5  airbyte-12843: Improved discovery for redshift-destination not SUPER … (alexandertsukanov, Jun 10, 2022)
85098d6  Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco… (alexandertsukanov, Jun 17, 2022)
20977a0  Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco… (alexandertsukanov, Jun 20, 2022)
a4fe3dd  Update airbyte-integrations/connectors/destination-redshift/src/main/… (alexandertsukanov, Jun 23, 2022)
476fab2  airbyte-12843: reformat the code. (alexandertsukanov, Jun 23, 2022)
6b93858  airbyte-12843: Modified the tests for improved discovery. (alexandertsukanov, Jun 23, 2022)
b96390c  Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco… (alexandertsukanov, Jun 23, 2022)
f2b7150  airbyte-12843: Bump the version (alexandertsukanov, Jun 23, 2022)
02cde76  airbyte-12843: Bump the version (alexandertsukanov, Jun 23, 2022)
fbce188  Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco… (alexandertsukanov, Jun 24, 2022)
fa73f41  airbyte-12843: Covered RedshiftUtilTest with unit tests. (alexandertsukanov, Jun 24, 2022)
28cd314  airbyte-12843: Covered RedshiftUtilTest with unit tests. (alexandertsukanov, Jun 24, 2022)
47def69  Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco… (alexandertsukanov, Jun 24, 2022)
58e44f3  airbyte-12843: Merged master. (alexandertsukanov, Jun 24, 2022)
03987f7  airbyte-12843: Merged master. (alexandertsukanov, Jun 24, 2022)
f947262  airbyte-12843: Merged master. (alexandertsukanov, Jun 24, 2022)
e3d1b16  auto-bump connector version (octavia-squidington-iii, Jun 24, 2022)
Files changed
----------------------------------------------------------------------
@@ -225,7 +225,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.42
+  dockerImageTag: 0.3.43
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
   resourceRequirements:
----------------------------------------------------------------------
@@ -3622,7 +3622,7 @@
   supported_destination_sync_modes:
   - "overwrite"
   - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.42"
+- dockerImage: "airbyte/destination-redshift:0.3.43"
  spec:
    documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
    connectionSpecification:
----------------------------------------------------------------------
@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.jdbc;

 import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
-import static java.util.stream.Collectors.toSet;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
@@ -104,7 +103,7 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
   /**
    * Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default
    * schema. This namespace is source-provided, and can be potentially empty.
-   *
+   * <p>
    * The logic here matches the logic in the catalog_process.py for Normalization. Any modifications
    * need to be reflected there and vice versa.
    */
@@ -159,7 +158,7 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
       // copy data
       if (!hasFailed) {
         final List<String> queryList = new ArrayList<>();
-        sqlOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(toSet()));
+        sqlOperations.onDestinationCloseOperations(database, writeConfigs);
         LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
         for (final WriteConfig writeConfig : writeConfigs) {
           final String schemaName = writeConfig.getOutputSchemaName();
@@ -193,7 +192,9 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
         sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
       }
       LOGGER.info("Cleaning tmp tables in destination completed.");
-    };
+    }
+
+    ;
   }

   private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
----------------------------------------------------------------------
@@ -8,7 +8,6 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.util.List;
-import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -130,10 +129,10 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
    * Redshift destination:
    *
    * @param database - Database that the connector is interacting with
-   * @param schemaNames - schemas will be discovered
+   * @param writeConfigs - write configs whose schemas and tables (streams) will be discovered
    * @see io.airbyte.integrations.destination.redshift.RedshiftSqlOperations#onDestinationCloseOperations
    */
-  default void onDestinationCloseOperations(JdbcDatabase database, Set<String> schemaNames) {
+  default void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
     // do nothing
     LOGGER.info("No onDestinationCloseOperations required for this destination.");
   }
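Since the default above is a no-op, only destinations that need post-sync DDL override the hook. A minimal sketch of an override under the new signature (the class name and body are hypothetical; only the signature comes from this diff):

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import java.util.List;

// Declared abstract so the sketch compiles without the other JdbcSqlOperations methods.
public abstract class MyDestinationSqlOperations extends JdbcSqlOperations {

  @Override
  public void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
    // Runs once at the end of a sync, before the destination connection is closed.
    for (final WriteConfig writeConfig : writeConfigs) {
      LOGGER.info("Post-sync maintenance for {}.{}",
          writeConfig.getOutputSchemaName(), writeConfig.getOutputTableName());
    }
  }
}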
----------------------------------------------------------------------
@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.jdbc.copy;

 import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
-import static java.util.stream.Collectors.toSet;

 import io.airbyte.db.factory.DataSourceFactory;
 import io.airbyte.db.jdbc.JdbcDatabase;
@@ -162,8 +161,6 @@ private static void closeAsOneTransaction(final Map<AirbyteStreamNameNamespacePa
       }
     }
     if (!hasFailed) {
-      sqlOperations.onDestinationCloseOperations(db,
-          pairToCopier.keySet().stream().map(AirbyteStreamNameNamespacePair::getNamespace).collect(toSet()));
       sqlOperations.executeTransaction(db, queries);
     }
   } finally {
----------------------------------------------------------------------
@@ -212,8 +212,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
       }
       queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
     }
-    stagingOperations.onDestinationCloseOperations(database,
-        writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()));
+    stagingOperations.onDestinationCloseOperations(database, writeConfigs);
     LOGGER.info("Executing finalization of tables.");
     stagingOperations.executeTransaction(database, queryList);
     LOGGER.info("Finalizing tables in destination completed.");
----------------------------------------------------------------------
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.3.42
+LABEL io.airbyte.version=0.3.43
 LABEL io.airbyte.name=airbyte/destination-redshift
----------------------------------------------------------------------
@@ -4,8 +4,8 @@

 package io.airbyte.integrations.destination.redshift;

-import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.anyOfS3FieldsAreNullOrEmpty;
-import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options;
+import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.anyOfS3FieldsAreNullOrEmpty;
+import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;

 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.base.Destination;
----------------------------------------------------------------------
@@ -9,7 +9,7 @@
 import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
 import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.USERNAME;
 import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig;
-import static io.airbyte.integrations.destination.redshift.validator.RedshiftUtil.findS3Options;
+import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;
 import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

 import com.fasterxml.jackson.databind.JsonNode;
@@ -124,9 +124,10 @@ public JsonNode toJdbcConfig(final JsonNode config) {
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                             final ConfiguredAirbyteCatalog catalog,
                                             final Consumer<AirbyteMessage> outputRecordCollector) {
-    final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
     final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
         EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
+    final JsonNode s3Options = findS3Options(config);
+    final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     return new StagingConsumerFactory().create(
         outputRecordCollector,
         getDatabase(getDataSource(config)),
@@ -135,7 +136,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
         config,
         catalog,
-        isPurgeStagingData(config));
+        isPurgeStagingData(s3Options));
   }

   private boolean isPurgeStagingData(final JsonNode config) {
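Reading the flag from s3Options rather than the root config matters because the newer spec nests the S3 settings (including purge_staging_data) under the uploading_method oneOf. A sketch of the lookup this enables, assuming purge defaults to true when the flag is absent:

private boolean isPurgeStagingData(final JsonNode config) {
  // Purge staged files by default; skip cleanup only when explicitly disabled.
  return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
}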
----------------------------------------------------------------------
@@ -5,21 +5,25 @@
 package io.airbyte.integrations.destination.redshift.operations;

 import static io.airbyte.db.jdbc.JdbcUtils.getDefaultSourceOperations;
+import static java.lang.String.join;

 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
 import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils;
+import io.airbyte.integrations.destination.jdbc.WriteConfig;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -29,30 +33,34 @@ public class RedshiftSqlOperations extends JdbcSqlOperations {
   public static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535;
   public static final int REDSHIFT_SUPER_MAX_BYTE_SIZE = 1000000;

-  private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT = """
-      select tablename, schemaname
-      from pg_table_def
-      where tablename in (
-          select tablename as tablename
-          from pg_table_def
-          where schemaname = '%1$s'
-            and tablename like '%%airbyte_raw%%'
-            and "column" in ('%2$s', '%3$s', '%4$s')
-          group by tablename
-          having count(*) = 3)
-        and schemaname = '%1$s'
-        and type <> 'super'
-        and "column" = '_airbyte_data';
-      """;
+  private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT =
+      """
+          select tablename, schemaname
+          from pg_table_def
+          where tablename in (
+              select tablename as tablename
+              from pg_table_def
+              where schemaname = '%1$s'
+                and tablename in ('%5$s')
+                and tablename like '%%airbyte_raw%%'
+                and tablename not in (select table_name
+                                      from information_schema.views
+                                      where table_schema in ('%1$s'))
+                and "column" in ('%2$s', '%3$s', '%4$s')
+              group by tablename
+              having count(*) = 3)
+            and schemaname = '%1$s'
+            and type <> 'super'
+            and "column" = '_airbyte_data' """;

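For illustration, after String.format fills the placeholders (%1$s = schema, %2$s through %4$s = the Airbyte column names, %5$s = the joined table list, per the call in discoverNotSuperTables below), the query for a hypothetical schema public with streams users and orders would read roughly:

select tablename, schemaname
from pg_table_def
where tablename in (
    select tablename as tablename
    from pg_table_def
    where schemaname = 'public'
      and tablename in ('_airbyte_raw_users', '_airbyte_raw_orders')
      and tablename like '%airbyte_raw%'
      and tablename not in (select table_name
                            from information_schema.views
                            where table_schema in ('public'))
      and "column" in ('_airbyte_data', '_airbyte_emitted_at', '_airbyte_ab_id')
    group by tablename
    having count(*) = 3)
  and schemaname = 'public'
  and type <> 'super'
  and "column" = '_airbyte_data'

It selects raw tables (now excluding views) that have all three Airbyte columns but whose _airbyte_data column is not yet SUPER.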
   private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE =
       """
           ALTER TABLE %1$s ADD COLUMN %2$s_super super;
           ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP;
           UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s);
           UPDATE %1$s SET %3$s_reserve = %3$s;
-          ALTER TABLE %1$s DROP COLUMN %2$s;
-          ALTER TABLE %1$s DROP COLUMN %3$s;
+          ALTER TABLE %1$s DROP COLUMN %2$s CASCADE;
+          ALTER TABLE %1$s DROP COLUMN %3$s CASCADE;
           ALTER TABLE %1$s RENAME %2$s_super to %2$s;
           ALTER TABLE %1$s RENAME %3$s_reserve to %3$s;
           """;
@@ -104,8 +112,8 @@ public boolean isValidData(final JsonNode data) {

     // check VARCHAR limits for VARCHAR fields within the SUPER object, if overall object is valid
     if (isValid) {
-      Map<String, Object> dataMap = Jsons.flatten(data);
-      for (Object value : dataMap.values()) {
+      final Map<String, Object> dataMap = Jsons.flatten(data);
+      for (final Object value : dataMap.values()) {
         if (value instanceof String stringValue) {
           final int stringDataSize = stringValue.getBytes(StandardCharsets.UTF_8).length;
           isValid = stringDataSize <= REDSHIFT_VARCHAR_MAX_BYTE_SIZE;
@@ -123,29 +131,60 @@ public boolean isValidData(final JsonNode data) {
    * SUPER type. This would be done once.
    *
    * @param database - Database object for interacting with a JDBC connection.
-   * @param writeConfigSet - list of write configs.
+   * @param writeConfigs - list of write configs.
    */

   @Override
-  public void onDestinationCloseOperations(final JdbcDatabase database, final Set<String> writeConfigSet) {
+  public void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
     LOGGER.info("Executing operations for Redshift Destination DB engine...");
-    List<String> schemaAndTableWithNotSuperType = writeConfigSet
+    if (writeConfigs.isEmpty()) {
+      LOGGER.warn("Write config list is EMPTY.");
+      return;
+    }
+    final Map<String, List<String>> schemaTableMap = getTheSchemaAndRelatedStreamsMap(writeConfigs);
+    final List<String> schemaAndTableWithNotSuperType = schemaTableMap
+        .entrySet()
         .stream()
-        .flatMap(schemaName -> discoverNotSuperTables(database, schemaName).stream())
-        .toList();
+        // We use String.join() to concatenate the table names into the query's IN clause,
+        // e.g. SELECT * FROM some_table WHERE some_column IN ('test1', 'test2', ...)
+        .map(e -> discoverNotSuperTables(database, e.getKey(), join("', '", e.getValue())))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());

     if (!schemaAndTableWithNotSuperType.isEmpty()) {
       updateVarcharDataColumnToSuperDataColumn(database, schemaAndTableWithNotSuperType);
     }
     LOGGER.info("Executing operations for Redshift Destination DB engine completed.");
   }
+
+  /**
+   * Builds a map whose keys are schema names and whose values are the lists of related tables
+   * (streams).
+   *
+   * @param writeConfigs - write configs from which the schema-to-tables map will be built
+   * @return map with schemas as keys and tables (streams) as values
+   */
+  private Map<String, List<String>> getTheSchemaAndRelatedStreamsMap(final List<WriteConfig> writeConfigs) {
+    final Map<String, List<String>> schemaTableMap = new HashMap<>();
+    for (final WriteConfig writeConfig : writeConfigs) {
+      if (schemaTableMap.containsKey(writeConfig.getOutputSchemaName())) {
+        schemaTableMap.get(writeConfig.getOutputSchemaName()).add(writeConfig.getOutputTableName());
+      } else {
+        schemaTableMap.put(writeConfig.getOutputSchemaName(), new ArrayList<>(Collections.singletonList(writeConfig.getOutputTableName())));
+      }
+    }
+    return schemaTableMap;
+  }

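A design note: the containsKey/put dance above can be written more compactly with the streams API; an equivalent sketch (not what the PR ships):

private Map<String, List<String>> getTheSchemaAndRelatedStreamsMap(final List<WriteConfig> writeConfigs) {
  // Group output table names by their output schema.
  return writeConfigs.stream()
      .collect(Collectors.groupingBy(
          WriteConfig::getOutputSchemaName,
          Collectors.mapping(WriteConfig::getOutputTableName, Collectors.toList())));
}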
   /**
    * @param database - Database object for interacting with a JDBC connection.
    * @param schemaName - schema to update.
+   * @param tableName - joined list of tables to update.
    */
-  private List<String> discoverNotSuperTables(final JdbcDatabase database,
-                                              final String schemaName) {
-    List<String> schemaAndTableWithNotSuperType = new ArrayList<>();
+  private List<String> discoverNotSuperTables(final JdbcDatabase database, final String schemaName, final String tableName) {
+
+    final List<String> schemaAndTableWithNotSuperType = new ArrayList<>();
+
     try {
       LOGGER.info("Discovering NOT SUPER table types...");
       database.execute(String.format("set search_path to %s", schemaName));
@@ -154,7 +193,8 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
               schemaName,
               JavaBaseConstants.COLUMN_NAME_DATA,
               JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
-              JavaBaseConstants.COLUMN_NAME_AB_ID)),
+              JavaBaseConstants.COLUMN_NAME_AB_ID,
+              tableName)),
           getDefaultSourceOperations()::rowToJson);
       if (tablesNameWithoutSuperDatatype.isEmpty()) {
         return Collections.emptyList();
@@ -163,7 +203,7 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
           .forEach(e -> schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue()));
       return schemaAndTableWithNotSuperType;
     }
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       LOGGER.error("Error during discoverNotSuperTables() appears: ", e);
       throw new RuntimeException(e);
     }
@@ -177,7 +217,7 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
    */
   private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase database, final List<String> schemaAndTableWithNotSuperType) {
     LOGGER.info("Updating VARCHAR data column to SUPER...");
-    StringBuilder finalSqlStatement = new StringBuilder();
+    final StringBuilder finalSqlStatement = new StringBuilder();
     // To keep the previous data, we need to add next columns: _airbyte_data, _airbyte_emitted_at
     // We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at
     // column recreated to keep
@@ -191,7 +231,7 @@ private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase databas
     });
     try {
       database.execute(finalSqlStatement.toString());
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       LOGGER.error("Error during updateVarcharDataColumnToSuperDataColumn() appears: ", e);
       throw new RuntimeException(e);
     }
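The added CASCADE presumably lets the DROP succeed when dependent objects such as views reference the old columns, which is also why the discovery query now filters out views. Rendered for a single hypothetical entry public._airbyte_raw_users (assuming the format arguments are the schema-qualified table name, _airbyte_data, and _airbyte_emitted_at), the migration block expands to:

ALTER TABLE public._airbyte_raw_users ADD COLUMN _airbyte_data_super super;
ALTER TABLE public._airbyte_raw_users ADD COLUMN _airbyte_emitted_at_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP;
UPDATE public._airbyte_raw_users SET _airbyte_data_super = JSON_PARSE(_airbyte_data);
UPDATE public._airbyte_raw_users SET _airbyte_emitted_at_reserve = _airbyte_emitted_at;
ALTER TABLE public._airbyte_raw_users DROP COLUMN _airbyte_data CASCADE;
ALTER TABLE public._airbyte_raw_users DROP COLUMN _airbyte_emitted_at CASCADE;
ALTER TABLE public._airbyte_raw_users RENAME _airbyte_data_super to _airbyte_data;
ALTER TABLE public._airbyte_raw_users RENAME _airbyte_emitted_at_reserve to _airbyte_emitted_at;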
----------------------------------------------------------------------
@@ -2,7 +2,7 @@
  * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
  */

-package io.airbyte.integrations.destination.redshift.validator;
+package io.airbyte.integrations.destination.redshift.util;

 import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;

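The hunk above only shows the package move from validator to util. For context, findS3Options resolves where the S3 staging options live in the config, and anyOfS3FieldsAreNullOrEmpty decides whether the connector must fall back from S3 staging to plain INSERTs. A sketch of the resolver based on its call sites in this PR (the real implementation is not shown in this diff):

public static JsonNode findS3Options(final JsonNode config) {
  // Newer specs nest S3 settings under the "uploading_method" oneOf;
  // legacy flat configs keep them at the root.
  return config.has(UPLOADING_METHOD) ? config.get(UPLOADING_METHOD) : config;
}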