diff --git a/docs/hdfs_connector.rst b/docs/hdfs_connector.rst
index 8d42fe287..cb8bc3a0e 100644
--- a/docs/hdfs_connector.rst
+++ b/docs/hdfs_connector.rst
@@ -306,7 +306,7 @@ To work with secure HDFS and Hive metastore, you need to specify ``hdfs.authenti
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
-You need to create the Kafka connect principals and keytab files via Kerboros and distribute the
+You need to create the Kafka connect principals and keytab files via Kerberos and distribute the
 keytab file to all hosts running the connector, and ensure that only the connector user
 has read access to the keytab file.
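
The settings in this section boil down to a handful of connector properties. For reference, here is a minimal sketch of a secure HDFS sink configuration expressed as a Java properties map; the ``connect.hdfs.principal`` key name and all values (URLs, principals, keytab path) are illustrative assumptions, not taken from this diff.

    import java.util.HashMap;
    import java.util.Map;

    public class SecureHdfsSinkProps {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
        props.put("hdfs.url", "hdfs://namenode.example.com:8020");                // placeholder cluster URL
        props.put("hdfs.authentication.kerberos", "true");                        // enables the settings documented above
        props.put("connect.hdfs.principal", "connect-hdfs/_HOST@EXAMPLE.COM");    // assumed key name, placeholder value
        props.put("connect.hdfs.keytab", "/etc/security/keytabs/connect.keytab"); // placeholder path
        props.put("hdfs.namenode.principal", "nn/_HOST@EXAMPLE.COM");             // placeholder principal
        props.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }
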
diff --git a/pom.xml b/pom.xml
index 5afa18e1f..2eb636f35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,39 +84,29 @@
       <version>${confluent.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-cli</artifactId>
-      <version>${hive.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-common</artifactId>
-      <version>${hive.version}</version>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-storage-common</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>${avro.version}</version>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-storage-core</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-      <version>${parquet.version}</version>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-storage-format</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-avro</artifactId>
-      <version>${parquet.version}</version>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-storage-partitioner</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
     <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>${commons-io.version}</version>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-storage-wal</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -207,6 +197,7 @@
             <descriptor>src/assembly/development.xml</descriptor>
             <descriptor>src/assembly/package.xml</descriptor>
+ false
diff --git a/src/assembly/development.xml b/src/assembly/development.xml
index 498a914a2..96cf78082 100644
--- a/src/assembly/development.xml
+++ b/src/assembly/development.xml
@@ -19,7 +19,6 @@
         shipped with Confluent platform and other dependencies such as Hadoop and Avro.
         This allows correct setup of the CLASSPATH in kafka-run-class.sh when running the
         Kafka Connect HDFS connector. -->
-        <exclude>org.apache.kafka:connect-api</exclude>
         <exclude>org.mortbay.jetty:*</exclude>
         <exclude>com.sun.jersey:*</exclude>
         <exclude>org.eclipse.jetty.aggregate:jetty-all</exclude>
diff --git a/src/assembly/package.xml b/src/assembly/package.xml
index 3d9dae049..449606b93 100644
--- a/src/assembly/package.xml
+++ b/src/assembly/package.xml
@@ -38,11 +38,16 @@
         shipped with Confluent platform and other dependencies such as Hadoop and Avro.
         This allows correct setup of the CLASSPATH in kafka-run-class.sh when running the
         Kafka Connect HDFS connector. -->
-        <exclude>org.apache.kafka:connect-api</exclude>
         <exclude>org.mortbay.jetty:*</exclude>
         <exclude>com.sun.jersey:*</exclude>
         <exclude>org.eclipse.jetty.aggregate:jetty-all</exclude>
         <exclude>com.sun.jersey.contribs:jersey-guice</exclude>
+        <exclude>io.confluent:kafka-connect-storage-common</exclude>
+        <exclude>io.confluent:kafka-connect-storage-core</exclude>
+        <exclude>io.confluent:kafka-connect-storage-format</exclude>
+        <exclude>io.confluent:kafka-connect-storage-hive</exclude>
+        <exclude>io.confluent:kafka-connect-storage-partitioner</exclude>
+        <exclude>io.confluent:kafka-connect-storage-wal</exclude>
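
The new excludes above keep the storage-common jars out of the connector's own package on the assumption that the Confluent Platform already ships them, so the CLASSPATH assembled by kafka-run-class.sh resolves them from the platform's shared directories instead. A quick, generic way to confirm at runtime which jar a given class was actually loaded from (plain JDK reflection; only the class name is taken from this PR's imports):

    public class WhichJar {
      public static void main(String[] args) throws ClassNotFoundException {
        // Resolve a storage-common class and print the jar it came from; useful when
        // checking the CLASSPATH that kafka-run-class.sh ends up building.
        Class<?> clazz = Class.forName("io.confluent.connect.storage.StorageFactory");
        java.security.CodeSource src = clazz.getProtectionDomain().getCodeSource();
        System.out.println(src == null ? "bootstrap/unknown" : src.getLocation());
      }
    }
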
diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
index 62f00c12e..9616afc28 100644
--- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
@@ -51,22 +52,29 @@
import io.confluent.connect.hdfs.hive.HiveMetaStore;
import io.confluent.connect.hdfs.hive.HiveUtil;
import io.confluent.connect.hdfs.partitioner.Partitioner;
+import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.storage.Storage;
-import io.confluent.connect.hdfs.storage.StorageFactory;
+import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.format.SchemaFileReader;
+import io.confluent.connect.storage.hive.HiveConfig;
+import io.confluent.connect.storage.partitioner.PartitionerConfig;
public class DataWriter {
private static final Logger log = LoggerFactory.getLogger(DataWriter.class);
private Map<TopicPartition, TopicPartitionWriter> topicPartitionWriters;
private String url;
- private Storage storage;
- private Configuration conf;
+ private HdfsStorage storage;
private String topicsDir;
private Format format;
+ private RecordWriterProvider writerProvider;
+ private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig>
+ newWriterProvider;
+ private io.confluent.connect.storage.format.SchemaFileReader<HdfsSinkConnectorConfig, Path>
+ schemaFileReader;
+ private io.confluent.connect.storage.format.Format<HdfsSinkConnectorConfig, Path> newFormat;
private Set<TopicPartition> assignment;
private Partitioner partitioner;
- private RecordWriterProvider writerProvider;
- private SchemaFileReader schemaFileReader;
private Map<TopicPartition, Long> offsets;
private HdfsSinkConnectorConfig connectorConfig;
private AvroData avroData;
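
The class now keeps both the old-style provider/reader fields (io.confluent.connect.hdfs) and the new storage-common ones (io.confluent.connect.storage.format); which pair gets populated is decided later in the constructor by probing the configured format class for a one-argument constructor and falling back on NoSuchMethodException (see the try/catch further down in this diff). A self-contained sketch of that constructor-probing pattern, using hypothetical stand-in classes rather than the connector's real types:

    import java.lang.reflect.Constructor;

    public class ConstructorProbe {
      // Hypothetical stand-ins for the storage handle and the two format generations.
      static class Storage {}
      static class NewStyleFormat { public NewStyleFormat(Storage s) {} }
      static class OldStyleFormat { public OldStyleFormat() {} }

      static Object instantiate(Class<?> formatClass, Storage storage) throws Exception {
        try {
          // New-style formats accept the storage instance in their constructor...
          Constructor<?> c = formatClass.getConstructor(Storage.class);
          return c.newInstance(storage);
        } catch (NoSuchMethodException e) {
          // ...old-style formats only offer a no-arg constructor, so fall back to it.
          return formatClass.getConstructor().newInstance();
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println(instantiate(NewStyleFormat.class, new Storage()).getClass().getSimpleName());
        System.out.println(instantiate(OldStyleFormat.class, new Storage()).getClass().getSimpleName());
      }
    }
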
@@ -91,7 +99,7 @@ public DataWriter(HdfsSinkConnectorConfig connectorConfig, SinkTaskContext conte
String hadoopConfDir = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG);
log.info("Hadoop configuration directory {}", hadoopConfDir);
- conf = new Configuration();
+ Configuration conf = connectorConfig.getHadoopConfiguration();
if (!hadoopConfDir.equals("")) {
conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
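
With this change the constructor starts from the Hadoop Configuration owned by the connector config rather than a fresh Configuration, then layers the cluster's core-site.xml and hdfs-site.xml on top when a Hadoop conf directory is configured (HADOOP_CONF_DIR_CONFIG). A minimal standalone sketch of how Hadoop's Configuration picks those files up; the /etc/hadoop/conf path is just an example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class HadoopConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String hadoopConfDir = "/etc/hadoop/conf";  // example location, not taken from this diff
        // Resources added later override earlier ones for keys they both define.
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        // fs.defaultFS should now reflect the NameNode URI from core-site.xml, if it is set there.
        System.out.println(conf.get("fs.defaultFS"));
      }
    }
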
@@ -105,7 +113,7 @@ public DataWriter(HdfsSinkConnectorConfig connectorConfig, SinkTaskContext conte
if (principalConfig == null || keytab == null) {
throw new ConfigException(
- "Hadoop is using Kerboros for authentication, you need to provide both a connect principal and "
+ "Hadoop is using Kerberos for authentication, you need to provide both a connect principal and "
+ "the path to the keytab of the principal.");
}
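
The check above insists on both a principal and a keytab whenever Kerberos authentication is enabled, since the connector later performs a keytab-based login (outside this excerpt). For orientation, a hedged sketch of what such a login typically looks like with Hadoop's UserGroupInformation; this is not the connector's exact code, and the principal and keytab path are placeholders:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KeytabLoginSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Tell Hadoop's security layer to expect Kerberos.
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Log in with a placeholder principal and keytab path.
        UserGroupInformation.loginUserFromKeytab(
            "connect-hdfs/host.example.com@EXAMPLE.COM",
            "/etc/security/keytabs/connect.keytab");
        System.out.println("Logged in as " + UserGroupInformation.getCurrentUser());
      }
    }
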
@@ -159,32 +167,104 @@ public void run() {
}
url = connectorConfig.getString(HdfsSinkConnectorConfig.HDFS_URL_CONFIG);
- topicsDir = connectorConfig.getString(HdfsSinkConnectorConfig.TOPICS_DIR_CONFIG);
+ topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);
String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);
@SuppressWarnings("unchecked")
- Class<? extends Storage> storageClass = (Class<? extends Storage>) Class
- .forName(connectorConfig.getString(HdfsSinkConnectorConfig.STORAGE_CLASS_CONFIG));
- storage = StorageFactory.createStorage(storageClass, conf, url);
+ Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>) connectorConfig
+ .getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
+ storage = io.confluent.connect.storage.StorageFactory.createStorage(
+ storageClass,
+ HdfsSinkConnectorConfig.class,
+ connectorConfig,
+ url
+ );
createDir(topicsDir);
createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
createDir(logsDir);
- format = getFormat();
- writerProvider = format.getRecordWriterProvider();
- schemaFileReader = format.getSchemaFileReader(avroData);
+ // Try to instantiate as a new-style storage-common type class, then fall back to old-style with
+ // no parameters
+ try {
+ Class<io.confluent.connect.storage.format.Format> formatClass =
+ (Class<io.confluent.connect.storage.format.Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
+ newFormat = formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
+ newWriterProvider = newFormat.getRecordWriterProvider();
+ schemaFileReader = newFormat.getSchemaFileReader();
+ } catch (NoSuchMethodException e) {
+ Class<Format> formatClass =
+ (Class<Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
+ format = formatClass.getConstructor().newInstance();
+ writerProvider = format.getRecordWriterProvider();
+ final io.confluent.connect.hdfs.SchemaFileReader oldReader
+ = format.getSchemaFileReader(avroData);
+ schemaFileReader = new SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
+ @Override
+ public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
+ try {
+ return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
+ } catch (IOException e) {
+ throw new ConnectException("Failed to get schema", e);
+ }
+ }
+
+ @Override
+ public Iterator