Fix confluentinc#75 by passing original properties map
The issue being fixed is that the current design of this interface makes
Partitioner effectively un-extensible: a custom partitioner cannot take
any parameters of its own, only those already defined by the connectors
using the partitioner.

The new design puts the onus of turning the raw properties into a typed
configuration on the partitioner class itself. Because those classes use
recommenders and also rely on the common storage configuration, the
interface is extended with two recommender getters, each with a default
implementation.
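
As a rough illustration (not part of this commit), a third-party partitioner
might now be written as sketched below; the key "my.custom.param" is
hypothetical and only shows that extra parameters can be read from the
original properties map:

import java.util.Map;

import io.confluent.connect.storage.partitioner.DefaultPartitioner;

public class CustomPartitioner<T> extends DefaultPartitioner<T> {
  private String customParam;

  @Override
  public void configure(Map<String, String> props) {
    // The base class builds its typed PartitionerConfig from the raw properties,
    // using getPartitionerRecommender() and getStorageRecommender().
    super.configure(props);
    // Parameters unknown to the connector can now be read straight from props.
    customParam = props.get("my.custom.param");
  }
}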

TimestampExtractor gets the same treatment, as it is used by some of the
Partitioner implementations.
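
A custom TimestampExtractor follows the same pattern under the new signature;
this is only a sketch, and both the key "my.timestamp.field" and the
assumption that record values are Structs are illustrative:

import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;

import io.confluent.connect.storage.partitioner.TimestampExtractor;

public class StructFieldTimestampExtractor implements TimestampExtractor {
  private String fieldName;

  @Override
  public void configure(Map<String, String> props) {
    // Read the extractor-specific property directly from the original map.
    fieldName = props.get("my.timestamp.field");
  }

  @Override
  public Long extract(ConnectRecord<?> record) {
    // Assumes the record value is a Struct with an int64 timestamp field.
    return ((Struct) record.value()).getInt64(fieldName);
  }
}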

While this does break the API, the existing API is difficult to extend
to begin with, which is the reason for the change.
dcsobral committed Jul 12, 2018
1 parent 3cbebc4 commit 282c14c
Showing 9 changed files with 115 additions and 54 deletions.
DailyPartitioner.java
@@ -23,15 +23,18 @@

public class DailyPartitioner<T> extends TimeBasedPartitioner<T> {
@Override
public void configure(Map<String, Object> config) {
long partitionDurationMs = TimeUnit.DAYS.toMillis(1);
public void configure(Map<String, String> props) {
String partitionDurationMs = String.valueOf(TimeUnit.DAYS.toMillis(1));

String delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
PartitionerConfig localConf = new PartitionerConfig(
PartitionerConfig.getConfig(getPartitionerRecommender(), getStorageRecommender()),
props);
String delim = localConf.getString(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
String pathFormat = "'year'=YYYY" + delim + "'month'=MM" + delim + "'day'=dd";

config.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, partitionDurationMs);
config.put(PartitionerConfig.PATH_FORMAT_CONFIG, pathFormat);
props.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, partitionDurationMs);
props.put(PartitionerConfig.PATH_FORMAT_CONFIG, pathFormat);

super.configure(config);
super.configure(props);
}
}
DefaultPartitioner.java
@@ -35,14 +35,18 @@ public class DefaultPartitioner<T> implements Partitioner<T> {
private static final String SCHEMA_GENERATOR_CLASS =
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator";

protected Map<String, Object> config;
protected PartitionerConfig config;
protected Map<String, String> props;
protected List<T> partitionFields = null;
protected String delim;

@Override
public void configure(Map<String, Object> config) {
this.config = config;
delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
public void configure(Map<String, String> props) {
this.props = props;
config = new PartitionerConfig(
PartitionerConfig.getConfig(getPartitionerRecommender(), getStorageRecommender()),
props);
delim = config.getString(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
}

@Override
@@ -58,13 +62,14 @@ public String generatePartitionedPath(String topic, String encodedPartition) {
@Override
public List<T> partitionFields() {
if (partitionFields == null) {
partitionFields = newSchemaGenerator(config).newPartitionFields(PARTITION_FIELD);
partitionFields = newSchemaGenerator(props)
.newPartitionFields(PARTITION_FIELD);
}
return partitionFields;
}

@SuppressWarnings("unchecked")
public SchemaGenerator<T> newSchemaGenerator(Map<String, Object> config) {
public SchemaGenerator<T> newSchemaGenerator(Map<String, String> config) {
Class<? extends SchemaGenerator<T>> generatorClass = null;
try {
generatorClass = getSchemaGeneratorClass();
FieldPartitioner.java
@@ -27,19 +27,17 @@
import java.util.List;
import java.util.Map;

import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;

public class FieldPartitioner<T> extends DefaultPartitioner<T> {
private static final Logger log = LoggerFactory.getLogger(FieldPartitioner.class);
private List<String> fieldNames;


@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, Object> config) {
fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
public void configure(Map<String, String> props) {
super.configure(props);
fieldNames = config.getList(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
}

@Override
@@ -87,7 +85,7 @@ public String encodePartition(SinkRecord sinkRecord) {
@Override
public List<T> partitionFields() {
if (partitionFields == null) {
partitionFields = newSchemaGenerator(config).newPartitionFields(
partitionFields = newSchemaGenerator(props).newPartitionFields(
Utils.join(fieldNames, ",")
);
}
HourlyPartitioner.java
@@ -23,16 +23,16 @@

public class HourlyPartitioner<T> extends TimeBasedPartitioner<T> {
@Override
public void configure(Map<String, Object> config) {
long partitionDurationMs = TimeUnit.HOURS.toMillis(1);
public void configure(Map<String, String> props) {
String partitionDurationMs = String.valueOf(TimeUnit.HOURS.toMillis(1));

String delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
String delim = props.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
String pathFormat =
"'year'=YYYY" + delim + "'month'=MM" + delim + "'day'=dd" + delim + "'hour'=HH";

config.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, partitionDurationMs);
config.put(PartitionerConfig.PATH_FORMAT_CONFIG, pathFormat);
props.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, partitionDurationMs);
props.put(PartitionerConfig.PATH_FORMAT_CONFIG, pathFormat);

super.configure(config);
super.configure(props);
}
}
Partitioner.java
@@ -16,19 +16,30 @@

package io.confluent.connect.storage.partitioner;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.List;
import java.util.Map;

import io.confluent.connect.storage.common.GenericRecommender;

/**
* Partition incoming records, and generates directories and file names in which to store the
* incoming records.
*
* @param <T> The type representing the field schemas.
*/
public interface Partitioner<T> {
void configure(Map<String, Object> config);
default ConfigDef.Recommender getPartitionerRecommender() {
return new GenericRecommender();
}

default ConfigDef.Recommender getStorageRecommender() {
return new GenericRecommender();
}

void configure(Map<String, String> props);

String encodePartition(SinkRecord sinkRecord);

PartitionerConfig.java
@@ -24,11 +24,14 @@
import org.apache.kafka.common.config.ConfigException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.storage.common.ComposableConfig;
import io.confluent.connect.storage.common.StorageCommonConfig;

public class PartitionerConfig extends AbstractConfig implements ComposableConfig {

@@ -205,6 +208,29 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom
return configDef;
}

public static ConfigDef getConfig(ConfigDef.Recommender partitionerClassRecommender,
ConfigDef.Recommender storageClassRecommender) {
// Define the names of the configurations we're going to override
Set<String> skip = new HashSet<>();

// Order added is important, so that group order is maintained
ConfigDef visible = new ConfigDef();
addAllConfigKeys(visible, newConfigDef(partitionerClassRecommender), skip);
addAllConfigKeys(visible, StorageCommonConfig.newConfigDef(storageClassRecommender), skip);

// Add overridden configurations here, if you add them

return visible;
}

private static void addAllConfigKeys(ConfigDef container, ConfigDef other, Set<String> skip) {
for (ConfigDef.ConfigKey key : other.configKeys().values()) {
if (skip != null && !skip.contains(key.name)) {
container.define(key);
}
}
}

public static class BooleanParentRecommender implements ConfigDef.Recommender {
protected final String parentConfigName;

TimeBasedPartitioner.java
@@ -39,7 +39,6 @@
import java.util.Map;

import io.confluent.connect.storage.common.SchemaGenerator;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.storage.errors.PartitionException;

@@ -60,16 +59,15 @@ protected void init(
String pathFormat,
Locale locale,
DateTimeZone timeZone,
Map<String, Object> config
Map<String, String> props
) {
delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
this.partitionDurationMs = partitionDurationMs;
this.pathFormat = pathFormat;
this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
try {
timestampExtractor = newTimestampExtractor(
(String) config.get(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG));
timestampExtractor.configure(config);
config.getString(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG));
timestampExtractor.configure(props);
} catch (IllegalArgumentException e) {
ConfigException ce = new ConfigException(
PartitionerConfig.PATH_FORMAT_CONFIG,
@@ -105,10 +103,10 @@ public TimestampExtractor getTimestampExtractor() {
}

@Override
public void configure(Map<String, Object> config) {
super.configure(config);
public void configure(Map<String, String> props) {
super.configure(props);
long partitionDurationMsProp =
(long) config.get(PartitionerConfig.PARTITION_DURATION_MS_CONFIG);
config.getLong(PartitionerConfig.PARTITION_DURATION_MS_CONFIG);
if (partitionDurationMsProp < 0) {
throw new ConfigException(
PartitionerConfig.PARTITION_DURATION_MS_CONFIG,
@@ -117,7 +115,7 @@ public void configure(Map<String, Object> config) {
);
}

String pathFormat = (String) config.get(PartitionerConfig.PATH_FORMAT_CONFIG);
String pathFormat = config.getString(PartitionerConfig.PATH_FORMAT_CONFIG);
if (StringUtils.isBlank(pathFormat) || pathFormat.equals(delim)) {
throw new ConfigException(
PartitionerConfig.PATH_FORMAT_CONFIG,
@@ -129,7 +127,7 @@
pathFormat = pathFormat.substring(0, pathFormat.length() - delim.length());
}

String localeString = (String) config.get(PartitionerConfig.LOCALE_CONFIG);
String localeString = config.getString(PartitionerConfig.LOCALE_CONFIG);
if (StringUtils.isBlank(localeString)) {
throw new ConfigException(
PartitionerConfig.LOCALE_CONFIG,
@@ -138,7 +136,7 @@
);
}

String timeZoneString = (String) config.get(PartitionerConfig.TIMEZONE_CONFIG);
String timeZoneString = config.getString(PartitionerConfig.TIMEZONE_CONFIG);
if (StringUtils.isBlank(timeZoneString)) {
throw new ConfigException(
PartitionerConfig.TIMEZONE_CONFIG,
@@ -149,7 +147,7 @@

Locale locale = new Locale(localeString);
DateTimeZone timeZone = DateTimeZone.forID(timeZoneString);
init(partitionDurationMsProp, pathFormat, locale, timeZone, config);
init(partitionDurationMsProp, pathFormat, locale, timeZone, props);
}

@Override
@@ -172,7 +170,7 @@ public String encodePartition(SinkRecord sinkRecord) {
@Override
public List<T> partitionFields() {
if (partitionFields == null) {
partitionFields = newSchemaGenerator(config).newPartitionFields(pathFormat);
partitionFields = newSchemaGenerator(props).newPartitionFields(pathFormat);
}
return partitionFields;
}
@@ -217,7 +215,7 @@ public TimestampExtractor newTimestampExtractor(String extractorClassName) {

public static class WallclockTimestampExtractor implements TimestampExtractor {
@Override
public void configure(Map<String, Object> config) {}
public void configure(Map<String, String> props) {}

@Override
public Long extract(ConnectRecord<?> record) {
@@ -227,7 +225,7 @@ public Long extract(ConnectRecord<?> record) {

public static class RecordTimestampExtractor implements TimestampExtractor {
@Override
public void configure(Map<String, Object> config) {}
public void configure(Map<String, String> props) {}

@Override
public Long extract(ConnectRecord<?> record) {
@@ -240,8 +238,11 @@ public static class RecordFieldTimestampExtractor implements TimestampExtractor
private DateTimeFormatter dateTime;

@Override
public void configure(Map<String, Object> config) {
fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
public void configure(Map<String, String> props) {
PartitionerConfig config = new PartitionerConfig(
PartitionerConfig.getConfig(getPartitionerRecommender(), getStorageRecommender()),
props);
fieldName = config.getString(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
dateTime = ISODateTimeFormat.dateTime();
}

TimestampExtractor.java
@@ -16,12 +16,23 @@

package io.confluent.connect.storage.partitioner;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;

import java.util.Map;

import io.confluent.connect.storage.common.GenericRecommender;

public interface TimestampExtractor {
void configure(Map<String, Object> config);
default ConfigDef.Recommender getPartitionerRecommender() {
return new GenericRecommender();
}

default ConfigDef.Recommender getStorageRecommender() {
return new GenericRecommender();
}

void configure(Map<String, String> props);

Long extract(ConnectRecord<?> record);
}