Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
9a3d6b5
airbyte-12843: Improved discovery for redshift-destination not SUPER …
alexandertsukanov Jun 10, 2022
85098d6
Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco…
alexandertsukanov Jun 17, 2022
20977a0
Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco…
alexandertsukanov Jun 20, 2022
a4fe3dd
Update airbyte-integrations/connectors/destination-redshift/src/main/…
alexandertsukanov Jun 23, 2022
476fab2
airbyte-12843: reformat the code.
alexandertsukanov Jun 23, 2022
6b93858
airbyte-12843: Modified the tests for improved discovery.
alexandertsukanov Jun 23, 2022
b96390c
Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco…
alexandertsukanov Jun 23, 2022
f2b7150
airbyte-12843: Bump the version
alexandertsukanov Jun 23, 2022
02cde76
airbyte-12843: Bump the version
alexandertsukanov Jun 23, 2022
fbce188
Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco…
alexandertsukanov Jun 24, 2022
fa73f41
airbyte-12843: Covered RedshiftUtilTest with unit tests.
alexandertsukanov Jun 24, 2022
28cd314
airbyte-12843: Covered RedshiftUtilTest with unit tests.
alexandertsukanov Jun 24, 2022
47def69
Merge branch 'master' into otsukanov/airbyte-12843_fix_redshift_disco…
alexandertsukanov Jun 24, 2022
58e44f3
airbyte-12843: Merged master.
alexandertsukanov Jun 24, 2022
03987f7
airbyte-12843: Merged master.
alexandertsukanov Jun 24, 2022
f947262
airbyte-12843: Merged master.
alexandertsukanov Jun 24, 2022
e3d1b16
auto-bump connector version
octavia-squidington-iii Jun 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.destination.jdbc;

import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
import static java.util.stream.Collectors.toSet;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -49,11 +48,11 @@ public class JdbcBufferedConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());

return new BufferedStreamConsumer(
Expand All @@ -66,9 +65,9 @@ public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outpu
}

private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean schemaRequired) {
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean schemaRequired) {
if (schemaRequired) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
}
Expand All @@ -77,10 +76,10 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final NamingConventionTransformer namingResolver,
final JsonNode config,
final Instant now,
final boolean schemaRequired) {
final NamingConventionTransformer namingResolver,
final JsonNode config,
final Instant now,
final boolean schemaRequired) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();
Expand All @@ -102,23 +101,21 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
}

/**
* Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default
* schema. This namespace is source-provided, and can be potentially empty.
*
* The logic here matches the logic in the catalog_process.py for Normalization. Any modifications
* need to be reflected there and vice versa.
* Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default schema. This namespace is source-provided, and can be potentially empty.
* <p>
* The logic here matches the logic in the catalog_process.py for Normalization. Any modifications need to be reflected there and vice versa.
*/
private static String getOutputSchema(final AirbyteStream stream,
final String defaultDestSchema,
final NamingConventionTransformer namingResolver) {
final String defaultDestSchema,
final NamingConventionTransformer namingResolver) {
return stream.getNamespace() != null
? namingResolver.getNamespace(stream.getNamespace())
: namingResolver.getNamespace(defaultDestSchema);
}

private static OnStartFunction onStartFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
Expand All @@ -135,9 +132,9 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
}

private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity()));

Expand All @@ -153,13 +150,13 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Jdb
}

private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
return (hasFailed) -> {
// copy data
if (!hasFailed) {
final List<String> queryList = new ArrayList<>();
sqlOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(toSet()));
sqlOperations.onDestinationCloseOperations(database, writeConfigs);
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
Expand All @@ -178,24 +175,26 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
queryList.add(sqlOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
}
// clean up
LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName,
tmpTableName);

sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
}
LOGGER.info("Cleaning tmp tables in destination completed.");
};
LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
}
// clean up
LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName,
tmpTableName);

sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
}
LOGGER.info("Cleaning tmp tables in destination completed.");
}

;
}

// Projects a write config onto the (stream name, namespace) key used to index configs per stream.
private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
  final String streamName = config.getStreamName();
  final String namespace = config.getNamespace();
  return new AirbyteStreamNameNamespacePair(streamName, namespace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -130,10 +129,10 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
* Redshift destination:
*
* @param database - Database that the connector is interacting with
* @param writeConfigs - schemas and tables (streams) will be discovered
* @see io.airbyte.integrations.destination.redshift.RedshiftSqlOperations#onDestinationCloseOperations
*/
default void onDestinationCloseOperations(JdbcDatabase database, Set<String> schemaNames) {
default void onDestinationCloseOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigs) {
// do nothing
LOGGER.info("No onDestinationCloseOperations required for this destination.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ private static void closeAsOneTransaction(final Map<AirbyteStreamNameNamespacePa
}
}
if (!hasFailed) {
sqlOperations.onDestinationCloseOperations(db,
pairToCopier.keySet().stream().map(AirbyteStreamNameNamespacePair::getNamespace).collect(toSet()));
sqlOperations.executeTransaction(db, queries);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
}
queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
}
stagingOperations.onDestinationCloseOperations(database,
writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()));
stagingOperations.onDestinationCloseOperations(database, writeConfigs);
LOGGER.info("Executing finalization of tables.");
stagingOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
Expand Down
Loading