Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert __source_ts_ms and __ts_ms to TimestampType.withZone type #108

Merged
merged 1 commit
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms")
String partitionField;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
Expand Down Expand Up @@ -188,7 +190,9 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat,
!upsert, // partition if its append mode
partitionField);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class IcebergChangeEvent {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
Expand Down Expand Up @@ -63,19 +64,10 @@ public String destination() {
}

public GenericRecord asIcebergRecord(Schema schema) {
final GenericRecord record = asIcebergRecord(schema.asStruct(), value);

if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) {
final long source_ts_ms = value.get("__source_ts_ms").longValue();
final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC);
record.setField("__source_ts", odt);
} else {
record.setField("__source_ts", null);
}
return record;
return asIcebergRecord(schema.asStruct(), value);
}

private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);

Expand All @@ -92,14 +84,18 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
return record;
}

private Type.PrimitiveType icebergFieldType(String fieldType) {
private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
case "int64": // long 8 bytes
return Types.LongType.get();
if (TS_MS_FIELDS.contains(fieldName)) {
return Types.TimestampType.withZone();
} else {
return Types.LongType.get();
}
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
Expand All @@ -121,7 +117,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
Expand All @@ -147,6 +143,15 @@ private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
case UUID:
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
case TIMESTAMP:
if (node.isLong() && TS_MS_FIELDS.contains(field.name())) {
val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC);
} else if (node.isTextual()) {
val = OffsetDateTime.parse(node.asText());
} else {
throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node);
}
break;
case BINARY:
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
Expand Down Expand Up @@ -218,7 +223,7 @@ private List<Types.NestedField> KeySchemaFields() {
private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return icebergSchema(valueSchema, "", 0, true);
return icebergSchema(valueSchema, "", 0);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
Expand Down Expand Up @@ -264,10 +269,6 @@ public Schema icebergSchema() {
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
Expand All @@ -286,7 +287,7 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
Type.PrimitiveType item = icebergFieldType(fieldName, listItemType);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
Expand All @@ -302,15 +303,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType)));
break;
}
}

if (addSourceTsField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
Schema schema, String writeFormat, boolean partition) {
Schema schema, String writeFormat, boolean partition, String partitionField) {

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

final PartitionSpec ps;
if (partition && schema.findField("__source_ts") != null) {
ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
if (partition) {
if (schema.findField(partitionField) == null) {
LOGGER.warn("Table schema dont contain partition field {}! Creating table without partition", partition);
ps = PartitionSpec.builderFor(schema).build();
} else {
ps = PartitionSpec.builderFor(schema).day(partitionField).build();
}
} else {
ps = PartitionSpec.builderFor(schema).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public void testIcebergChangeEventBuilder() {
optional(3, "preferences", Types.StructType.of(
optional(4, "feature1", Types.BooleanType.get()),
optional(5, "feature2", Types.BooleanType.get())
)),
optional(6, "__source_ts", Types.TimestampType.withZone())
))
)
, Set.of(1)
);
Expand All @@ -57,8 +56,7 @@ public void testIcebergChangeEventBuilder() {
optional(3, "preferences", Types.StructType.of(
optional(4, "feature1", Types.BooleanType.get()),
optional(5, "feature2", Types.BooleanType.get())
)),
optional(6, "__source_ts", Types.TimestampType.withZone())
))
);

b = new IcebergChangeEventBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class IcebergTableOperatorTest extends BaseSparkTest {
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms")
String partitionField;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;
@Inject
Expand All @@ -56,7 +58,7 @@ class IcebergTableOperatorTest extends BaseSparkTest {
public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert, partitionField);
}

@Test
Expand Down