Skip to content

Commit 2c8015f

Browse files
chenjian2664ebyhr
authored andcommitted
Convert TopicAndSubjects to record
1 parent 39ff3d8 commit 2c8015f

File tree

1 file changed

+16
-56
lines changed

1 file changed

+16
-56
lines changed

plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.Collection;
3636
import java.util.List;
3737
import java.util.Map;
38-
import java.util.Objects;
3938
import java.util.Optional;
4039
import java.util.Set;
4140
import java.util.function.Supplier;
@@ -165,7 +164,7 @@ private SetMultimap<String, TopicAndSubjects> getTopicAndSubjects()
165164
topic,
166165
getKeySubjectFromTopic(topic, entry.getValue()),
167166
getValueSubjectFromTopic(topic, entry.getValue()));
168-
topicSubjectsCacheBuilder.put(topicAndSubjects.getTableName(), topicAndSubjects);
167+
topicSubjectsCacheBuilder.put(topicAndSubjects.tableName(), topicAndSubjects);
169168
}
170169
return topicSubjectsCacheBuilder.build();
171170
}
@@ -176,7 +175,7 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess
176175
requireNonNull(schemaTableName, "schemaTableName is null");
177176
TopicAndSubjects topicAndSubjects = parseTopicAndSubjects(schemaTableName);
178177

179-
String tableName = topicAndSubjects.getTableName();
178+
String tableName = topicAndSubjects.tableName();
180179
if (topicAndSubjectsSupplier.get().containsKey(tableName)) {
181180
// Use the topic from cache, if present, in case the topic is mixed case
182181
Collection<TopicAndSubjects> topicAndSubjectsCollection = topicAndSubjectsSupplier.get().get(tableName);
@@ -186,21 +185,21 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess
186185
format(
187186
"Unable to access '%s' table. Subject is ambiguous, and may refer to one of the following: %s",
188187
schemaTableName.getTableName(),
189-
topicAndSubjectsCollection.stream().map(TopicAndSubjects::getTopic).collect(joining(", "))));
188+
topicAndSubjectsCollection.stream().map(TopicAndSubjects::topic).collect(joining(", "))));
190189
}
191190
TopicAndSubjects topicAndSubjectsFromCache = getOnlyElement(topicAndSubjectsCollection);
192191
topicAndSubjects = new TopicAndSubjects(
193-
topicAndSubjectsFromCache.getTopic(),
194-
topicAndSubjects.getKeySubject().or(topicAndSubjectsFromCache::getKeySubject),
195-
topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject));
192+
topicAndSubjectsFromCache.topic(),
193+
topicAndSubjects.keySubject().or(topicAndSubjectsFromCache::keySubject),
194+
topicAndSubjects.valueSubject().or(topicAndSubjectsFromCache::valueSubject));
196195
}
197196

198-
if (topicAndSubjects.getKeySubject().isEmpty() && topicAndSubjects.getValueSubject().isEmpty()) {
197+
if (topicAndSubjects.keySubject().isEmpty() && topicAndSubjects.valueSubject().isEmpty()) {
199198
return Optional.empty();
200199
}
201-
Optional<KafkaTopicFieldGroup> key = topicAndSubjects.getKeySubject().map(subject -> getFieldGroup(session, subject));
202-
Optional<KafkaTopicFieldGroup> message = topicAndSubjects.getValueSubject().map(subject -> getFieldGroup(session, subject));
203-
return Optional.of(new KafkaTopicDescription(tableName, Optional.of(schemaTableName.getSchemaName()), topicAndSubjects.getTopic(), key, message));
200+
Optional<KafkaTopicFieldGroup> key = topicAndSubjects.keySubject().map(subject -> getFieldGroup(session, subject));
201+
Optional<KafkaTopicFieldGroup> message = topicAndSubjects.valueSubject().map(subject -> getFieldGroup(session, subject));
202+
return Optional.of(new KafkaTopicDescription(tableName, Optional.of(schemaTableName.getSchemaName()), topicAndSubjects.topic(), key, message));
204203
}
205204

206205
private KafkaTopicFieldGroup getFieldGroup(ConnectorSession session, String subject)
@@ -307,57 +306,18 @@ private static Optional<String> getValueSubjectFromTopic(String topic, Collectio
307306
return Optional.empty();
308307
}
309308

310-
private static class TopicAndSubjects
309+
private record TopicAndSubjects(String topic, Optional<String> keySubject, Optional<String> valueSubject)
311310
{
312-
private final Optional<String> keySubject;
313-
private final Optional<String> valueSubject;
314-
private final String topic;
315-
316-
public TopicAndSubjects(String topic, Optional<String> keySubject, Optional<String> valueSubject)
311+
private TopicAndSubjects
317312
{
318-
this.topic = requireNonNull(topic, "topic is null");
319-
this.keySubject = requireNonNull(keySubject, "keySubject is null");
320-
this.valueSubject = requireNonNull(valueSubject, "valueSubject is null");
313+
requireNonNull(topic, "topic is null");
314+
requireNonNull(keySubject, "keySubject is null");
315+
requireNonNull(valueSubject, "valueSubject is null");
321316
}
322317

323-
public String getTableName()
318+
public String tableName()
324319
{
325320
return topic.toLowerCase(ENGLISH);
326321
}
327-
328-
public String getTopic()
329-
{
330-
return topic;
331-
}
332-
333-
public Optional<String> getKeySubject()
334-
{
335-
return keySubject;
336-
}
337-
338-
public Optional<String> getValueSubject()
339-
{
340-
return valueSubject;
341-
}
342-
343-
@Override
344-
public boolean equals(Object other)
345-
{
346-
if (this == other) {
347-
return true;
348-
}
349-
if (!(other instanceof TopicAndSubjects that)) {
350-
return false;
351-
}
352-
return topic.equals(that.topic) &&
353-
keySubject.equals(that.keySubject) &&
354-
valueSubject.equals(that.valueSubject);
355-
}
356-
357-
@Override
358-
public int hashCode()
359-
{
360-
return Objects.hash(topic, keySubject, valueSubject);
361-
}
362322
}
363323
}

0 commit comments

Comments
 (0)