Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class AvroConfluentContentSchemaProvider
public class ConfluentContentSchemaProvider
extends AbstractContentSchemaProvider
{
private final SchemaRegistryClient schemaRegistryClient;

@Inject
public AvroConfluentContentSchemaProvider(SchemaRegistryClient schemaRegistryClient)
public ConfluentContentSchemaProvider(SchemaRegistryClient schemaRegistryClient)
{
this.schemaRegistryClient = requireNonNull(schemaRegistryClient, "schemaRegistryClient is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(ConfluentSchemaRegistryConfig.class);
install(new ConfluentDecoderModule());
install(new ConfluentEncoderModule());
binder.bind(ContentSchemaProvider.class).to(AvroConfluentContentSchemaProvider.class).in(Scopes.SINGLETON);
binder.bind(ContentSchemaProvider.class).to(ConfluentContentSchemaProvider.class).in(Scopes.SINGLETON);
newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class);
newSetBinder(binder, SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON);
// Each SchemaRegistry object should have a new instance of SchemaProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -165,7 +164,7 @@ private SetMultimap<String, TopicAndSubjects> getTopicAndSubjects()
topic,
getKeySubjectFromTopic(topic, entry.getValue()),
getValueSubjectFromTopic(topic, entry.getValue()));
topicSubjectsCacheBuilder.put(topicAndSubjects.getTableName(), topicAndSubjects);
topicSubjectsCacheBuilder.put(topicAndSubjects.tableName(), topicAndSubjects);
}
return topicSubjectsCacheBuilder.build();
}
Expand All @@ -176,7 +175,7 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess
requireNonNull(schemaTableName, "schemaTableName is null");
TopicAndSubjects topicAndSubjects = parseTopicAndSubjects(schemaTableName);

String tableName = topicAndSubjects.getTableName();
String tableName = topicAndSubjects.tableName();
if (topicAndSubjectsSupplier.get().containsKey(tableName)) {
// Use the topic from cache, if present, in case the topic is mixed case
Collection<TopicAndSubjects> topicAndSubjectsCollection = topicAndSubjectsSupplier.get().get(tableName);
Expand All @@ -186,21 +185,21 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess
format(
"Unable to access '%s' table. Subject is ambiguous, and may refer to one of the following: %s",
schemaTableName.getTableName(),
topicAndSubjectsCollection.stream().map(TopicAndSubjects::getTopic).collect(joining(", "))));
topicAndSubjectsCollection.stream().map(TopicAndSubjects::topic).collect(joining(", "))));
}
TopicAndSubjects topicAndSubjectsFromCache = getOnlyElement(topicAndSubjectsCollection);
topicAndSubjects = new TopicAndSubjects(
topicAndSubjectsFromCache.getTopic(),
topicAndSubjects.getKeySubject().or(topicAndSubjectsFromCache::getKeySubject),
topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject));
topicAndSubjectsFromCache.topic(),
topicAndSubjects.keySubject().or(topicAndSubjectsFromCache::keySubject),
topicAndSubjects.valueSubject().or(topicAndSubjectsFromCache::valueSubject));
}

if (topicAndSubjects.getKeySubject().isEmpty() && topicAndSubjects.getValueSubject().isEmpty()) {
if (topicAndSubjects.keySubject().isEmpty() && topicAndSubjects.valueSubject().isEmpty()) {
return Optional.empty();
}
Optional<KafkaTopicFieldGroup> key = topicAndSubjects.getKeySubject().map(subject -> getFieldGroup(session, subject));
Optional<KafkaTopicFieldGroup> message = topicAndSubjects.getValueSubject().map(subject -> getFieldGroup(session, subject));
return Optional.of(new KafkaTopicDescription(tableName, Optional.of(schemaTableName.getSchemaName()), topicAndSubjects.getTopic(), key, message));
Optional<KafkaTopicFieldGroup> key = topicAndSubjects.keySubject().map(subject -> getFieldGroup(session, subject));
Optional<KafkaTopicFieldGroup> message = topicAndSubjects.valueSubject().map(subject -> getFieldGroup(session, subject));
return Optional.of(new KafkaTopicDescription(tableName, Optional.of(schemaTableName.getSchemaName()), topicAndSubjects.topic(), key, message));
}

private KafkaTopicFieldGroup getFieldGroup(ConnectorSession session, String subject)
Expand Down Expand Up @@ -307,57 +306,18 @@ private static Optional<String> getValueSubjectFromTopic(String topic, Collectio
return Optional.empty();
}

private static class TopicAndSubjects
private record TopicAndSubjects(String topic, Optional<String> keySubject, Optional<String> valueSubject)
{
private final Optional<String> keySubject;
private final Optional<String> valueSubject;
private final String topic;

public TopicAndSubjects(String topic, Optional<String> keySubject, Optional<String> valueSubject)
private TopicAndSubjects
{
this.topic = requireNonNull(topic, "topic is null");
this.keySubject = requireNonNull(keySubject, "keySubject is null");
this.valueSubject = requireNonNull(valueSubject, "valueSubject is null");
requireNonNull(topic, "topic is null");
requireNonNull(keySubject, "keySubject is null");
requireNonNull(valueSubject, "valueSubject is null");
}

public String getTableName()
public String tableName()
{
return topic.toLowerCase(ENGLISH);
}

public String getTopic()
{
return topic;
}

public Optional<String> getKeySubject()
{
return keySubject;
}

public Optional<String> getValueSubject()
{
return valueSubject;
}

@Override
public boolean equals(Object other)
{
if (this == other) {
return true;
}
if (!(other instanceof TopicAndSubjects that)) {
return false;
}
return topic.equals(that.topic) &&
keySubject.equals(that.keySubject) &&
valueSubject.equals(that.valueSubject);
}

@Override
public int hashCode()
{
return Objects.hash(topic, keySubject, valueSubject);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestAvroConfluentContentSchemaProvider
public class TestConfluentContentSchemaProvider
{
private static final String TOPIC = "test";
private static final String SUBJECT_NAME = format("%s-value", TOPIC);
Expand All @@ -44,7 +44,7 @@ public void testAvroConfluentSchemaProvider()
MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
AvroSchema schema = getAvroSchema();
mockSchemaRegistryClient.register(SUBJECT_NAME, schema);
AvroConfluentContentSchemaProvider avroConfluentSchemaProvider = new AvroConfluentContentSchemaProvider(mockSchemaRegistryClient);
ConfluentContentSchemaProvider avroConfluentSchemaProvider = new ConfluentContentSchemaProvider(mockSchemaRegistryClient);
KafkaTableHandle tableHandle = new KafkaTableHandle("default", TOPIC, TOPIC, AvroRowDecoderFactory.NAME, AvroRowDecoderFactory.NAME, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(SUBJECT_NAME), ImmutableList.of(), TupleDomain.all());
assertThat(avroConfluentSchemaProvider.getMessage(tableHandle)).isEqualTo(Optional.of(schema).map(AvroSchema::toString));
assertThat(avroConfluentSchemaProvider.getKey(tableHandle)).isEqualTo(Optional.empty());
Expand All @@ -64,7 +64,7 @@ public void testAvroSchemaWithReferences()
.orElseThrow();
mockSchemaRegistryClient.register(SUBJECT_NAME, schemaWithReference);

AvroConfluentContentSchemaProvider avroConfluentSchemaProvider = new AvroConfluentContentSchemaProvider(mockSchemaRegistryClient);
ConfluentContentSchemaProvider avroConfluentSchemaProvider = new ConfluentContentSchemaProvider(mockSchemaRegistryClient);
assertThat(avroConfluentSchemaProvider.readSchema(Optional.empty(), Optional.of(SUBJECT_NAME)).map(schema -> new Parser().parse(schema))).isPresent();
}

Expand Down