Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DebeziumToIcebergTable move logic to IcebergUtil #48

Merged
merged 5 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
Expand All @@ -38,11 +35,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
Expand All @@ -58,10 +57,15 @@
@Dependent
public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
protected final Clock clock = Clock.system();
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
Expand All @@ -84,7 +88,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
boolean eventSchemaEnabled;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
Expand All @@ -95,12 +98,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
protected final Clock clock = Clock.system();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);

@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
Expand Down Expand Up @@ -199,13 +196,27 @@ private Table createIcebergTable(TableIdentifier tableIdentifier,
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value()));
List<Types.NestedField> keyColumns =
IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key()));

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to create table " + tableIdentifier);
}

return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat);
Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns);

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
.create();
}


private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
try {
Table table = icebergCatalog.loadTable(tableId);
Expand All @@ -216,5 +227,4 @@ private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
Expand Down Expand Up @@ -188,5 +187,61 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}

public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eventVal) {

if(eventVal == null){
return new ArrayList<>();
}

try {
JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
if (IcebergUtil.hasSchema(jsonEvent)) {
return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
}

LOGGER.trace("Event schema not found in the given data:!");
return new ArrayList<>();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required its part of identifier filed
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}

public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
SortOrder.Builder sob = SortOrder.builderFor(schema);
for (String fieldName : schema.identifierFieldNames()) {
sob = sob.asc(fieldName);
}

return sob.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ public void testDataTypeChanges() throws Exception {
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
SourcePostgresqlDB.runSQL(sql);

SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
"ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
"ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
"ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
"ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.write.format.default", "orc");

return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements Quar
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "false");
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfil
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
return config;
Expand Down