Skip to content

Commit

Permalink
Remove DebeziumToIcebergTable move logic to IcebergUtil (#48)
Browse files Browse the repository at this point in the history
* Minor improve naming

* Minor improve naming

* Minor improve naming

* Minor improve naming
  • Loading branch information
ismailsimsek committed Oct 27, 2021
1 parent 3f58964 commit 032f0bc
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 156 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
Expand All @@ -38,11 +35,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
Expand All @@ -58,10 +57,15 @@
@Dependent
public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
protected final Clock clock = Clock.system();
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
Expand All @@ -84,7 +88,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
boolean eventSchemaEnabled;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
Expand All @@ -95,12 +98,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
protected final Clock clock = Clock.system();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);

@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
Expand Down Expand Up @@ -199,13 +196,27 @@ private Table createIcebergTable(TableIdentifier tableIdentifier,
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value()));
List<Types.NestedField> keyColumns =
IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key()));

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to create table " + tableIdentifier);
}

return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat);
Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns);

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
.create();
}


private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
try {
Table table = icebergCatalog.loadTable(tableId);
Expand All @@ -216,5 +227,4 @@ private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
Expand Down Expand Up @@ -188,5 +187,61 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}

/**
 * Extracts the Iceberg field list from a serialized Debezium event.
 *
 * <p>The event payload is parsed as JSON; when it carries an embedded
 * {@code "schema"} node, that node is converted into Iceberg fields.
 * A {@code null} payload or a payload without a schema yields an empty
 * (mutable) list.
 *
 * @param eventVal raw event bytes, may be {@code null}
 * @return mutable list of Iceberg fields; empty when no schema is present
 * @throws RuntimeException wrapping any {@link IOException} from JSON parsing
 */
public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eventVal) {

  if (eventVal == null) {
    return new ArrayList<>();
  }

  try {
    JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);

    if (!IcebergUtil.hasSchema(jsonEvent)) {
      LOGGER.trace("Event schema not found in the given data:!");
      return new ArrayList<>();
    }

    return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

/**
 * Builds an Iceberg {@link Schema} from the table columns, marking the given
 * key columns as the schema's row-identifier fields.
 *
 * <p>Each key column is matched by name against {@code tableColumns}; the
 * matching column is replaced in place by its required (non-nullable)
 * variant and its field id is registered as an identifier field.
 *
 * @param tableColumns all columns of the table; mutated in place for matches
 * @param keyColumns   columns forming the row identifier (primary key)
 * @return schema whose identifier-field ids correspond to {@code keyColumns}
 * @throws ValidationException when a key column has no matching table column
 */
public static Schema getSchema(List<Types.NestedField> tableColumns,
                               List<Types.NestedField> keyColumns) {

  Set<Integer> identifierFieldIds = new HashSet<>();

  for (Types.NestedField keyField : keyColumns) {
    int matchIndex = -1;
    for (int i = 0; i < tableColumns.size(); i++) {
      if (Objects.equals(tableColumns.get(i).name(), keyField.name())) {
        matchIndex = i;
        break;
      }
    }

    if (matchIndex < 0) {
      throw new ValidationException("Table Row identifier field `" + keyField.name() + "` not found in table columns");
    }

    Types.NestedField matched = tableColumns.get(matchIndex);
    identifierFieldIds.add(matched.fieldId());
    // Identifier columns must be non-nullable, so swap in the required variant.
    tableColumns.set(matchIndex, matched.asRequired());
  }

  return new Schema(tableColumns, identifierFieldIds);
}

/**
 * Derives a {@link SortOrder} that sorts ascending on every identifier
 * (row-key) field of the given schema, in the order the schema reports them.
 *
 * @param schema schema whose identifier field names drive the sort order
 * @return sort order ascending over all identifier fields (empty if none)
 */
public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
  SortOrder.Builder builder = SortOrder.builderFor(schema);
  // Builder#asc returns the builder, so it is usable as a Consumer here.
  schema.identifierFieldNames().forEach(builder::asc);
  return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ public void testDataTypeChanges() throws Exception {
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
SourcePostgresqlDB.runSQL(sql);

SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
"ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
"ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
"ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
"ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.write.format.default", "orc");

return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements Quar
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "false");
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfil
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
return config;
Expand Down

0 comments on commit 032f0bc

Please sign in to comment.