@@ -20,6 +20,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -120,4 +121,23 @@ public Object get(int i) {
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableReference that = (TableReference) o;
return Objects.equals(catalog, that.catalog)
&& Objects.equals(namespace, that.namespace)
&& Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(catalog, namespace, name);
}
}
@@ -44,8 +44,7 @@ private EventTestUtil() {}
static final Schema SCHEMA =
new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get())));

-  static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+  static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();

static final SortOrder ORDER =
SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST).build();
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect;

import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.common.DynMethods.BoundMethod;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CatalogUtils {

private static final Logger LOG = LoggerFactory.getLogger(CatalogUtils.class.getName());
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");

static Catalog loadCatalog(IcebergSinkConfig config) {
return CatalogUtil.buildIcebergCatalog(
config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
}

// use reflection here to avoid requiring Hadoop as a dependency
private static Object loadHadoopConfig(IcebergSinkConfig config) {
Class<?> configClass =
DynClasses.builder()
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
.impl("org.apache.hadoop.conf.Configuration")
.orNull()
.build();

if (configClass == null) {
LOG.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

try {
Object result = DynConstructors.builder().hiddenImpl(configClass).build().newInstance();
BoundMethod addResourceMethod =
DynMethods.builder("addResource").impl(configClass, URL.class).build(result);
BoundMethod setMethod =
DynMethods.builder("set").impl(configClass, String.class, String.class).build(result);

// load any config files in the specified config directory
String hadoopConfDir = config.hadoopConfDir();
if (hadoopConfDir != null) {
HADOOP_CONF_FILES.forEach(
confFile -> {
Path path = Paths.get(hadoopConfDir, confFile);
if (Files.exists(path)) {
try {
addResourceMethod.invoke(path.toUri().toURL());
} catch (IOException e) {
LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e);
}
}
});
}

// set any Hadoop properties specified in the sink config
config.hadoopProps().forEach(setMethod::invoke);

LOG.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (Exception e) {
LOG.warn(
"Hadoop found on classpath but could not create config, proceeding without config", e);
}
return null;
}

private CatalogUtils() {}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect;

import java.util.Collection;
import org.apache.iceberg.catalog.Catalog;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface Committer {
void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context);

void stop();

void save(Collection<SinkRecord> sinkRecords);
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect;

import org.apache.iceberg.connect.channel.CommitterImpl;

class CommitterFactory {
static Committer createCommitter(IcebergSinkConfig config) {
return new CommitterImpl();
}

private CommitterFactory() {}
}
@@ -80,7 +80,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
-  private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms";
@@ -104,11 +103,7 @@ public class IcebergSinkConfig extends AbstractConfig {
public static final ConfigDef CONFIG_DEF = newConfigDef();

public static String version() {
-    String kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion();
-    if (kcVersion == null) {
-      kcVersion = "unknown";
-    }
-    return IcebergBuild.version() + "-kc-" + kcVersion;
+    return IcebergBuild.version();
}

private static ConfigDef newConfigDef() {
@@ -185,12 +180,6 @@ private static ConfigDef newConfigDef() {
DEFAULT_CONTROL_TOPIC,
Importance.MEDIUM,
"Name of the control topic");
-    configDef.define(
-        CONTROL_GROUP_ID_PROP,
-        ConfigDef.Type.STRING,
-        null,
-        Importance.MEDIUM,
-        "Name of the consumer group to store offsets");
configDef.define(
CONNECT_GROUP_ID_PROP,
ConfigDef.Type.STRING,
@@ -370,16 +359,6 @@ public String controlTopic() {
return getString(CONTROL_TOPIC_PROP);
}

-  public String controlGroupId() {
-    String result = getString(CONTROL_GROUP_ID_PROP);
-    if (result != null) {
-      return result;
-    }
-    String connectorName = connectorName();
-    Preconditions.checkNotNull(connectorName, "Connector name cannot be null");
-    return DEFAULT_CONTROL_GROUP_PREFIX + connectorName;
-  }

public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
if (result != null) {
@@ -44,9 +44,7 @@ public void start(Map<String, String> connectorProps) {

@Override
public Class<? extends Task> taskClass() {
-    // FIXME: update this when the connector channel is added
-    // return IcebergSinkTask.class;
-    return null;
+    return IcebergSinkTask.class;
}

@Override
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect;

import java.util.Collection;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergSinkTask extends SinkTask {

private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);

private IcebergSinkConfig config;
private Catalog catalog;
private Committer committer;

@Override
public String version() {
return IcebergSinkConfig.version();
}

@Override
public void start(Map<String, String> props) {
this.config = new IcebergSinkConfig(props);
}

@Override
public void open(Collection<TopicPartition> partitions) {
Preconditions.checkArgument(catalog == null, "Catalog already open");
Preconditions.checkArgument(committer == null, "Committer already open");

catalog = CatalogUtils.loadCatalog(config);
Contributor:

The open method is called with only the newly assigned partitions. Is there a strong reason to pass just the newly assigned partitions to the Committer.start method when the Committer can retrieve all partitions assigned to this task via context.assignment() anyway?

I'm also worried we might have a bug here. The Committer implementation uses this partitions argument to check whether partition 0 of the first topic is assigned to this task and, if so, spawns a Coordinator process. If a rebalance occurs in which partition 0 of the first topic does not move between tasks, it would not be included in the partitions argument for any task, and we could end up with a connector that has no Coordinator process running on any task. Thoughts?

Contributor Author:

That's not my understanding; open() will be called with the new assignment, i.e. all assigned topic partitions. See the javadoc: "The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)".

Contributor:

I did read the javadoc and interpreted it differently, though I can see how you reached your interpretation as well.

The reason I lean towards my interpretation is that SinkTask.open is ultimately called by ConsumerRebalanceListener.onPartitionsAssigned (in the Kafka source code), and the javadoc for that method says: "The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions)".

I could still be wrong, so feel free to test things out or ask in the Kafka community.

Contributor Author:

Thanks, it does seem like you're right. I changed this to get the consumer assignment instead.

committer = CommitterFactory.createCommitter(config);
committer.start(catalog, config, context);
}
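
Following up on the thread above: a minimal, hypothetical sketch of a Committer that derives the full set of owned partitions from SinkTaskContext.assignment() instead of the partitions passed to open(). The class name AssignmentBasedCommitter and the coordinator-election placeholder are assumptions for illustration only; the actual CommitterImpl in org.apache.iceberg.connect.channel differs in detail.

package org.apache.iceberg.connect;

import java.util.Collection;
import java.util.Set;
import org.apache.iceberg.catalog.Catalog;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical example class, not part of the connector.
class AssignmentBasedCommitter implements Committer {

  @Override
  public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
    // Unlike the argument passed to SinkTask.open(), which may contain only the
    // partitions newly added by the latest rebalance, context.assignment() returns
    // every partition currently owned by this task.
    Set<TopicPartition> ownedPartitions = context.assignment();

    // Placeholder election: run the coordinator if this task owns partition 0 of the
    // lexicographically first topic in its assignment. The real implementation decides
    // this differently; the point is that the decision uses the full assignment,
    // not the open() argument.
    String firstTopic =
        ownedPartitions.stream().map(TopicPartition::topic).sorted().findFirst().orElse(null);
    boolean runCoordinator =
        firstTopic != null && ownedPartitions.contains(new TopicPartition(firstTopic, 0));

    if (runCoordinator) {
      // start the coordinator thread here (omitted)
    }
  }

  @Override
  public void save(Collection<SinkRecord> sinkRecords) {
    // buffer/write records; the task passes null to signal a flush
  }

  @Override
  public void stop() {
    // stop the coordinator and close any writers (omitted)
  }
}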

@Override
public void close(Collection<TopicPartition> partitions) {
close();
}

private void close() {
if (committer != null) {
committer.stop();
committer = null;
}

if (catalog != null) {
if (catalog instanceof AutoCloseable) {
try {
((AutoCloseable) catalog).close();
} catch (Exception e) {
LOG.warn("An error occurred closing catalog instance, ignoring...", e);
}
}
catalog = null;
}
}
Comment on lines +61 to +82

Contributor:

nit: can you move these close methods so they're after preCommit but before stop? Just so these methods are arranged in life-cycle order.

Contributor Author:

I'm not sure I follow your suggestion. When the Kafka Connect close() or stop() lifecycle methods are called, we close the committer and the catalog.

Member:

I think he means to just rearrange the code: keep the close methods just before stop.

Contributor Author:

I have this order to prevent a running committer from holding a reference to a closed catalog.


@Override
public void put(Collection<SinkRecord> sinkRecords) {
Preconditions.checkNotNull(committer, "Committer wasn't initialized");
committer.save(sinkRecords);
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
Preconditions.checkNotNull(committer, "Committer wasn't initialized");
committer.save(null);
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// offset commit is handled by the worker
return ImmutableMap.of();
}

@Override
public void stop() {
close();
}
}