From a3b98d7e893c3490f5eaadd2a6bf75624b1eeae6 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 4 May 2024 11:05:53 -0700 Subject: [PATCH 1/4] Kafka Connect: Commit coordination --- .../connect/events/TableReference.java | 20 ++ .../iceberg/connect/events/EventTestUtil.java | 3 +- .../apache/iceberg/connect/CatalogUtils.java | 98 ++++++ .../org/apache/iceberg/connect/Committer.java | 37 +++ .../iceberg/connect/CommitterFactory.java | 29 ++ .../iceberg/connect/IcebergSinkConfig.java | 23 +- .../iceberg/connect/IcebergSinkConnector.java | 4 +- .../iceberg/connect/IcebergSinkTask.java | 107 ++++++ .../iceberg/connect/channel/Channel.java | 171 ++++++++++ .../iceberg/connect/channel/CommitState.java | 167 ++++++++++ .../connect/channel/CommitterImpl.java | 130 ++++++++ .../iceberg/connect/channel/Coordinator.java | 311 ++++++++++++++++++ .../connect/channel/CoordinatorThread.java | 69 ++++ .../iceberg/connect/channel/Envelope.java | 45 +++ .../connect/channel/KafkaClientFactory.java | 68 ++++ .../iceberg/connect/channel/KafkaUtils.java | 70 ++++ .../connect/channel/NotRunningException.java | 25 ++ .../iceberg/connect/channel/Worker.java | 126 +++++++ .../iceberg/connect/data/IcebergWriter.java | 14 +- .../connect/data/IcebergWriterFactory.java | 7 +- ...erResult.java => IcebergWriterResult.java} | 4 +- .../iceberg/connect/data/NoOpWriter.java | 5 +- .../apache/iceberg/connect/data/Offset.java | 54 +++ .../data/{Utilities.java => RecordUtils.java} | 78 +---- .../iceberg/connect/data/RecordWriter.java | 4 +- .../iceberg/connect/data/SinkWriter.java | 140 ++++++++ .../connect/data/SinkWriterResult.java | 42 +++ ...ilitiesTest.java => CatalogUtilsTest.java} | 74 +---- .../connect/channel/ChannelTestBase.java | 131 ++++++++ .../connect/channel/CommitStateTest.java | 107 ++++++ .../connect/channel/CommitterImplTest.java | 60 ++++ .../connect/channel/CoordinatorTest.java | 213 ++++++++++++ .../channel/CoordinatorThreadTest.java | 48 +++ .../connect/channel/EventTestUtil.java | 98 ++++++ .../iceberg/connect/channel/WorkerTest.java | 105 ++++++ .../iceberg/connect/data/BaseWriterTest.java | 2 +- .../iceberg/connect/data/RecordUtilsTest.java | 93 ++++++ .../connect/data/SchemaUpdateTest.java | 68 ++++ .../iceberg/connect/data/SinkWriterTest.java | 210 ++++++++++++ 39 files changed, 2870 insertions(+), 190 deletions(-) create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java create mode 100644 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java rename kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/{WriterResult.java => IcebergWriterResult.java} (96%) create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java rename kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/{Utilities.java => RecordUtils.java} (68%) create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java rename kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/{data/UtilitiesTest.java => CatalogUtilsTest.java} (60%) create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index 50eaa1050485..f30eac892400 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; @@ -120,4 +121,23 @@ public Object get(int i) { throw new UnsupportedOperationException("Unknown field ordinal: " + i); } } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableReference that = (TableReference) o; + return Objects.equals(catalog, that.catalog) + && Objects.equals(namespace, that.namespace) + && Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(catalog, namespace, 
name); + } } diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java index 8f1f7a601f86..48e268bf0561 100644 --- a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java +++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java @@ -44,8 +44,7 @@ private EventTestUtil() {} static final Schema SCHEMA = new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get()))); - static final PartitionSpec SPEC = - PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build(); + static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build(); static final SortOrder ORDER = SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST).build(); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java new file mode 100644 index 000000000000..a3c6358e1bdf --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CatalogUtils { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogUtils.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + static Catalog loadCatalog(IcebergSinkConfig config) { + return CatalogUtil.buildIcebergCatalog( + config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { + Class configClass = + DynClasses.builder() + .impl("org.apache.hadoop.hdfs.HdfsConfiguration") + .impl("org.apache.hadoop.conf.Configuration") + .orNull() + .build(); + + if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; + } + + try { + Object result = DynConstructors.builder().hiddenImpl(configClass).build().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { + HADOOP_CONF_FILES.forEach( + confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { + try { + addResourceMethod.invoke(path.toUri().toURL()); + } catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); + } + } + }); + } + + // set any Hadoop properties specified in the sink config + config.hadoopProps().forEach(setMethod::invoke); + + LOG.info("Hadoop config initialized: {}", configClass.getName()); + return result; + } catch (Exception e) { + LOG.warn( + "Hadoop found on classpath but could not create config, proceeding without config", e); + } + return null; + } + + private CatalogUtils() {} +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java new file mode 100644 index 000000000000..47cb47c6147d --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+public interface Committer {
+  void start(
+      Catalog catalog,
+      IcebergSinkConfig config,
+      SinkTaskContext context,
+      Collection<TopicPartition> partitions);
+
+  void stop();
+
+  void save(Collection<SinkRecord> sinkRecords);
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
new file mode 100644
index 000000000000..edb2c518f11b
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.connect; + +import org.apache.iceberg.connect.channel.CommitterImpl; + +public class CommitterFactory { + public static Committer createCommitter(IcebergSinkConfig config) { + return new CommitterImpl(); + } + + private CommitterFactory() {} +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index e64e183089cf..aed11ab0b169 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -80,7 +80,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = "iceberg.tables.schema-case-insensitive"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; - private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000; private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms"; @@ -104,11 +103,7 @@ public class IcebergSinkConfig extends AbstractConfig { public static final ConfigDef CONFIG_DEF = newConfigDef(); public static String version() { - String kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion(); - if (kcVersion == null) { - kcVersion = "unknown"; - } - return IcebergBuild.version() + "-kc-" + kcVersion; + return IcebergBuild.version(); } private static ConfigDef newConfigDef() { @@ -185,12 +180,6 @@ private static ConfigDef newConfigDef() { DEFAULT_CONTROL_TOPIC, Importance.MEDIUM, "Name of the control topic"); - configDef.define( - CONTROL_GROUP_ID_PROP, - ConfigDef.Type.STRING, - null, - Importance.MEDIUM, - "Name of the consumer group to store offsets"); configDef.define( CONNECT_GROUP_ID_PROP, ConfigDef.Type.STRING, @@ -370,16 +359,6 @@ public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } - public String controlGroupId() { - String result = getString(CONTROL_GROUP_ID_PROP); - if (result != null) { - return result; - } - String connectorName = connectorName(); - Preconditions.checkNotNull(connectorName, "Connector name cannot be null"); - return DEFAULT_CONTROL_GROUP_PREFIX + connectorName; - } - public String connectGroupId() { String result = getString(CONNECT_GROUP_ID_PROP); if (result != null) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java index 8be8518f4407..be1f9a50b8f6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java @@ -44,9 +44,7 @@ public void start(Map connectorProps) { @Override public Class taskClass() { - // FIXME: update this when the connector channel is added - // return IcebergSinkTask.class; - return null; + return IcebergSinkTask.class; } @Override diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java new file mode 100644 index 000000000000..d69e59824343 --- /dev/null 
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkTask extends SinkTask {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);
+
+  private IcebergSinkConfig config;
+  private Catalog catalog;
+  private Committer committer;
+
+  @Override
+  public String version() {
+    return IcebergSinkConfig.version();
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.config = new IcebergSinkConfig(props);
+  }
+
+  @Override
+  public void open(Collection<TopicPartition> partitions) {
+    Preconditions.checkArgument(catalog == null, "Catalog already open");
+    Preconditions.checkArgument(committer == null, "Committer already open");
+
+    catalog = CatalogUtils.loadCatalog(config);
+    committer = CommitterFactory.createCommitter(config);
+    committer.start(catalog, config, context, partitions);
+  }
+
+  @Override
+  public void close(Collection<TopicPartition> partitions) {
+    close();
+  }
+
+  private void close() {
+    if (committer != null) {
+      committer.stop();
+      committer = null;
+    }
+
+    if (catalog != null) {
+      if (catalog instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) catalog).close();
+        } catch (Exception e) {
+          LOG.warn("An error occurred closing catalog instance, ignoring...", e);
+        }
+      }
+      catalog = null;
+    }
+  }
+
+  @Override
+  public void put(Collection<SinkRecord> sinkRecords) {
+    Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+    committer.save(sinkRecords);
+  }
+
+  @Override
+  public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+    committer.save(null);
+  }
+
+  @Override
+  public Map<TopicPartition, OffsetAndMetadata> preCommit(
+      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    // offset commit is handled by the worker
+    return ImmutableMap.of();
+  }
+
+  @Override
+  public void stop() {
+    close();
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
new file mode 100644
index 000000000000..ef71ed7b9621
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.Offset; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class Channel { + + private static final Logger LOG = LoggerFactory.getLogger(Channel.class); + + private final String controlTopic; + private final String connectGroupId; + private final Producer producer; + private final Consumer consumer; + private final SinkTaskContext context; + private final Admin admin; + private final Map controlTopicOffsets = Maps.newHashMap(); + private final String producerId; + + Channel( + String name, + String consumerGroupId, + IcebergSinkConfig config, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + this.controlTopic = config.controlTopic(); + this.connectGroupId = config.connectGroupId(); + this.context = context; + + String transactionalId = name + config.transactionalSuffix(); + this.producer = clientFactory.createProducer(transactionalId); + this.consumer = clientFactory.createConsumer(consumerGroupId); + this.admin = clientFactory.createAdmin(); + + this.producerId = UUID.randomUUID().toString(); + } + + protected void send(Event event) { + send(ImmutableList.of(event), ImmutableMap.of()); + } + + @SuppressWarnings("FutureReturnValueIgnored") + protected void send(List events, Map sourceOffsets) { + Map offsetsToCommit = Maps.newHashMap(); + sourceOffsets.forEach((k, v) -> offsetsToCommit.put(k, new OffsetAndMetadata(v.offset()))); + + List> recordList = + events.stream() + .map( + event -> { + LOG.info("Sending event of type: {}", event.type().name()); + byte[] data = AvroUtil.encode(event); + // key by producer ID to keep event 
order + return new ProducerRecord<>(controlTopic, producerId, data); + }) + .collect(Collectors.toList()); + + synchronized (producer) { + producer.beginTransaction(); + try { + // NOTE: we shouldn't call get() on the future in a transactional context, + // see docs for org.apache.kafka.clients.producer.KafkaProducer + recordList.forEach(producer::send); + if (!sourceOffsets.isEmpty()) { + producer.sendOffsetsToTransaction( + offsetsToCommit, KafkaUtils.consumerGroupMetadata(context, connectGroupId)); + } + producer.commitTransaction(); + } catch (Exception e) { + try { + producer.abortTransaction(); + } catch (Exception ex) { + LOG.warn("Error aborting producer transaction", ex); + } + throw e; + } + } + } + + protected abstract boolean receive(Envelope envelope); + + protected void consumeAvailable(Duration pollDuration) { + ConsumerRecords records = consumer.poll(pollDuration); + while (!records.isEmpty()) { + records.forEach( + record -> { + // the consumer stores the offsets that corresponds to the next record to consume, + // so increment the record offset by one + controlTopicOffsets.put(record.partition(), record.offset() + 1); + + Event event = AvroUtil.decode(record.value()); + + if (event.groupId().equals(connectGroupId)) { + LOG.debug("Received event of type: {}", event.type().name()); + if (receive(new Envelope(event, record.partition(), record.offset()))) { + LOG.info("Handled event of type: {}", event.type().name()); + } + } + }); + records = consumer.poll(pollDuration); + } + } + + protected Map controlTopicOffsets() { + return controlTopicOffsets; + } + + protected void commitConsumerOffsets() { + Map offsetsToCommit = Maps.newHashMap(); + controlTopicOffsets() + .forEach( + (k, v) -> + offsetsToCommit.put(new TopicPartition(controlTopic, k), new OffsetAndMetadata(v))); + consumer.commitSync(offsetsToCommit); + } + + protected Admin admin() { + return admin; + } + + void start() { + consumer.subscribe(ImmutableList.of(controlTopic)); + + // initial poll with longer duration so the consumer will initialize... + consumeAvailable(Duration.ofSeconds(1)); + } + + void stop() { + LOG.info("Channel stopping"); + producer.close(); + consumer.close(); + admin.close(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java new file mode 100644 index 000000000000..f3ac850d12f9 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.OffsetDateTime;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CommitState {
+  private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);
+
+  private final List<Envelope> commitBuffer = Lists.newArrayList();
+  private final List<DataComplete> readyBuffer = Lists.newArrayList();
+  private long startTime;
+  private UUID currentCommitId;
+  private final IcebergSinkConfig config;
+
+  CommitState(IcebergSinkConfig config) {
+    this.config = config;
+  }
+
+  void addResponse(Envelope envelope) {
+    commitBuffer.add(envelope);
+    if (!isCommitInProgress()) {
+      DataWritten dataWritten = (DataWritten) envelope.event().payload();
+      LOG.warn(
+          "Received commit response when no commit in progress, this can happen during recovery. Commit ID: {}",
+          dataWritten.commitId());
+    }
+  }
+
+  void addReady(Envelope envelope) {
+    DataComplete dataComplete = (DataComplete) envelope.event().payload();
+    readyBuffer.add(dataComplete);
+    if (!isCommitInProgress()) {
+      LOG.warn(
+          "Received commit ready when no commit in progress, this can happen during recovery. Commit ID: {}",
+          dataComplete.commitId());
+    }
+  }
+
+  UUID currentCommitId() {
+    return currentCommitId;
+  }
+
+  boolean isCommitInProgress() {
+    return currentCommitId != null;
+  }
+
+  boolean isCommitIntervalReached() {
+    if (startTime == 0) {
+      startTime = System.currentTimeMillis();
+    }
+
+    return (!isCommitInProgress()
+        && System.currentTimeMillis() - startTime >= config.commitIntervalMs());
+  }
+
+  void startNewCommit() {
+    currentCommitId = UUID.randomUUID();
+    startTime = System.currentTimeMillis();
+  }
+
+  void endCurrentCommit() {
+    readyBuffer.clear();
+    currentCommitId = null;
+  }
+
+  void clearResponses() {
+    commitBuffer.clear();
+  }
+
+  boolean isCommitTimedOut() {
+    if (!isCommitInProgress()) {
+      return false;
+    }
+
+    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Commit ID: {}", currentCommitId);
+      return true;
+    }
+    return false;
+  }
+
+  boolean isCommitReady(int expectedPartitionCount) {
+    if (!isCommitInProgress()) {
+      return false;
+    }
+
+    int receivedPartitionCount =
+        readyBuffer.stream()
+            .filter(payload -> payload.commitId().equals(currentCommitId))
+            .mapToInt(payload -> payload.assignments().size())
+            .sum();
+
+    if (receivedPartitionCount >= expectedPartitionCount) {
+      LOG.info(
+          "Commit {} ready, received responses for all {} partitions",
+          currentCommitId,
+          receivedPartitionCount);
+      return true;
+    }
+
+    LOG.info(
+        "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
+        currentCommitId,
+        receivedPartitionCount,
+        expectedPartitionCount);
+
+    return false;
+  }
+
+  Map<TableReference, List<Envelope>> tableCommitMap() {
+    return commitBuffer.stream()
+        .collect(
+            Collectors.groupingBy(
+                envelope -> ((DataWritten) envelope.event().payload()).tableReference()));
+  }
+
+  OffsetDateTime vtts(boolean partialCommit) {
+    boolean validVtts =
+        !partialCommit
+            && readyBuffer.stream()
+                .flatMap(event -> event.assignments().stream())
+                .allMatch(offset -> offset.timestamp() != null);
+
+    OffsetDateTime result;
+    if (validVtts) {
+      result =
+          readyBuffer.stream()
+              .flatMap(event -> event.assignments().stream())
+              .map(TopicPartitionOffset::timestamp)
+              .min(Comparator.naturalOrder())
+              .get();
+    } else {
+      result = null;
+    }
+    return result;
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
new file mode 100644
index 000000000000..6e28e7ba2eb2
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.connect.channel; + +import java.util.Collection; +import java.util.Comparator; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.connect.Committer; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.SinkWriter; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CommitterImpl implements Committer { + + private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class); + + private CoordinatorThread coordinatorThread; + private Worker worker; + + static class TopicPartitionComparator implements Comparator { + + @Override + public int compare(TopicPartition o1, TopicPartition o2) { + int result = o1.topic().compareTo(o2.topic()); + if (result == 0) { + result = Integer.compare(o1.partition(), o2.partition()); + } + return result; + } + } + + @Override + public void start( + Catalog catalog, + IcebergSinkConfig config, + SinkTaskContext context, + Collection partitions) { + KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps()); + + ConsumerGroupDescription groupDesc; + try (Admin admin = clientFactory.createAdmin()) { + groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin); + } + + if (groupDesc.state() == ConsumerGroupState.STABLE) { + Collection members = groupDesc.members(); + if (isLeader(members, partitions)) { + LOG.info("Task elected leader, starting commit coordinator"); + Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory, context); + coordinatorThread = new CoordinatorThread(coordinator); + coordinatorThread.start(); + } + } + + LOG.info("Starting commit worker"); + SinkWriter sinkWriter = new SinkWriter(catalog, config); + worker = new Worker(config, clientFactory, sinkWriter, context); + worker.start(); + } + + @Override + public void save(Collection sinkRecords) { + if (sinkRecords != null && !sinkRecords.isEmpty()) { + worker.save(sinkRecords); + } + processControlEvents(); + } + + @Override + public void stop() { + if (worker != null) { + worker.stop(); + worker = null; + } + + if (coordinatorThread != null) { + coordinatorThread.terminate(); + coordinatorThread = null; + } + } + + @VisibleForTesting + boolean isLeader(Collection members, Collection partitions) { + // there should only be one task assigned partition 0 of the first topic, + // so elect that one the leader + TopicPartition firstTopicPartition = + members.stream() + .flatMap(member -> member.assignment().topicPartitions().stream()) + .min(new TopicPartitionComparator()) + .orElseThrow( + () -> new ConnectException("No partitions assigned, cannot determine leader")); + + return partitions.contains(firstTopicPartition); + } + + private void processControlEvents() { + if (coordinatorThread != null && coordinatorThread.isTerminated()) { + throw new NotRunningException("Coordinator unexpectedly terminated"); + } + if (worker != null) { + worker.process(); + } + } +} diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java new file mode 100644 index 000000000000..80e34dd768d5 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Coordinator extends Channel { + + private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + private static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts"; + private static final Duration POLL_DURATION = Duration.ofSeconds(1); + + private final Catalog catalog; + private final IcebergSinkConfig config; + private final int totalPartitionCount; + private final String snapshotOffsetsProp; + private final ExecutorService exec; + private final 
CommitState commitState; + + Coordinator( + Catalog catalog, + IcebergSinkConfig config, + Collection members, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + // pass consumer group ID to which we commit low watermark offsets + super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + + this.catalog = catalog; + this.config = config; + this.totalPartitionCount = + members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); + this.snapshotOffsetsProp = + String.format( + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads()); + this.commitState = new CommitState(config); + } + + void process() { + if (commitState.isCommitIntervalReached()) { + // send out begin commit + commitState.startNewCommit(); + Event event = + new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + send(event); + LOG.info("Commit {} initiated", commitState.currentCommitId()); + } + + consumeAvailable(POLL_DURATION); + + if (commitState.isCommitTimedOut()) { + commit(true); + } + } + + @Override + protected boolean receive(Envelope envelope) { + switch (envelope.event().payload().type()) { + case DATA_WRITTEN: + commitState.addResponse(envelope); + return true; + case DATA_COMPLETE: + commitState.addReady(envelope); + if (commitState.isCommitReady(totalPartitionCount)) { + commit(false); + } + return true; + } + return false; + } + + private void commit(boolean partialCommit) { + try { + doCommit(partialCommit); + } catch (Exception e) { + LOG.warn("Commit failed, will try again next cycle", e); + } finally { + commitState.endCurrentCommit(); + } + } + + private void doCommit(boolean partialCommit) { + Map> commitMap = commitState.tableCommitMap(); + + String offsetsJson = offsetsJson(); + OffsetDateTime vtts = commitState.vtts(partialCommit); + + Tasks.foreach(commitMap.entrySet()) + .executeWith(exec) + .stopOnFailure() + .run( + entry -> { + commitToTable(entry.getKey(), entry.getValue(), offsetsJson, vtts); + }); + + // we should only get here if all tables committed successfully... 
+ commitConsumerOffsets(); + commitState.clearResponses(); + + Event event = + new Event(config.connectGroupId(), new CommitComplete(commitState.currentCommitId(), vtts)); + send(event); + + LOG.info( + "Commit {} complete, committed to {} table(s), vtts {}", + commitState.currentCommitId(), + commitMap.size(), + vtts); + } + + private String offsetsJson() { + try { + return MAPPER.writeValueAsString(controlTopicOffsets()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void commitToTable( + TableReference tableReference, + List envelopeList, + String offsetsJson, + OffsetDateTime vtts) { + TableIdentifier tableIdentifier = tableReference.identifier(); + Table table; + try { + table = catalog.loadTable(tableIdentifier); + } catch (NoSuchTableException e) { + LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); + return; + } + + String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); + + Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + List payloads = + envelopeList.stream() + .filter( + envelope -> { + Long minOffset = committedOffsets.get(envelope.partition()); + return minOffset == null || envelope.offset() >= minOffset; + }) + .map(envelope -> (DataWritten) envelope.event().payload()) + .collect(Collectors.toList()); + + List dataFiles = + payloads.stream() + .filter(payload -> payload.dataFiles() != null) + .flatMap(payload -> payload.dataFiles().stream()) + .filter(dataFile -> dataFile.recordCount() > 0) + .filter(distinctByKey(dataFile -> dataFile.path().toString())) + .collect(Collectors.toList()); + + List deleteFiles = + payloads.stream() + .filter(payload -> payload.deleteFiles() != null) + .flatMap(payload -> payload.deleteFiles().stream()) + .filter(deleteFile -> deleteFile.recordCount() > 0) + .filter(distinctByKey(deleteFile -> deleteFile.path().toString())) + .collect(Collectors.toList()); + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); + } else { + if (deleteFiles.isEmpty()) { + AppendFiles appendOp = table.newAppend(); + if (branch != null) { + appendOp.toBranch(branch); + } + appendOp.set(snapshotOffsetsProp, offsetsJson); + appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (vtts != null) { + appendOp.set(VTTS_SNAPSHOT_PROP, vtts.toString()); + } + dataFiles.forEach(appendOp::appendFile); + appendOp.commit(); + } else { + RowDelta deltaOp = table.newRowDelta(); + if (branch != null) { + deltaOp.toBranch(branch); + } + deltaOp.set(snapshotOffsetsProp, offsetsJson); + deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (vtts != null) { + deltaOp.set(VTTS_SNAPSHOT_PROP, vtts.toString()); + } + dataFiles.forEach(deltaOp::addRows); + deleteFiles.forEach(deltaOp::addDeletes); + deltaOp.commit(); + } + + Long snapshotId = latestSnapshot(table, branch).snapshotId(); + Event event = + new Event( + config.connectGroupId(), + new CommitToTable(commitState.currentCommitId(), tableReference, snapshotId, vtts)); + send(event); + + LOG.info( + "Commit complete to table {}, snapshot {}, commit ID {}, vtts {}", + tableIdentifier, + snapshotId, + commitState.currentCommitId(), + vtts); + } + } + + private Predicate distinctByKey(Function keyExtractor) { + Map seen = Maps.newConcurrentMap(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } + + private Snapshot latestSnapshot(Table table, String branch) { + if 
(branch == null) { + return table.currentSnapshot(); + } + return table.snapshot(branch); + } + + private Map lastCommittedOffsetsForTable(Table table, String branch) { + Snapshot snapshot = latestSnapshot(table, branch); + while (snapshot != null) { + Map summary = snapshot.summary(); + String value = summary.get(snapshotOffsetsProp); + if (value != null) { + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + return ImmutableMap.of(); + } + + @Override + void stop() { + exec.shutdownNow(); + + // ensure coordinator tasks are shut down, else cause the sink worker to fail + try { + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { + throw new RuntimeException("Timed out waiting for coordinator shutdown"); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for coordinator shutdown", e); + } + + super.stop(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java new file mode 100644 index 000000000000..6a31b17fc606 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.channel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CoordinatorThread extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class); + private static final String THREAD_NAME = "iceberg-coord"; + + private Coordinator coordinator; + private volatile boolean terminated; + + CoordinatorThread(Coordinator coordinator) { + super(THREAD_NAME); + this.coordinator = coordinator; + } + + @Override + public void run() { + try { + coordinator.start(); + } catch (Exception e) { + LOG.error("Coordinator error during start, exiting thread", e); + terminated = true; + } + + while (!terminated) { + try { + coordinator.process(); + } catch (Exception e) { + LOG.error("Coordinator error during process, exiting thread", e); + terminated = true; + } + } + + try { + coordinator.stop(); + } catch (Exception e) { + LOG.error("Coordinator error during stop, ignoring", e); + } + coordinator = null; + } + + boolean isTerminated() { + return terminated; + } + + void terminate() { + terminated = true; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java new file mode 100644 index 000000000000..87a93d058509 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import org.apache.iceberg.connect.events.Event; + +class Envelope { + private final Event event; + private final int partition; + private final long offset; + + Envelope(Event event, int partition, long offset) { + this.event = event; + this.partition = partition; + this.offset = offset; + } + + Event event() { + return event; + } + + int partition() { + return partition; + } + + long offset() { + return offset; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java new file mode 100644 index 000000000000..fd5d27ae34e2 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +class KafkaClientFactory { + private final Map kafkaProps; + + KafkaClientFactory(Map kafkaProps) { + this.kafkaProps = kafkaProps; + } + + Producer createProducer(String transactionalId) { + Map producerProps = Maps.newHashMap(kafkaProps); + producerProps.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + KafkaProducer result = + new KafkaProducer<>(producerProps, new StringSerializer(), new ByteArraySerializer()); + result.initTransactions(); + return result; + } + + Consumer createConsumer(String consumerGroupId) { + Map consumerProps = Maps.newHashMap(kafkaProps); + consumerProps.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + consumerProps.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + return new KafkaConsumer<>( + consumerProps, new StringDeserializer(), new ByteArrayDeserializer()); + } + + Admin createAdmin() { + Map adminProps = Maps.newHashMap(kafkaProps); + return Admin.create(adminProps); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java new file mode 100644 index 000000000000..85a98d4cd637 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import java.util.concurrent.ExecutionException; +import org.apache.iceberg.common.DynFields; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KafkaUtils { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + + private static final String CONTEXT_CLASS_NAME = + "org.apache.kafka.connect.runtime.WorkerSinkTaskContext"; + + static ConsumerGroupDescription consumerGroupDescription(String consumerGroupId, Admin admin) { + try { + DescribeConsumerGroupsResult result = + admin.describeConsumerGroups(ImmutableList.of(consumerGroupId)); + return result.describedGroups().get(consumerGroupId).get(); + + } catch (InterruptedException | ExecutionException e) { + throw new ConnectException( + "Cannot retrieve members for consumer group: " + consumerGroupId, e); + } + } + + @SuppressWarnings("unchecked") + static ConsumerGroupMetadata consumerGroupMetadata( + SinkTaskContext context, String connectGroupId) { + String contextClassName = context.getClass().getName(); + if (CONTEXT_CLASS_NAME.equals(contextClassName)) { + return ((Consumer) + DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get()) + .groupMetadata(); + } + LOG.warn( + "Consumer group metadata not available for {}, zombie fencing will be less strong than with {}", + contextClassName, + CONTEXT_CLASS_NAME); + return new ConsumerGroupMetadata(connectGroupId); + } + + private KafkaUtils() {} +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java new file mode 100644 index 000000000000..72a362ceacb0 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +public class NotRunningException extends RuntimeException { + public NotRunningException(String msg) { + super(msg); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java new file mode 100644 index 000000000000..7555b216cd45 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.Offset; +import org.apache.iceberg.connect.data.SinkWriter; +import org.apache.iceberg.connect.data.SinkWriterResult; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; + +class Worker extends Channel { + + private final IcebergSinkConfig config; + private final SinkTaskContext context; + private final SinkWriter sinkWriter; + + Worker( + IcebergSinkConfig config, + KafkaClientFactory clientFactory, + SinkWriter sinkWriter, + SinkTaskContext context) { + // pass transient consumer group ID to which we never commit offsets + super( + "worker", + IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(), + config, + clientFactory, + context); + + this.config = config; + this.context = context; + this.sinkWriter = sinkWriter; + } + + void process() { + consumeAvailable(Duration.ZERO); + } + + @Override + protected boolean receive(Envelope envelope) { + Event event = envelope.event(); + if (event.payload().type() != PayloadType.START_COMMIT) { + return false; + } + + SinkWriterResult results = sinkWriter.completeWrite(); + + // include all assigned topic partitions even if no messages were read + // from a partition, as the coordinator will use that to determine + // when all data for a commit has been received + List assignments = + context.assignment().stream() + .map( + tp -> { + Offset offset = 
results.sourceOffsets().get(tp); + if (offset == null) { + offset = Offset.NULL_OFFSET; + } + return new TopicPartitionOffset( + tp.topic(), tp.partition(), offset.offset(), offset.timestamp()); + }) + .collect(Collectors.toList()); + + UUID commitId = ((StartCommit) event.payload()).commitId(); + + List events = + results.writerResults().stream() + .map( + writeResult -> + new Event( + config.connectGroupId(), + new DataWritten( + writeResult.partitionStruct(), + commitId, + TableReference.of(config.catalogName(), writeResult.tableIdentifier()), + writeResult.dataFiles(), + writeResult.deleteFiles()))) + .collect(Collectors.toList()); + + Event readyEvent = new Event(config.connectGroupId(), new DataComplete(commitId, assignments)); + events.add(readyEvent); + + send(events, results.sourceOffsets()); + + return true; + } + + @Override + void stop() { + super.stop(); + sinkWriter.close(); + } + + void save(Collection sinkRecords) { + sinkWriter.save(sinkRecords); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index 27ffc4de9973..6df6b091510b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -32,16 +32,16 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; -public class IcebergWriter implements RecordWriter { +class IcebergWriter implements RecordWriter { private final Table table; private final String tableName; private final IcebergSinkConfig config; - private final List writerResults; + private final List writerResults; private RecordConverter recordConverter; private TaskWriter writer; - public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { + IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { this.table = table; this.tableName = tableName; this.config = config; @@ -50,7 +50,7 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { } private void initNewWriter() { - this.writer = Utilities.createTableWriter(table, tableName, config); + this.writer = RecordUtils.createTableWriter(table, tableName, config); this.recordConverter = new RecordConverter(table, config); } @@ -102,7 +102,7 @@ private void flush() { } writerResults.add( - new WriterResult( + new IcebergWriterResult( TableIdentifier.parse(tableName), Arrays.asList(writeResult.dataFiles()), Arrays.asList(writeResult.deleteFiles()), @@ -110,10 +110,10 @@ private void flush() { } @Override - public List complete() { + public List complete() { flush(); - List result = Lists.newArrayList(writerResults); + List result = Lists.newArrayList(writerResults); writerResults.clear(); return result; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 47dcddcb9925..92f5af2d7a87 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -40,20 +40,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class IcebergWriterFactory { +class IcebergWriterFactory { 
private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class); private final Catalog catalog; private final IcebergSinkConfig config; - public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) { + IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) { this.catalog = catalog; this.config = config; } - public RecordWriter createWriter( - String tableName, SinkRecord sample, boolean ignoreMissingTable) { + RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMissingTable) { TableIdentifier identifier = TableIdentifier.parse(tableName); Table table; try { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java similarity index 96% rename from kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java rename to kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java index cb3a700da247..58695a5572b5 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java @@ -24,14 +24,14 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types.StructType; -public class WriterResult { +public class IcebergWriterResult { private final TableIdentifier tableIdentifier; private final List dataFiles; private final List deleteFiles; private final StructType partitionStruct; - public WriterResult( + public IcebergWriterResult( TableIdentifier tableIdentifier, List dataFiles, List deleteFiles, diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java index 64ca44f03209..a7d2c90972d7 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java @@ -19,6 +19,7 @@ package org.apache.iceberg.connect.data; import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.kafka.connect.sink.SinkRecord; class NoOpWriter implements RecordWriter { @@ -28,9 +29,9 @@ public void write(SinkRecord record) { } @Override - public List complete() { + public List complete() { // NO-OP - return null; + return ImmutableList.of(); } @Override diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java new file mode 100644 index 000000000000..c4522a40711b --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
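For orientation, the Worker shown earlier responds to a StartCommit control event with one DataWritten event per table that has files, plus a single DataComplete event listing its assigned partitions. A rough sketch of that event shape, using placeholder group, table, and offset values rather than anything taken from this patch:

    UUID commitId = UUID.randomUUID();

    Event written =
        new Event(
            "connect-example-group",
            new DataWritten(
                StructType.of(),
                commitId,
                TableReference.of("catalog", TableIdentifier.of("db", "tbl")),
                ImmutableList.of(),   // data files for this table in this commit
                ImmutableList.of())); // delete files for this table in this commit

    Event complete =
        new Event(
            "connect-example-group",
            new DataComplete(
                commitId,
                ImmutableList.of(new TopicPartitionOffset("events", 0, 42L, null))));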
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.time.OffsetDateTime; +import java.util.Objects; + +public class Offset implements Comparable<Offset> { + + public static final Offset NULL_OFFSET = new Offset(null, null); + + private final Long offset; + private final OffsetDateTime timestamp; + + public Offset(Long offset, OffsetDateTime timestamp) { + this.offset = offset; + this.timestamp = timestamp; + } + + public Long offset() { + return offset; + } + + public OffsetDateTime timestamp() { + return timestamp; + } + + @Override + public int compareTo(Offset other) { + if (Objects.equals(this.offset, other.offset)) { + return 0; + } + if (this.offset == null || (other.offset != null && other.offset > this.offset)) { + return -1; + } + return 1; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java similarity index 68% rename from kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java rename to kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java index 4ff83f777527..5ac930739738 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java @@ -18,25 +18,14 @@ */ package org.apache.iceberg.connect.data; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.common.DynClasses; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; -import org.apache.iceberg.common.DynMethods.BoundMethod; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; @@ -46,7 +35,6 @@ import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Ints; @@ -55,71 +43,11 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class Utilities { - - private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); - private
static final List HADOOP_CONF_FILES = - ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); - - public static Catalog loadCatalog(IcebergSinkConfig config) { - return CatalogUtil.buildIcebergCatalog( - config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); - } - - // use reflection here to avoid requiring Hadoop as a dependency - private static Object loadHadoopConfig(IcebergSinkConfig config) { - Class configClass = - DynClasses.builder() - .impl("org.apache.hadoop.hdfs.HdfsConfiguration") - .impl("org.apache.hadoop.conf.Configuration") - .orNull() - .build(); - - if (configClass == null) { - LOG.info("Hadoop not found on classpath, not creating Hadoop config"); - return null; - } - - try { - Object result = DynConstructors.builder().hiddenImpl(configClass).build().newInstance(); - BoundMethod addResourceMethod = - DynMethods.builder("addResource").impl(configClass, URL.class).build(result); - BoundMethod setMethod = - DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); - - // load any config files in the specified config directory - String hadoopConfDir = config.hadoopConfDir(); - if (hadoopConfDir != null) { - HADOOP_CONF_FILES.forEach( - confFile -> { - Path path = Paths.get(hadoopConfDir, confFile); - if (Files.exists(path)) { - try { - addResourceMethod.invoke(path.toUri().toURL()); - } catch (IOException e) { - LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); - } - } - }); - } - - // set any Hadoop properties specified in the sink config - config.hadoopProps().forEach(setMethod::invoke); - - LOG.info("Hadoop config initialized: {}", configClass.getName()); - return result; - } catch (Exception e) { - LOG.warn( - "Hadoop found on classpath but could not create config, proceeding without config", e); - } - return null; - } +class RecordUtils { @SuppressWarnings("unchecked") - public static Object extractFromRecordValue(Object recordValue, String fieldName) { + static Object extractFromRecordValue(Object recordValue, String fieldName) { List fields = Splitter.on('.').splitToList(fieldName); if (recordValue instanceof Struct) { return valueFromStruct((Struct) recordValue, fields); @@ -243,5 +171,5 @@ public static TaskWriter createTableWriter( return writer; } - private Utilities() {} + private RecordUtils() {} } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java index 0b4d7566eab7..56438dde2e40 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.kafka.connect.sink.SinkRecord; -public interface RecordWriter extends Cloneable { +interface RecordWriter extends Cloneable { void write(SinkRecord record); - List complete(); + List complete(); void close(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java new file mode 100644 index 000000000000..35a2957f0122 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
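The Offset type shown earlier orders a missing offset before any concrete offset, so comparisons stay safe even for partitions that produced no data. A small illustration of that ordering, purely for reference and not part of the patch:

    Offset none = Offset.NULL_OFFSET;        // no records seen for the partition
    Offset first = new Offset(5L, null);
    Offset later = new Offset(10L, null);

    // a null offset sorts before any real offset, and lower offsets before higher ones
    assert none.compareTo(first) < 0;
    assert first.compareTo(later) < 0;
    assert later.compareTo(new Offset(10L, null)) == 0;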
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; + +public class SinkWriter { + private final IcebergSinkConfig config; + private final IcebergWriterFactory writerFactory; + private final Map<String, RecordWriter> writers; + private final Map<TopicPartition, Offset> sourceOffsets; + + public SinkWriter(Catalog catalog, IcebergSinkConfig config) { + this.config = config; + this.writerFactory = new IcebergWriterFactory(catalog, config); + this.writers = Maps.newHashMap(); + this.sourceOffsets = Maps.newHashMap(); + } + + public void close() { + writers.values().forEach(RecordWriter::close); + } + + public SinkWriterResult completeWrite() { + List<IcebergWriterResult> writerResults = + writers.values().stream() + .flatMap(writer -> writer.complete().stream()) + .collect(Collectors.toList()); + Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets); + + writers.clear(); + sourceOffsets.clear(); + + return new SinkWriterResult(writerResults, offsets); + } + + public void save(Collection<SinkRecord> sinkRecords) { + sinkRecords.forEach(this::save); + } + + private void save(SinkRecord record) { + // the consumer stores the offsets that correspond to the next record to consume, + // so increment the record offset by one + OffsetDateTime timestamp = + record.timestamp() == null + ?
null + : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC); + sourceOffsets.put( + new TopicPartition(record.topic(), record.kafkaPartition()), + new Offset(record.kafkaOffset() + 1, timestamp)); + + if (config.dynamicTablesEnabled()) { + routeRecordDynamically(record); + } else { + routeRecordStatically(record); + } + } + + private void routeRecordStatically(SinkRecord record) { + String routeField = config.tablesRouteField(); + + if (routeField == null) { + // route to all tables + config + .tables() + .forEach( + tableName -> { + writerForTable(tableName, record, false).write(record); + }); + + } else { + String routeValue = extractRouteValue(record.value(), routeField); + if (routeValue != null) { + config + .tables() + .forEach( + tableName -> { + Pattern regex = config.tableConfig(tableName).routeRegex(); + if (regex != null && regex.matcher(routeValue).matches()) { + writerForTable(tableName, record, false).write(record); + } + }); + } + } + } + + private void routeRecordDynamically(SinkRecord record) { + String routeField = config.tablesRouteField(); + Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing"); + + String routeValue = extractRouteValue(record.value(), routeField); + if (routeValue != null) { + String tableName = routeValue.toLowerCase(); + writerForTable(tableName, record, true).write(record); + } + } + + private String extractRouteValue(Object recordValue, String routeField) { + if (recordValue == null) { + return null; + } + Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeField); + return routeValue == null ? null : routeValue.toString(); + } + + private RecordWriter writerForTable( + String tableName, SinkRecord sample, boolean ignoreMissingTable) { + return writers.computeIfAbsent( + tableName, notUsed -> writerFactory.createWriter(tableName, sample, ignoreMissingTable)); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java new file mode 100644 index 000000000000..ef899102bb64 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
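The save path above records, per topic partition, the offset of the next record to consume rather than the offset of the record just written. A small illustration of that convention with a made-up record, where the topic name, key, and values are placeholders:

    // a record read at Kafka offset 41 from partition 0 of a hypothetical "events" topic
    SinkRecord record = new SinkRecord("events", 0, null, "key", null, ImmutableMap.of(), 41L);

    // the Offset stored for ("events", 0) carries 42, the first offset not yet written
    long nextOffset = record.kafkaOffset() + 1;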
+ */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; + +public class SinkWriterResult { + private final List<IcebergWriterResult> writerResults; + private final Map<TopicPartition, Offset> sourceOffsets; + + public SinkWriterResult( + List<IcebergWriterResult> writerResults, Map<TopicPartition, Offset> sourceOffsets) { + this.writerResults = writerResults; + this.sourceOffsets = sourceOffsets; + } + + public List<IcebergWriterResult> writerResults() { + return writerResults; + } + + public Map<TopicPartition, Offset> sourceOffsets() { + return sourceOffsets; + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java similarity index 60% rename from kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java rename to kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java index cfa1709da744..ce92b3efc3ed 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.connect.data; +package org.apache.iceberg.connect; import static org.assertj.core.api.Assertions.assertThat; @@ -27,19 +27,15 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class UtilitiesTest { +public class CatalogUtilsTest { private static final String HADOOP_CONF_TEMPLATE = "<configuration><property><name>%s</name><value>%s</value></property></configuration>"; @@ -68,7 +64,7 @@ public void testLoadCatalogNoHadoopDir() { "iceberg.catalog.catalog-impl", TestCatalog.class.getName()); IcebergSinkConfig config = new IcebergSinkConfig(props); - Catalog result = Utilities.loadCatalog(config); + Catalog result = CatalogUtils.loadCatalog(config); assertThat(result).isInstanceOf(TestCatalog.class); @@ -102,7 +98,7 @@ public void testLoadCatalogWithHadoopDir(String confFile) throws IOException { "iceberg.catalog.catalog-impl", TestCatalog.class.getName()); IcebergSinkConfig config = new IcebergSinkConfig(props); - Catalog result = Utilities.loadCatalog(config); + Catalog result = CatalogUtils.loadCatalog(config); assertThat(result).isInstanceOf(TestCatalog.class); @@ -118,66 +114,4 @@ public void testLoadCatalogWithHadoopDir(String confFile) throws IOException { // check that core-site.xml was loaded assertThat(conf.get("foo")).isEqualTo("bar"); } - - @Test - public void testExtractFromRecordValueStruct() { - Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); - Struct val = new Struct(valSchema).put("key", 123L); - Object result = Utilities.extractFromRecordValue(val, "key"); - assertThat(result).isEqualTo(123L); - } - - @Test - public void testExtractFromRecordValueStructNested() { - Schema idSchema =
SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); - Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build(); - Schema valSchema = SchemaBuilder.struct().field("data", dataSchema).build(); - - Struct id = new Struct(idSchema).put("key", 123L); - Struct data = new Struct(dataSchema).put("id", id); - Struct val = new Struct(valSchema).put("data", data); - - Object result = Utilities.extractFromRecordValue(val, "data.id.key"); - assertThat(result).isEqualTo(123L); - } - - @Test - public void testExtractFromRecordValueStructNull() { - Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); - Struct val = new Struct(valSchema).put("key", 123L); - - Object result = Utilities.extractFromRecordValue(val, ""); - assertThat(result).isNull(); - - result = Utilities.extractFromRecordValue(val, "xkey"); - assertThat(result).isNull(); - } - - @Test - public void testExtractFromRecordValueMap() { - Map val = ImmutableMap.of("key", 123L); - Object result = Utilities.extractFromRecordValue(val, "key"); - assertThat(result).isEqualTo(123L); - } - - @Test - public void testExtractFromRecordValueMapNested() { - Map id = ImmutableMap.of("key", 123L); - Map data = ImmutableMap.of("id", id); - Map val = ImmutableMap.of("data", data); - - Object result = Utilities.extractFromRecordValue(val, "data.id.key"); - assertThat(result).isEqualTo(123L); - } - - @Test - public void testExtractFromRecordValueMapNull() { - Map val = ImmutableMap.of("key", 123L); - - Object result = Utilities.extractFromRecordValue(val, ""); - assertThat(result).isNull(); - - result = Utilities.extractFromRecordValue(val, "xkey"); - assertThat(result).isNull(); - } } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java new file mode 100644 index 000000000000..e7c049eff93a --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.channel; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class ChannelTestBase { + protected static final String SRC_TOPIC_NAME = "src-topic"; + protected static final String CTL_TOPIC_NAME = "ctl-topic"; + protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect"; + protected InMemoryCatalog catalog; + protected Table table; + protected IcebergSinkConfig config; + protected KafkaClientFactory clientFactory; + protected MockProducer producer; + protected MockConsumer consumer; + protected Admin admin; + + private InMemoryCatalog initInMemoryCatalog() { + InMemoryCatalog inMemoryCatalog = new InMemoryCatalog(); + inMemoryCatalog.initialize(null, ImmutableMap.of()); + return inMemoryCatalog; + } + + protected static final Namespace NAMESPACE = Namespace.of("db"); + protected static final String TABLE_NAME = "tbl"; + protected static final TableIdentifier TABLE_IDENTIFIER = + TableIdentifier.of(NAMESPACE, TABLE_NAME); + protected static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get())); + + protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + protected static final String OFFSETS_SNAPSHOT_PROP = + String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID); + protected static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts"; + + @BeforeEach + @SuppressWarnings("deprecation") + public void before() { + catalog = initInMemoryCatalog(); + catalog.createNamespace(NAMESPACE); + table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA); + + config = mock(IcebergSinkConfig.class); + when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME); + when(config.commitThreads()).thenReturn(1); + when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + + TopicPartitionInfo 
partitionInfo = mock(TopicPartitionInfo.class); + when(partitionInfo.partition()).thenReturn(0); + TopicDescription topicDesc = + new TopicDescription(SRC_TOPIC_NAME, false, ImmutableList.of(partitionInfo)); + DescribeTopicsResult describeResult = mock(DescribeTopicsResult.class); + when(describeResult.values()) + .thenReturn(ImmutableMap.of(SRC_TOPIC_NAME, KafkaFuture.completedFuture(topicDesc))); + + admin = mock(Admin.class); + when(admin.describeTopics(anyCollection())).thenReturn(describeResult); + + producer = new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer()); + producer.initTransactions(); + + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + + clientFactory = mock(KafkaClientFactory.class); + when(clientFactory.createProducer(any())).thenReturn(producer); + when(clientFactory.createConsumer(any())).thenReturn(consumer); + when(clientFactory.createAdmin()).thenReturn(admin); + } + + @AfterEach + public void after() throws IOException { + catalog.close(); + } + + protected void initConsumer() { + TopicPartition tp = new TopicPartition(CTL_TOPIC_NAME, 0); + consumer.rebalance(ImmutableList.of(tp)); + consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L)); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java new file mode 100644 index 000000000000..14d9cbd8eaf0 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.OffsetDateTime; +import java.util.UUID; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Payload; +import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +public class CommitStateTest { + @Test + public void testIsCommitReady() { + TopicPartitionOffset tp = mock(TopicPartitionOffset.class); + + CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); + commitState.startNewCommit(); + + DataComplete payload1 = mock(DataComplete.class); + when(payload1.commitId()).thenReturn(commitState.currentCommitId()); + when(payload1.assignments()).thenReturn(ImmutableList.of(tp, tp)); + + DataComplete payload2 = mock(DataComplete.class); + when(payload2.commitId()).thenReturn(commitState.currentCommitId()); + when(payload2.assignments()).thenReturn(ImmutableList.of(tp)); + + DataComplete payload3 = mock(DataComplete.class); + when(payload3.commitId()).thenReturn(UUID.randomUUID()); + when(payload3.assignments()).thenReturn(ImmutableList.of(tp)); + + commitState.addReady(wrapInEnvelope(payload1)); + commitState.addReady(wrapInEnvelope(payload2)); + commitState.addReady(wrapInEnvelope(payload3)); + + assertThat(commitState.isCommitReady(3)).isTrue(); + assertThat(commitState.isCommitReady(4)).isFalse(); + } + + @Test + public void testGetValidThroughTs() { + DataComplete payload1 = mock(DataComplete.class); + TopicPartitionOffset tp1 = mock(TopicPartitionOffset.class); + OffsetDateTime ts1 = EventTestUtil.now(); + when(tp1.timestamp()).thenReturn(ts1); + + TopicPartitionOffset tp2 = mock(TopicPartitionOffset.class); + OffsetDateTime ts2 = ts1.plusSeconds(1); + when(tp2.timestamp()).thenReturn(ts2); + when(payload1.assignments()).thenReturn(ImmutableList.of(tp1, tp2)); + + DataComplete payload2 = mock(DataComplete.class); + TopicPartitionOffset tp3 = mock(TopicPartitionOffset.class); + OffsetDateTime ts3 = ts1.plusSeconds(2); + when(tp3.timestamp()).thenReturn(ts3); + when(payload2.assignments()).thenReturn(ImmutableList.of(tp3)); + + CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); + commitState.startNewCommit(); + + commitState.addReady(wrapInEnvelope(payload1)); + commitState.addReady(wrapInEnvelope(payload2)); + + assertThat(commitState.vtts(false)).isEqualTo(ts1); + assertThat(commitState.vtts(true)).isNull(); + + // null timestamp for one, so should not set a vtts + DataComplete payload3 = mock(DataComplete.class); + TopicPartitionOffset tp4 = mock(TopicPartitionOffset.class); + when(tp4.timestamp()).thenReturn(null); + when(payload3.assignments()).thenReturn(ImmutableList.of(tp4)); + + commitState.addReady(wrapInEnvelope(payload3)); + + assertThat(commitState.vtts(false)).isNull(); + assertThat(commitState.vtts(true)).isNull(); + } + + private Envelope wrapInEnvelope(Payload payload) { + Event event = mock(Event.class); + when(event.payload()).thenReturn(payload); + return new Envelope(event, 0, 0); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java new file mode 100644 index 000000000000..7c8ccf8ef669 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +public class CommitterImplTest { + + @Test + public void testIsLeader() { + CommitterImpl committer = new CommitterImpl(); + + MemberAssignment assignment1 = + new MemberAssignment( + ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1))); + MemberDescription member1 = + new MemberDescription(null, Optional.empty(), null, null, assignment1); + + MemberAssignment assignment2 = + new MemberAssignment( + ImmutableSet.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1))); + MemberDescription member2 = + new MemberDescription(null, Optional.empty(), null, null, assignment2); + + List members = ImmutableList.of(member1, member2); + + List assignments = + ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0)); + assertThat(committer.isLeader(members, assignments)).isTrue(); + + assignments = + ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1)); + assertThat(committer.isLeader(members, assignments)).isFalse(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java new file mode 100644 index 000000000000..85aa5293223e --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types.StructType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class CoordinatorTest extends ChannelTestBase { + + @Test + public void testCommitAppend() { + Assertions.assertEquals(0, ImmutableList.copyOf(table.snapshots().iterator()).size()); + + OffsetDateTime ts = EventTestUtil.now(); + UUID commitId = + coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts); + table.refresh(); + + assertThat(producer.history()).hasSize(3); + assertCommitTable(1, commitId, ts); + assertCommitComplete(2, commitId, ts); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(1, snapshots.size()); + + Snapshot snapshot = snapshots.get(0); + Assertions.assertEquals(DataOperations.APPEND, snapshot.operation()); + Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size()); + Assertions.assertEquals(0, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size()); + + Map summary = snapshot.summary(); + Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); + Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); + Assertions.assertEquals(ts.toString(), summary.get(VTTS_SNAPSHOT_PROP)); + } + + @Test + public void testCommitDelta() { + OffsetDateTime ts = EventTestUtil.now(); + UUID commitId = + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(EventTestUtil.createDeleteFile()), + ts); + + assertThat(producer.history()).hasSize(3); + assertCommitTable(1, commitId, ts); + assertCommitComplete(2, commitId, ts); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(1, snapshots.size()); + + Snapshot 
snapshot = snapshots.get(0); + Assertions.assertEquals(DataOperations.OVERWRITE, snapshot.operation()); + Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size()); + Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size()); + + Map summary = snapshot.summary(); + Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); + Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); + Assertions.assertEquals(ts.toString(), summary.get(VTTS_SNAPSHOT_PROP)); + } + + @Test + public void testCommitNoFiles() { + OffsetDateTime ts = EventTestUtil.now(); + UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), ts); + + assertThat(producer.history()).hasSize(2); + assertCommitComplete(1, commitId, ts); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(0, snapshots.size()); + } + + @Test + public void testCommitError() { + // this spec isn't registered with the table + PartitionSpec badPartitionSpec = + PartitionSpec.builderFor(SCHEMA).withSpecId(1).identity("id").build(); + DataFile badDataFile = + DataFiles.builder(badPartitionSpec) + .withPath(UUID.randomUUID() + ".parquet") + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(100L) + .withRecordCount(5) + .build(); + + coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null); + + // no commit messages sent + assertThat(producer.history()).hasSize(1); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(0, snapshots.size()); + } + + private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { + byte[] bytes = producer.history().get(idx).value(); + Event commitTable = AvroUtil.decode(bytes); + assertThat(commitTable.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE); + CommitToTable commitToTablePayload = (CommitToTable) commitTable.payload(); + assertThat(commitToTablePayload.commitId()).isEqualTo(commitId); + assertThat(commitToTablePayload.tableReference().identifier().toString()) + .isEqualTo(TABLE_IDENTIFIER.toString()); + assertThat(commitToTablePayload.validThroughTs()).isEqualTo(ts); + } + + private void assertCommitComplete(int idx, UUID commitId, OffsetDateTime ts) { + byte[] bytes = producer.history().get(idx).value(); + Event commitComplete = AvroUtil.decode(bytes); + assertThat(commitComplete.type()).isEqualTo(PayloadType.COMMIT_COMPLETE); + CommitComplete commitCompletePayload = (CommitComplete) commitComplete.payload(); + assertThat(commitCompletePayload.commitId()).isEqualTo(commitId); + assertThat(commitCompletePayload.validThroughTs()).isEqualTo(ts); + } + + private UUID coordinatorTest( + List dataFiles, List deleteFiles, OffsetDateTime ts) { + when(config.commitIntervalMs()).thenReturn(0); + when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE); + + SinkTaskContext context = mock(SinkTaskContext.class); + Coordinator coordinator = + new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context); + coordinator.start(); + + // init consumer after subscribe() + initConsumer(); + + coordinator.process(); + + assertThat(producer.transactionCommitted()).isTrue(); + assertThat(producer.history()).hasSize(1); + + byte[] bytes = producer.history().get(0).value(); + Event commitRequest = AvroUtil.decode(bytes); + assertThat(commitRequest.type()).isEqualTo(PayloadType.START_COMMIT); + + UUID commitId = ((StartCommit) commitRequest.payload()).commitId(); + + Event commitResponse = + new 
Event( + config.connectGroupId(), + new DataWritten( + StructType.of(), + commitId, + new TableReference("catalog", ImmutableList.of("db"), "tbl"), + dataFiles, + deleteFiles)); + bytes = AvroUtil.encode(commitResponse); + consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); + + Event commitReady = + new Event( + config.connectGroupId(), + new DataComplete( + commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); + bytes = AvroUtil.encode(commitReady); + consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", bytes)); + + when(config.commitIntervalMs()).thenReturn(0); + + coordinator.process(); + + return commitId; + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java new file mode 100644 index 000000000000..da0d881f8927 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; + +public class CoordinatorThreadTest { + + @Test + public void testRun() { + Coordinator coordinator = mock(Coordinator.class); + CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator); + + coordinatorThread.start(); + + verify(coordinator, timeout(1000)).start(); + verify(coordinator, timeout(1000).atLeast(1)).process(); + verify(coordinator, times(0)).stop(); + assertThat(coordinatorThread.isTerminated()).isFalse(); + + coordinatorThread.terminate(); + + verify(coordinator, timeout(1000)).stop(); + assertThat(coordinatorThread.isTerminated()).isTrue(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java new file mode 100644 index 000000000000..8c3625b74a5d --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
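The coordinator and worker tests in this patch verify the protocol by decoding the raw bytes captured by the mock producer. The recurring pattern, shown here with a placeholder history index, is roughly:

    byte[] bytes = producer.history().get(0).value();
    Event event = AvroUtil.decode(bytes);

    if (event.payload().type() == PayloadType.START_COMMIT) {
      UUID commitId = ((StartCommit) event.payload()).commitId();
      // a worker responds to this with DataWritten and DataComplete events for commitId
    }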
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import java.nio.ByteBuffer; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; + +class EventTestUtil { + private EventTestUtil() {} + + static final Schema SCHEMA = + new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get()))); + + static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + + static final SortOrder ORDER = + SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST).build(); + + static final Metrics METRICS = + new Metrics( + 1L, + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, ByteBuffer.wrap(new byte[10])), + ImmutableMap.of(1, ByteBuffer.wrap(new byte[10]))); + + static OffsetDateTime now() { + return OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MICROS); + } + + static DataFile createDataFile() { + PartitionData data = new PartitionData(SPEC.partitionType()); + data.set(0, 1L); + + return DataFiles.builder(SPEC) + .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {0})) + .withFileSizeInBytes(100L) + .withFormat(FileFormat.PARQUET) + .withMetrics(METRICS) + .withPartition(data) + .withPath("path/to/file.parquet") + .withSortOrder(ORDER) + .withSplitOffsets(ImmutableList.of(4L)) + .build(); + } + + static DeleteFile createDeleteFile() { + PartitionData data = new PartitionData(SPEC.partitionType()); + data.set(0, 1L); + + return FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes(1) + .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {0})) + .withFileSizeInBytes(100L) + .withFormat(FileFormat.PARQUET) + .withMetrics(METRICS) + .withPartition(data) + .withPath("path/to/file.parquet") + .withSortOrder(ORDER) + .withSplitOffsets(ImmutableList.of(4L)) + .build(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java new file mode 100644 index 000000000000..a19047ef2d2c --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java @@ -0,0 +1,105 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.data.IcebergWriterResult; +import org.apache.iceberg.connect.data.Offset; +import org.apache.iceberg.connect.data.SinkWriter; +import org.apache.iceberg.connect.data.SinkWriterResult; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types.StructType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.Test; + +public class WorkerTest extends ChannelTestBase { + + @Test + public void testSave() { + when(config.catalogName()).thenReturn("catalog"); + + SinkTaskContext context = mock(SinkTaskContext.class); + TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0); + when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition)); + + IcebergWriterResult writeResult = + new IcebergWriterResult( + TableIdentifier.parse(TABLE_NAME), + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(), + StructType.of()); + + Map offsets = + ImmutableMap.of(topicPartition, new Offset(1L, EventTestUtil.now())); + + SinkWriterResult sinkWriterResult = + new SinkWriterResult(ImmutableList.of(writeResult), offsets); + SinkWriter sinkWriter = mock(SinkWriter.class); + when(sinkWriter.completeWrite()).thenReturn(sinkWriterResult); + + Worker worker = new Worker(config, clientFactory, sinkWriter, context); + worker.start(); + + // init consumer after subscribe() + initConsumer(); + + // save a record + Map value = ImmutableMap.of(); + SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, value, 0L); + worker.save(ImmutableList.of(rec)); + + UUID commitId = UUID.randomUUID(); + Event commitRequest = new Event(config.connectGroupId(), new StartCommit(commitId)); + byte[] bytes = AvroUtil.encode(commitRequest); + consumer.addRecord(new 
ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); + + worker.process(); + + assertThat(producer.history()).hasSize(2); + + Event event = AvroUtil.decode(producer.history().get(0).value()); + assertThat(event.payload().type()).isEqualTo(PayloadType.DATA_WRITTEN); + DataWritten dataWritten = (DataWritten) event.payload(); + assertThat(dataWritten.commitId()).isEqualTo(commitId); + + event = AvroUtil.decode(producer.history().get(1).value()); + assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE); + DataComplete dataComplete = (DataComplete) event.payload(); + assertThat(dataComplete.commitId()).isEqualTo(commitId); + assertThat(dataComplete.assignments()).hasSize(1); + assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java index 80adc7fc3e03..ac44952a5c15 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java @@ -73,7 +73,7 @@ public void before() { protected WriteResult writeTest( List rows, IcebergSinkConfig config, Class expectedWriterClass) { - try (TaskWriter writer = Utilities.createTableWriter(table, "name", config)) { + try (TaskWriter writer = RecordUtils.createTableWriter(table, "name", config)) { assertThat(writer.getClass()).isEqualTo(expectedWriterClass); rows.forEach( diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java new file mode 100644 index 000000000000..08e832256a28 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +public class RecordUtilsTest { + + @Test + public void testExtractFromRecordValueStruct() { + Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Struct val = new Struct(valSchema).put("key", 123L); + Object result = RecordUtils.extractFromRecordValue(val, "key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueStructNested() { + Schema idSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build(); + Schema valSchema = SchemaBuilder.struct().field("data", dataSchema).build(); + + Struct id = new Struct(idSchema).put("key", 123L); + Struct data = new Struct(dataSchema).put("id", id); + Struct val = new Struct(valSchema).put("data", data); + + Object result = RecordUtils.extractFromRecordValue(val, "data.id.key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueStructNull() { + Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Struct val = new Struct(valSchema).put("key", 123L); + + Object result = RecordUtils.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = RecordUtils.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } + + @Test + public void testExtractFromRecordValueMap() { + Map val = ImmutableMap.of("key", 123L); + Object result = RecordUtils.extractFromRecordValue(val, "key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueMapNested() { + Map id = ImmutableMap.of("key", 123L); + Map data = ImmutableMap.of("id", id); + Map val = ImmutableMap.of("data", data); + + Object result = RecordUtils.extractFromRecordValue(val, "data.id.key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueMapNull() { + Map val = ImmutableMap.of("key", 123L); + + Object result = RecordUtils.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = RecordUtils.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java new file mode 100644 index 000000000000..be29ef1022a4 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class SchemaUpdateTest { + + @Test + public void testAddColumn() { + SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer(); + updateConsumer.addColumn("parent", "name", Types.StringType.get()); + assertThat(updateConsumer.addColumns()).hasSize(1); + assertThat(updateConsumer.updateTypes()).isEmpty(); + assertThat(updateConsumer.makeOptionals()).isEmpty(); + + SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next(); + assertThat(addColumn.parentName()).isEqualTo("parent"); + assertThat(addColumn.name()).isEqualTo("name"); + assertThat(addColumn.type()).isEqualTo(Types.StringType.get()); + } + + @Test + public void testUpdateType() { + SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer(); + updateConsumer.updateType("name", Types.LongType.get()); + assertThat(updateConsumer.addColumns()).isEmpty(); + + assertThat(updateConsumer.updateTypes()).hasSize(1); + assertThat(updateConsumer.makeOptionals()).isEmpty(); + + SchemaUpdate.UpdateType updateType = updateConsumer.updateTypes().iterator().next(); + assertThat(updateType.name()).isEqualTo("name"); + assertThat(updateType.type()).isEqualTo(Types.LongType.get()); + } + + @Test + public void testMakeOptional() { + SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer(); + updateConsumer.makeOptional("name"); + assertThat(updateConsumer.addColumns()).isEmpty(); + + assertThat(updateConsumer.updateTypes()).isEmpty(); + assertThat(updateConsumer.makeOptionals()).hasSize(1); + + SchemaUpdate.MakeOptional makeOptional = updateConsumer.makeOptionals().iterator().next(); + assertThat(makeOptional.name()).isEqualTo("name"); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java new file mode 100644 index 000000000000..4a17b926fc56 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class SinkWriterTest { + + private InMemoryCatalog catalog; + + private static final Namespace NAMESPACE = Namespace.of("db"); + private static final String TABLE_NAME = "tbl"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME); + private static final Schema SCHEMA = + new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + optional(3, "date", Types.StringType.get())); + private static final String ROUTE_FIELD = "fld"; + + @BeforeEach + public void before() { + catalog = initInMemoryCatalog(); + catalog.createNamespace(NAMESPACE); + catalog.createTable(TABLE_IDENTIFIER, SCHEMA); + } + + @AfterEach + public void after() throws IOException { + catalog.close(); + } + + private InMemoryCatalog initInMemoryCatalog() { + InMemoryCatalog inMemoryCatalog = new InMemoryCatalog(); + inMemoryCatalog.initialize(null, ImmutableMap.of()); + return inMemoryCatalog; + } + + @Test + public void testDefaultRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + Map value = ImmutableMap.of(); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); + } + + @Test + public void testDefaultNoRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.tables()).thenReturn(ImmutableList.of()); + Map value = ImmutableMap.of(); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(0); + } + + @Test + public void testStaticRoute() { + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + 
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(tableConfig); + when(config.tablesRouteField()).thenReturn(ROUTE_FIELD); + + Map value = ImmutableMap.of(ROUTE_FIELD, "val"); + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); + } + + @Test + public void testStaticNoRoute() { + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(tableConfig); + when(config.tablesRouteField()).thenReturn(ROUTE_FIELD); + + Map value = ImmutableMap.of(ROUTE_FIELD, "foobar"); + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(0); + } + + @Test + public void testDynamicRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.dynamicTablesEnabled()).thenReturn(true); + when(config.tablesRouteField()).thenReturn(ROUTE_FIELD); + + Map value = ImmutableMap.of(ROUTE_FIELD, TABLE_IDENTIFIER.toString()); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); + } + + @Test + public void testDynamicNoRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.dynamicTablesEnabled()).thenReturn(true); + when(config.tablesRouteField()).thenReturn(ROUTE_FIELD); + + Map value = ImmutableMap.of(ROUTE_FIELD, "db.foobar"); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(0); + } + + private List sinkWriterTest( + Map value, IcebergSinkConfig config) { + IcebergWriterResult writeResult = + new IcebergWriterResult( + TableIdentifier.parse(TABLE_NAME), + ImmutableList.of(mock(DataFile.class)), + ImmutableList.of(), + Types.StructType.of()); + IcebergWriter writer = mock(IcebergWriter.class); + when(writer.complete()).thenReturn(ImmutableList.of(writeResult)); + + IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class); + when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); + + SinkWriter sinkWriter = new SinkWriter(catalog, config); + + // save a record + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + SinkRecord rec = + new SinkRecord( + "topic", + 1, + null, + "key", + null, + value, + 100L, + now.toEpochMilli(), + TimestampType.LOG_APPEND_TIME); + sinkWriter.save(ImmutableList.of(rec)); + + SinkWriterResult result = sinkWriter.completeWrite(); + + Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1)); + assertThat(offset).isNotNull(); + assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset + assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC)); + + 
return result.writerResults(); + } +} From de3739f46d428a25cff36fc6bd529bb1dd196052 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 10 Jul 2024 20:03:32 -0700 Subject: [PATCH 2/4] PR feedback --- .../iceberg/connect/CommitterFactory.java | 4 +-- .../iceberg/connect/channel/Channel.java | 4 --- .../iceberg/connect/channel/CommitState.java | 8 ++--- .../iceberg/connect/channel/Coordinator.java | 31 ++++++++++--------- .../connect/channel/ChannelTestBase.java | 2 +- .../connect/channel/CommitStateTest.java | 10 +++--- .../connect/channel/CoordinatorTest.java | 4 +-- 7 files changed, 31 insertions(+), 32 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java index edb2c518f11b..18ff118c7773 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java @@ -20,8 +20,8 @@ import org.apache.iceberg.connect.channel.CommitterImpl; -public class CommitterFactory { - public static Committer createCommitter(IcebergSinkConfig config) { +class CommitterFactory { + static Committer createCommitter(IcebergSinkConfig config) { return new CommitterImpl(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index ef71ed7b9621..fdb1d7501c7b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -151,10 +151,6 @@ protected void commitConsumerOffsets() { consumer.commitSync(offsetsToCommit); } - protected Admin admin() { - return admin; - } - void start() { consumer.subscribe(ImmutableList.of(controlTopic)); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java index f3ac850d12f9..6cad33c3e387 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java @@ -144,21 +144,21 @@ Map> tableCommitMap() { envelope -> ((DataWritten) envelope.event().payload()).tableReference())); } - OffsetDateTime vtts(boolean partialCommit) { - boolean validVtts = + OffsetDateTime validThroughTs(boolean partialCommit) { + boolean hasValidThroughTs = !partialCommit && readyBuffer.stream() .flatMap(event -> event.assignments().stream()) .allMatch(offset -> offset.timestamp() != null); OffsetDateTime result; - if (validVtts) { + if (hasValidThroughTs) { result = readyBuffer.stream() .flatMap(event -> event.assignments().stream()) .map(TopicPartitionOffset::timestamp) .min(Comparator.naturalOrder()) - .get(); + .orElse(null); } else { result = null; } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 80e34dd768d5..7274f77e0c85 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -62,7 +62,7 @@ class Coordinator extends Channel {
   private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
-  private static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts";
+  private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
   private static final Duration POLL_DURATION = Duration.ofSeconds(1);
 
   private final Catalog catalog;
@@ -139,14 +139,14 @@ private void doCommit(boolean partialCommit) {
     Map<TableReference, List<Envelope>> commitMap = commitState.tableCommitMap();
 
     String offsetsJson = offsetsJson();
-    OffsetDateTime vtts = commitState.vtts(partialCommit);
+    OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit);
 
     Tasks.foreach(commitMap.entrySet())
         .executeWith(exec)
         .stopOnFailure()
         .run(
             entry -> {
-              commitToTable(entry.getKey(), entry.getValue(), offsetsJson, vtts);
+              commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs);
             });
 
     // we should only get here if all tables committed successfully...
@@ -154,14 +154,16 @@ private void doCommit(boolean partialCommit) {
     commitState.clearResponses();
 
     Event event =
-        new Event(config.connectGroupId(), new CommitComplete(commitState.currentCommitId(), vtts));
+        new Event(
+            config.connectGroupId(),
+            new CommitComplete(commitState.currentCommitId(), validThroughTs));
     send(event);
 
     LOG.info(
-        "Commit {} complete, committed to {} table(s), vtts {}",
+        "Commit {} complete, committed to {} table(s), valid-through {}",
         commitState.currentCommitId(),
         commitMap.size(),
-        vtts);
+        validThroughTs);
   }
 
   private String offsetsJson() {
@@ -176,7 +178,7 @@ private void commitToTable(
       TableReference tableReference,
       List<Envelope> envelopeList,
       String offsetsJson,
-      OffsetDateTime vtts) {
+      OffsetDateTime validThroughTs) {
     TableIdentifier tableIdentifier = tableReference.identifier();
     Table table;
     try {
@@ -226,8 +228,8 @@ private void commitToTable(
       }
       appendOp.set(snapshotOffsetsProp, offsetsJson);
       appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
-      if (vtts != null) {
-        appendOp.set(VTTS_SNAPSHOT_PROP, vtts.toString());
+      if (validThroughTs != null) {
+        appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
       }
       dataFiles.forEach(appendOp::appendFile);
       appendOp.commit();
@@ -238,8 +240,8 @@ private void commitToTable(
       }
       deltaOp.set(snapshotOffsetsProp, offsetsJson);
       deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
-      if (vtts != null) {
-        deltaOp.set(VTTS_SNAPSHOT_PROP, vtts.toString());
+      if (validThroughTs != null) {
+        deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
       }
       dataFiles.forEach(deltaOp::addRows);
       deleteFiles.forEach(deltaOp::addDeletes);
@@ -250,15 +252,16 @@ private void commitToTable(
 
     Event event =
         new Event(
            config.connectGroupId(),
-            new CommitToTable(commitState.currentCommitId(), tableReference, snapshotId, vtts));
+            new CommitToTable(
+                commitState.currentCommitId(), tableReference, snapshotId, validThroughTs));
     send(event);
 
     LOG.info(
-        "Commit complete to table {}, snapshot {}, commit ID {}, vtts {}",
+        "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
         tableIdentifier,
         snapshotId,
         commitState.currentCommitId(),
-        vtts);
+        validThroughTs);
   }
 }
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java index e7c049eff93a..e6ffefbd9799 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java @@ -81,7 +81,7 @@ private InMemoryCatalog initInMemoryCatalog() { protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; protected static final String OFFSETS_SNAPSHOT_PROP = String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID); - protected static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts"; + protected static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; @BeforeEach @SuppressWarnings("deprecation") diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java index 14d9cbd8eaf0..a9fe1ad099cb 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java @@ -84,10 +84,10 @@ public void testGetValidThroughTs() { commitState.addReady(wrapInEnvelope(payload1)); commitState.addReady(wrapInEnvelope(payload2)); - assertThat(commitState.vtts(false)).isEqualTo(ts1); - assertThat(commitState.vtts(true)).isNull(); + assertThat(commitState.validThroughTs(false)).isEqualTo(ts1); + assertThat(commitState.validThroughTs(true)).isNull(); - // null timestamp for one, so should not set a vtts + // null timestamp for one, so should not set a valid-through timestamp DataComplete payload3 = mock(DataComplete.class); TopicPartitionOffset tp4 = mock(TopicPartitionOffset.class); when(tp4.timestamp()).thenReturn(null); @@ -95,8 +95,8 @@ public void testGetValidThroughTs() { commitState.addReady(wrapInEnvelope(payload3)); - assertThat(commitState.vtts(false)).isNull(); - assertThat(commitState.vtts(true)).isNull(); + assertThat(commitState.validThroughTs(false)).isNull(); + assertThat(commitState.validThroughTs(true)).isNull(); } private Envelope wrapInEnvelope(Payload payload) { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java index 85aa5293223e..9c0b8122ae42 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java @@ -76,7 +76,7 @@ public void testCommitAppend() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); - Assertions.assertEquals(ts.toString(), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals(ts.toString(), summary.get(VALID_THROUGH_TS_SNAPSHOT_PROP)); } @Test @@ -103,7 +103,7 @@ public void testCommitDelta() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); 
- Assertions.assertEquals(ts.toString(), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals(ts.toString(), summary.get(VALID_THROUGH_TS_SNAPSHOT_PROP)); } @Test From 2ec253dcfbf666765ad61230b2831bfa0c5789e3 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 10 Jul 2024 21:00:19 -0700 Subject: [PATCH 3/4] Get partition assignment from consumer --- .../org/apache/iceberg/connect/Committer.java | 7 +----- .../iceberg/connect/IcebergSinkTask.java | 2 +- .../iceberg/connect/channel/Channel.java | 2 +- .../connect/channel/CommitterImpl.java | 8 +++---- .../iceberg/connect/channel/KafkaUtils.java | 24 ++++++++----------- 5 files changed, 16 insertions(+), 27 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java index 47cb47c6147d..edc217d1b0e4 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java @@ -20,16 +20,11 @@ import java.util.Collection; import org.apache.iceberg.catalog.Catalog; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; public interface Committer { - void start( - Catalog catalog, - IcebergSinkConfig config, - SinkTaskContext context, - Collection partitions); + void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context); void stop(); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java index d69e59824343..460b18fd7fc2 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java @@ -55,7 +55,7 @@ public void open(Collection partitions) { catalog = CatalogUtils.loadCatalog(config); committer = CommitterFactory.createCommitter(config); - committer.start(catalog, config, context, partitions); + committer.start(catalog, config, context); } @Override diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index fdb1d7501c7b..993fcf67c989 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -100,7 +100,7 @@ protected void send(List events, Map sourceOffset recordList.forEach(producer::send); if (!sourceOffsets.isEmpty()) { producer.sendOffsetsToTransaction( - offsetsToCommit, KafkaUtils.consumerGroupMetadata(context, connectGroupId)); + offsetsToCommit, KafkaUtils.consumerGroupMetadata(context)); } producer.commitTransaction(); } catch (Exception e) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 6e28e7ba2eb2..53b7b76e8ea0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -20,6 +20,7 @@ import 
java.util.Collection;
 import java.util.Comparator;
+import java.util.Set;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.connect.Committer;
 import org.apache.iceberg.connect.IcebergSinkConfig;
@@ -56,11 +57,7 @@ public int compare(TopicPartition o1, TopicPartition o2) {
     }
 
   @Override
-  public void start(
-      Catalog catalog,
-      IcebergSinkConfig config,
-      SinkTaskContext context,
-      Collection<TopicPartition> partitions) {
+  public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
     KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps());
 
     ConsumerGroupDescription groupDesc;
@@ -70,6 +67,7 @@ public void start(
     if (groupDesc.state() == ConsumerGroupState.STABLE) {
       Collection<MemberDescription> members = groupDesc.members();
+      Set<TopicPartition> partitions = context.assignment();
       if (isLeader(members, partitions)) {
         LOG.info("Task elected leader, starting commit coordinator");
         Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory, context);
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
index 85a98d4cd637..be51fff8bfbc 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
@@ -28,13 +28,9 @@
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 class KafkaUtils {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
-
   private static final String CONTEXT_CLASS_NAME =
       "org.apache.kafka.connect.runtime.WorkerSinkTaskContext";
@@ -50,20 +46,20 @@ static ConsumerGroupDescription consumerGroupDescription(String consumerGroupId,
     }
   }
 
+  static ConsumerGroupMetadata consumerGroupMetadata(SinkTaskContext context) {
+    return kafkaConsumer(context).groupMetadata();
+  }
+
   @SuppressWarnings("unchecked")
-  static ConsumerGroupMetadata consumerGroupMetadata(
-      SinkTaskContext context, String connectGroupId) {
+  private static Consumer<byte[], byte[]> kafkaConsumer(SinkTaskContext context) {
     String contextClassName = context.getClass().getName();
-    if (CONTEXT_CLASS_NAME.equals(contextClassName)) {
+    try {
       return ((Consumer<byte[], byte[]>)
-          DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get())
-              .groupMetadata();
+          DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get());
+    } catch (Exception e) {
+      throw new ConnectException(
+          "Unable to retrieve consumer from context: " + contextClassName, e);
     }
-    LOG.warn(
-        "Consumer group metadata not available for {}, zombie fencing will be less strong than with {}",
-        contextClassName,
-        CONTEXT_CLASS_NAME);
-    return new ConsumerGroupMetadata(connectGroupId);
   }
 
   private KafkaUtils() {}

From e276a3d74099535d83677a54fdbd54d155f0c3bf Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Wed, 10 Jul 2024 21:46:24 -0700
Subject: [PATCH 4/4] test fix

---
 .../iceberg/connect/channel/WorkerTest.java | 111 ++++++++++--------
 1 file changed, 61 insertions(+), 50 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
index a19047ef2d2c..577c28fe6375 
100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java @@ -19,7 +19,9 @@ package org.apache.iceberg.connect.channel; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; import java.util.Map; @@ -39,11 +41,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.types.Types.StructType; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; public class WorkerTest extends ChannelTestBase { @@ -51,55 +55,62 @@ public class WorkerTest extends ChannelTestBase { public void testSave() { when(config.catalogName()).thenReturn("catalog"); - SinkTaskContext context = mock(SinkTaskContext.class); - TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0); - when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition)); - - IcebergWriterResult writeResult = - new IcebergWriterResult( - TableIdentifier.parse(TABLE_NAME), - ImmutableList.of(EventTestUtil.createDataFile()), - ImmutableList.of(), - StructType.of()); - - Map offsets = - ImmutableMap.of(topicPartition, new Offset(1L, EventTestUtil.now())); - - SinkWriterResult sinkWriterResult = - new SinkWriterResult(ImmutableList.of(writeResult), offsets); - SinkWriter sinkWriter = mock(SinkWriter.class); - when(sinkWriter.completeWrite()).thenReturn(sinkWriterResult); - - Worker worker = new Worker(config, clientFactory, sinkWriter, context); - worker.start(); - - // init consumer after subscribe() - initConsumer(); - - // save a record - Map value = ImmutableMap.of(); - SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, value, 0L); - worker.save(ImmutableList.of(rec)); - - UUID commitId = UUID.randomUUID(); - Event commitRequest = new Event(config.connectGroupId(), new StartCommit(commitId)); - byte[] bytes = AvroUtil.encode(commitRequest); - consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); - - worker.process(); - - assertThat(producer.history()).hasSize(2); - - Event event = AvroUtil.decode(producer.history().get(0).value()); - assertThat(event.payload().type()).isEqualTo(PayloadType.DATA_WRITTEN); - DataWritten dataWritten = (DataWritten) event.payload(); - assertThat(dataWritten.commitId()).isEqualTo(commitId); - - event = AvroUtil.decode(producer.history().get(1).value()); - assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE); - DataComplete dataComplete = (DataComplete) event.payload(); - assertThat(dataComplete.commitId()).isEqualTo(commitId); - assertThat(dataComplete.assignments()).hasSize(1); - assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L); + try (MockedStatic mockKafkaUtils = mockStatic(KafkaUtils.class)) { + ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); + mockKafkaUtils + .when(() -> KafkaUtils.consumerGroupMetadata(any())) + .thenReturn(consumerGroupMetadata); + + 
SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      IcebergWriterResult writeResult =
+          new IcebergWriterResult(
+              TableIdentifier.parse(TABLE_NAME),
+              ImmutableList.of(EventTestUtil.createDataFile()),
+              ImmutableList.of(),
+              StructType.of());
+
+      Map<TopicPartition, Offset> offsets =
+          ImmutableMap.of(topicPartition, new Offset(1L, EventTestUtil.now()));
+
+      SinkWriterResult sinkWriterResult =
+          new SinkWriterResult(ImmutableList.of(writeResult), offsets);
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite()).thenReturn(sinkWriterResult);
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // init consumer after subscribe()
+      initConsumer();
+
+      // save a record
+      Map<String, Object> value = ImmutableMap.of();
+      SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, value, 0L);
+      worker.save(ImmutableList.of(rec));
+
+      UUID commitId = UUID.randomUUID();
+      Event commitRequest = new Event(config.connectGroupId(), new StartCommit(commitId));
+      byte[] bytes = AvroUtil.encode(commitRequest);
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes));
+
+      worker.process();
+
+      assertThat(producer.history()).hasSize(2);
+
+      Event event = AvroUtil.decode(producer.history().get(0).value());
+      assertThat(event.payload().type()).isEqualTo(PayloadType.DATA_WRITTEN);
+      DataWritten dataWritten = (DataWritten) event.payload();
+      assertThat(dataWritten.commitId()).isEqualTo(commitId);
+
+      event = AvroUtil.decode(producer.history().get(1).value());
+      assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE);
+      DataComplete dataComplete = (DataComplete) event.payload();
+      assertThat(dataComplete.commitId()).isEqualTo(commitId);
+      assertThat(dataComplete.assignments()).hasSize(1);
+      assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+    }
   }
 }