Convert __source_ts_ms and __ts_ms to TimestampType.withZone type (#108)
ismailsimsek committed Sep 8, 2022
1 parent 0540a97 commit 1419aae
Showing 5 changed files with 40 additions and 34 deletions.
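In short: instead of appending a synthetic "__source_ts" column to every record, the sink now maps Debezium's existing "__ts_ms" and "__source_ts_ms" epoch-millisecond fields directly to Iceberg's TimestampType.withZone() (timestamptz), and the partition column becomes configurable through the new debezium.sink.iceberg.partition-field property (default "__ts_ms"). A minimal standalone sketch of the value conversion the patch applies, using a hypothetical epoch value:

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TsMsConversionSketch {
  public static void main(String[] args) {
    long tsMs = 1662624000000L; // hypothetical __ts_ms value, epoch milliseconds
    // Iceberg's timestamptz is handed an OffsetDateTime; the patch pins the offset to UTC.
    OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(tsMs), ZoneOffset.UTC);
    System.out.println(odt); // prints 2022-09-08T08:00Z
  }
}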
@@ -95,6 +95,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms")
String partitionField;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
@@ -188,7 +190,9 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
- return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
+ return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat,
+     !upsert, // partition if it's append mode
+     partitionField);
});
}

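A side note on the two new trailing arguments above: the table is partitioned only when running in append mode (!upsert), presumably because the upsert path identifies rows by equality fields rather than by partition values. The partition column itself is now configurable, e.g. debezium.sink.iceberg.partition-field=__source_ts_ms in a Debezium Server configuration (the default stays __ts_ms); the actual PartitionSpec construction lives in IcebergUtil.createIcebergTable, sketched after its diff below.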
@@ -30,6 +30,7 @@
public class IcebergChangeEvent {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
+ public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
@@ -63,19 +64,10 @@ public String destination() {
}

public GenericRecord asIcebergRecord(Schema schema) {
- final GenericRecord record = asIcebergRecord(schema.asStruct(), value);
-
- if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) {
-   final long source_ts_ms = value.get("__source_ts_ms").longValue();
-   final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC);
-   record.setField("__source_ts", odt);
- } else {
-   record.setField("__source_ts", null);
- }
- return record;
+ return asIcebergRecord(schema.asStruct(), value);
}

- private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
+ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);

@@ -92,14 +84,18 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
return record;
}

- private Type.PrimitiveType icebergFieldType(String fieldType) {
+ private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
case "int64": // long 8 bytes
- return Types.LongType.get();
+ if (TS_MS_FIELDS.contains(fieldName)) {
+   return Types.TimestampType.withZone();
+ } else {
+   return Types.LongType.get();
+ }
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
@@ -121,7 +117,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

- private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
+ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -147,6 +143,15 @@ private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
case UUID:
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
+ case TIMESTAMP:
+   if (node.isLong() && TS_MS_FIELDS.contains(field.name())) {
+     val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC);
+   } else if (node.isTextual()) {
+     val = OffsetDateTime.parse(node.asText());
+   } else {
+     throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node);
+   }
+   break;
case BINARY:
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
@@ -218,7 +223,7 @@ private List<Types.NestedField> KeySchemaFields() {
private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
- return icebergSchema(valueSchema, "", 0, true);
+ return icebergSchema(valueSchema, "", 0);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
@@ -264,10 +269,6 @@ public Schema icebergSchema() {
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
- return icebergSchema(eventSchema, schemaName, columnId, false);
- }
-
- private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -286,7 +287,7 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
}

- Type.PrimitiveType item = icebergFieldType(listItemType);
+ Type.PrimitiveType item = icebergFieldType(fieldName, listItemType);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
@@ -302,15 +303,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
columnId += subSchema.size();
break;
default: //primitive types
- schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
+ schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType)));
break;
}
}

- if (addSourceTsField) {
-   columnId++;
-   schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
- }
return schemaColumns;
}

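The schema-side effect of the change above, as a small standalone sketch (the non-timestamp field name is hypothetical; the rest mirrors the patched icebergFieldType):

import java.util.List;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class FieldTypeSketch {
  static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");

  // Mirrors the patched icebergFieldType(): int64 still maps to long,
  // except for the well-known Debezium millisecond-timestamp fields.
  static Type.PrimitiveType int64Type(String fieldName) {
    return TS_MS_FIELDS.contains(fieldName)
        ? Types.TimestampType.withZone()
        : Types.LongType.get();
  }

  public static void main(String[] args) {
    System.out.println(int64Type("__source_ts_ms")); // timestamptz
    System.out.println(int64Type("order_total"));    // long (hypothetical column)
  }
}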
@@ -63,14 +63,19 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
- Schema schema, String writeFormat, boolean partition) {
+ Schema schema, String writeFormat, boolean partition, String partitionField) {

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

final PartitionSpec ps;
- if (partition && schema.findField("__source_ts") != null) {
-   ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
+ if (partition) {
+   if (schema.findField(partitionField) == null) {
+     LOGGER.warn("Table schema does not contain partition field '{}'! Creating table without partition", partitionField);
+     ps = PartitionSpec.builderFor(schema).build();
+   } else {
+     ps = PartitionSpec.builderFor(schema).day(partitionField).build();
+   }
} else {
ps = PartitionSpec.builderFor(schema).build();
}
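A hypothetical call site for the new signature: the catalog, table name, and format value here are assumptions, while the two trailing arguments mirror the consumer code in the first file (IcebergUtil and IcebergChangeEvent come from this repository):

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class CreateTableSketch {
  static Table create(Catalog catalog, IcebergChangeEvent event, boolean upsert) {
    // hypothetical namespace/table names
    TableIdentifier tableId = TableIdentifier.of("debeziumevents", "customers");
    return IcebergUtil.createIcebergTable(
        catalog,
        tableId,
        event.icebergSchema(),
        "parquet",   // write format (assumed value)
        !upsert,     // partition only in append mode
        "__ts_ms");  // value of debezium.sink.iceberg.partition-field
  }
}

Note that PartitionSpec.builderFor(schema).day(partitionField) requires the partition column to be a date or timestamp type, which is exactly what the new TimestampType.withZone() mapping guarantees for __ts_ms.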
@@ -36,8 +36,7 @@ public void testIcebergChangeEventBuilder() {
optional(3, "preferences", Types.StructType.of(
optional(4, "feature1", Types.BooleanType.get()),
optional(5, "feature2", Types.BooleanType.get())
- )),
- optional(6, "__source_ts", Types.TimestampType.withZone())
+ ))
)
, Set.of(1)
);
@@ -57,8 +56,7 @@ public void testIcebergChangeEventBuilder() {
optional(3, "preferences", Types.StructType.of(
optional(4, "feature1", Types.BooleanType.get()),
optional(5, "feature2", Types.BooleanType.get())
- )),
- optional(6, "__source_ts", Types.TimestampType.withZone())
+ ))
);

b = new IcebergChangeEventBuilder();
@@ -47,6 +47,8 @@ class IcebergTableOperatorTest extends BaseSparkTest {
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms")
String partitionField;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;
@Inject
@@ -56,7 +58,7 @@ class IcebergTableOperatorTest extends BaseSparkTest {
public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination());
- return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
+ return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert, partitionField);
}

@Test
