Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,9 @@ AZURE:
'azure/**/*',
'azure-bundle/**/*'
]

KAFKACONNECT:
- changed-files:
- any-glob-to-any-file: [
'kafka-connect/**/*'
]
13 changes: 10 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand Down Expand Up @@ -59,7 +60,7 @@ public static Schema convert(org.apache.iceberg.Schema schema, String tableName)

/**
 * Converts an Iceberg schema to an Avro schema, naming nested record schemas from the given
 * struct-to-name map.
 *
 * @param schema an Iceberg schema
 * @param names map from struct type to the Avro record name to use for it
 * @return the converted Avro {@link Schema}
 */
public static Schema convert(
    org.apache.iceberg.Schema schema, Map<Types.StructType, String> names) {
  // paste contained both the pre- and post-change return; only the WithTypeToName form is current
  return TypeUtil.visit(schema, new TypeToSchema.WithTypeToName(names));
}

public static Schema convert(Type type) {
Expand All @@ -71,7 +72,12 @@ public static Schema convert(Types.StructType type, String name) {
}

/**
 * Converts an Iceberg type to an Avro schema, naming nested record schemas from the given
 * struct-to-name map.
 *
 * @param type an Iceberg type
 * @param names map from struct type to the Avro record name to use for it
 * @return the converted Avro {@link Schema}
 */
public static Schema convert(Type type, Map<Types.StructType, String> names) {
  // paste contained both the pre- and post-change return; only the WithTypeToName form is current
  return TypeUtil.visit(type, new TypeToSchema.WithTypeToName(names));
}

/**
 * Converts an Iceberg type to an Avro schema, naming nested record schemas with the given
 * function of (parent field ID, struct type).
 *
 * @param type an Iceberg type
 * @param namesFunction function from (field ID, struct) to the record name; may return null, in
 *     which case the visitor falls back to a generated "r&lt;fieldId&gt;" name
 * @return the converted Avro {@link Schema}
 */
public static Schema convert(
Type type, BiFunction<Integer, Types.StructType, String> namesFunction) {
return TypeUtil.visit(type, new TypeToSchema.WithNamesFunction(namesFunction));
}

public static Type convert(Schema schema) {
Expand Down Expand Up @@ -111,7 +117,8 @@ static boolean missingIds(Schema schema) {
}

/**
 * Converts a struct type to Avro, returning the mapping of each visited Iceberg type to its
 * converted Avro schema.
 *
 * @param type the Iceberg struct to convert
 * @param name the Avro record name for the root struct
 * @return an immutable map from Iceberg type to converted Avro schema
 */
public static Map<Type, Schema> convertTypes(Types.StructType type, String name) {
  // paste contained both the pre- and post-change declaration; keep the WithTypeToName form,
  // which exposes getConversionMap()
  TypeToSchema.WithTypeToName converter =
      new TypeToSchema.WithTypeToName(ImmutableMap.of(type, name));
  TypeUtil.visit(type, converter);
  return ImmutableMap.copyOf(converter.getConversionMap());
}
Expand Down
94 changes: 73 additions & 21 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -30,7 +31,7 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN);
private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT);
private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
Expand All @@ -55,15 +56,10 @@ class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
}

private final Deque<Integer> fieldIds = Lists.newLinkedList();
private final Map<Type, Schema> results = Maps.newHashMap();
private final Map<Types.StructType, String> names;
private final BiFunction<Integer, Types.StructType, String> namesFunction;

TypeToSchema(Map<Types.StructType, String> names) {
this.names = names;
}

Map<Type, Schema> getConversionMap() {
return results;
TypeToSchema(BiFunction<Integer, Types.StructType, String> namesFunction) {
this.namesFunction = namesFunction;
}

@Override
Expand All @@ -81,16 +77,29 @@ public void afterField(Types.NestedField field) {
fieldIds.pop();
}

Schema lookupSchema(Type type) {
return lookupSchema(type, null);
}

abstract Schema lookupSchema(Type type, String recordName);

void cacheSchema(Type struct, Schema schema) {
cacheSchema(struct, null, schema);
}

abstract void cacheSchema(Type struct, String recordName, Schema schema);

@Override
public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
Schema recordSchema = results.get(struct);
if (recordSchema != null) {
return recordSchema;
Integer fieldId = fieldIds.peek();
String recordName = namesFunction.apply(fieldId, struct);
if (recordName == null) {
recordName = "r" + fieldId;
}

String recordName = names.get(struct);
if (recordName == null) {
recordName = "r" + fieldIds.peek();
Schema recordSchema = lookupSchema(struct, recordName);
if (recordSchema != null) {
return recordSchema;
}

List<Types.NestedField> structFields = struct.fields();
Expand All @@ -115,7 +124,7 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {

recordSchema = Schema.createRecord(recordName, null, null, false, fields);

results.put(struct, recordSchema);
cacheSchema(struct, recordName, recordSchema);

return recordSchema;
}
Expand All @@ -131,7 +140,7 @@ public Schema field(Types.NestedField field, Schema fieldSchema) {

@Override
public Schema list(Types.ListType list, Schema elementSchema) {
Schema listSchema = results.get(list);
Schema listSchema = lookupSchema(list);
if (listSchema != null) {
return listSchema;
}
Expand All @@ -144,14 +153,14 @@ public Schema list(Types.ListType list, Schema elementSchema) {

listSchema.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, list.elementId());

results.put(list, listSchema);
cacheSchema(list, listSchema);

return listSchema;
}

@Override
public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
Schema mapSchema = results.get(map);
Schema mapSchema = lookupSchema(map);
if (mapSchema != null) {
return mapSchema;
}
Expand All @@ -173,7 +182,7 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
}

results.put(map, mapSchema);
cacheSchema(map, mapSchema);

return mapSchema;
}
Expand Down Expand Up @@ -238,8 +247,51 @@ public Schema primitive(Type.PrimitiveType primitive) {
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
}

results.put(primitive, primitiveSchema);
cacheSchema(primitive, primitiveSchema);

return primitiveSchema;
}

static class WithTypeToName extends TypeToSchema {

private final Map<Type, Schema> results = Maps.newHashMap();

WithTypeToName(Map<Types.StructType, String> names) {
super((id, struct) -> names.get(struct));
}

Map<Type, Schema> getConversionMap() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<Type, Schema> getConversionMap() {
Map<Type, Schema> conversionMap() {

return results;
}

@Override
void cacheSchema(Type type, String recordName, Schema schema) {
results.put(type, schema);
}

@Override
Schema lookupSchema(Type type, String recordName) {
return results.get(type);
}
}

/**
 * {@link TypeToSchema} variant that names record schemas via a (field ID, struct) function and
 * caches converted schemas by record name only; unnamed schemas are never cached.
 */
static class WithNamesFunction extends TypeToSchema {
  // conversion results keyed by record name; schemas without a name are not cached
  private final Map<String, Schema> cacheByName = Maps.newHashMap();

  WithNamesFunction(BiFunction<Integer, Types.StructType, String> namesFunction) {
    super(namesFunction);
  }

  @Override
  void cacheSchema(Type type, String recordName, Schema schema) {
    if (recordName == null) {
      return; // nothing to key the cache entry on
    }

    cacheByName.put(recordName, schema);
  }

  @Override
  Schema lookupSchema(Type type, String recordName) {
    if (recordName == null) {
      return null;
    }

    return cacheByName.get(recordName);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public static <T> T resolveAndRead(
return value;
}

/**
 * Clears the cached resolving decoders.
 *
 * <p>NOTE(review): {@code DECODER_CACHES.get()} suggests a thread-local cache, in which case this
 * clears only the calling thread's entries — confirm against the field declaration.
 */
public static void clearCache() {
DECODER_CACHES.get().clear();
}

@VisibleForTesting
static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
throws IOException {
Expand Down
32 changes: 32 additions & 0 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Build configuration for the Kafka Connect events module, which defines Avro-serialized
// event classes (see AvroUtil) — hence the direct Avro dependency alongside the Iceberg core.
project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
dependencies {
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-common')
// shaded Guava bundle, as used by the rest of the build
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation libs.avro.avro
}

test {
useJUnitPlatform()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.events;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

/** Class for Avro-related utility methods. */
class AvroUtil {
  // maps a parent field ID to the Java class used for its struct value during conversion;
  // consulted by convert() for every non-root struct
  static final Map<Integer, String> FIELD_ID_TO_CLASS =
      ImmutableMap.of(
          DataComplete.ASSIGNMENTS_ELEMENT,
          TopicPartitionOffset.class.getName(),
          DataFile.PARTITION_ID,
          PartitionData.class.getName(),
          DataWritten.TABLE_REFERENCE,
          TableReference.class.getName(),
          DataWritten.DATA_FILES_ELEMENT,
          "org.apache.iceberg.GenericDataFile",
          DataWritten.DELETE_FILES_ELEMENT,
          "org.apache.iceberg.GenericDeleteFile",
          CommitToTable.TABLE_REFERENCE,
          TableReference.class.getName());

  /**
   * Serializes an event to Avro bytes using the event's own schema.
   *
   * @param event the event to serialize
   * @return the Avro-encoded bytes
   * @throws UncheckedIOException if serialization fails
   */
  public static byte[] encode(Event event) {
    try {
      return AvroEncoderUtil.encode(event, event.getSchema());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Deserializes an event from Avro bytes.
   *
   * @param bytes the Avro-encoded bytes
   * @return the decoded event
   * @throws UncheckedIOException if deserialization fails
   */
  public static Event decode(byte[] bytes) {
    try {
      Event event = AvroEncoderUtil.decode(bytes);
      // clear the cache to avoid memory leak
      DecoderResolver.clearCache();
      return event;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Converts an Iceberg struct to an Avro schema using the default {@link #FIELD_ID_TO_CLASS}. */
  static Schema convert(Types.StructType icebergSchema, Class<? extends IndexedRecord> javaClass) {
    return convert(icebergSchema, javaClass, FIELD_ID_TO_CLASS);
  }

  /**
   * Converts an Iceberg struct to an Avro schema.
   *
   * <p>The root struct is named after {@code javaClass}; nested structs are named by looking up
   * their parent field ID in {@code typeMap}.
   */
  static Schema convert(
      Types.StructType icebergSchema,
      Class<? extends IndexedRecord> javaClass,
      Map<Integer, String> typeMap) {
    return AvroSchemaUtil.convert(
        icebergSchema,
        (fieldId, struct) ->
            struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
  }

  /**
   * Returns the Iceberg field ID recorded on the Avro field at the given position, or -1 if the
   * field carries no field-id property.
   *
   * @throws IllegalArgumentException if the position is out of range
   */
  static int positionToId(int position, Schema avroSchema) {
    List<Schema.Field> fields = avroSchema.getFields();
    // use checkArgument's lazy %s template so the message is only formatted on failure,
    // instead of eagerly concatenating the string on every call
    Preconditions.checkArgument(
        position >= 0 && position < fields.size(), "Invalid field position: %s", position);
    Object val = fields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
    return val == null ? -1 : (int) val;
  }

  private AvroUtil() {}
}
Loading