Skip to content

Commit 3ee7afd

Browse files
authored
fix: regression in auth token configuration (#9)
* fix: regression in auth token configuration The regression was caused by masking the auth token so Kafka Connect would not log it. For the avoidance of doubts: This issue could not be abused to get unauthorized access to QuestDB.
1 parent 0d0c6a7 commit 3ee7afd

File tree

6 files changed

+303
-192
lines changed

6 files changed

+303
-192
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.kafka.common.config.ConfigDef.Importance;
77
import org.apache.kafka.common.config.ConfigDef.Type;
88
import org.apache.kafka.common.config.ConfigException;
9+
import org.apache.kafka.common.config.types.Password;
910
import org.apache.kafka.connect.errors.ConnectException;
1011

1112
import java.util.Arrays;
@@ -153,8 +154,8 @@ public String getUsername() {
153154
return getString(USERNAME);
154155
}
155156

156-
public String getToken() {
157-
return getString(TOKEN);
157+
public Password getToken() {
158+
return getPassword(TOKEN);
158159
}
159160

160161
public boolean isTls() {

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

+4-14
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,15 @@
99
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
1010
import org.apache.kafka.common.TopicPartition;
1111
import org.apache.kafka.connect.data.Date;
12-
import org.apache.kafka.connect.data.Decimal;
13-
import org.apache.kafka.connect.data.Field;
14-
import org.apache.kafka.connect.data.Schema;
15-
import org.apache.kafka.connect.data.Struct;
16-
import org.apache.kafka.connect.data.Time;
17-
import org.apache.kafka.connect.data.Timestamp;
12+
import org.apache.kafka.connect.data.*;
1813
import org.apache.kafka.connect.errors.ConnectException;
1914
import org.apache.kafka.connect.errors.RetriableException;
2015
import org.apache.kafka.connect.sink.SinkRecord;
2116
import org.apache.kafka.connect.sink.SinkTask;
2217
import org.slf4j.Logger;
2318
import org.slf4j.LoggerFactory;
2419

25-
import java.util.Collection;
26-
import java.util.Collections;
27-
import java.util.HashSet;
28-
import java.util.List;
29-
import java.util.Map;
30-
import java.util.Set;
20+
import java.util.*;
3121
import java.util.concurrent.TimeUnit;
3222

3323
public final class QuestDBSinkTask extends SinkTask {
@@ -91,10 +81,10 @@ private Sender createSender() {
9181
}
9282
if (config.getToken() != null) {
9383
String username = config.getUsername();
94-
if (username == null || username.equals("")) {
84+
if (username == null || username.isEmpty()) {
9585
throw new ConnectException("Username cannot be empty when using ILP authentication");
9686
}
97-
builder.enableAuth(username).authToken(config.getToken());
87+
builder.enableAuth(username).authToken(config.getToken().value());
9888
}
9989
Sender rawSender = builder.build();
10090
String symbolColumns = config.getSymbolColumns();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.questdb.kafka;
2+
3+
import org.apache.kafka.connect.json.JsonConverter;
4+
import org.apache.kafka.connect.runtime.AbstractStatus;
5+
import org.apache.kafka.connect.runtime.ConnectorConfig;
6+
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
7+
import org.apache.kafka.connect.storage.StringConverter;
8+
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
9+
import org.awaitility.Awaitility;
10+
import org.testcontainers.containers.GenericContainer;
11+
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Objects;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
19+
import static java.util.concurrent.TimeUnit.SECONDS;
20+
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
21+
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
22+
import static org.junit.jupiter.api.Assertions.fail;
23+
24+
public final class ConnectTestUtils {
25+
public static final long CONNECTOR_START_TIMEOUT_MS = SECONDS.toMillis(60);
26+
public static final String CONNECTOR_NAME = "questdb-sink-connector";
27+
private static final AtomicInteger ID_GEN = new AtomicInteger(0);
28+
29+
private ConnectTestUtils() {
30+
}
31+
32+
static void assertConnectorTaskRunningEventually(EmbeddedConnectCluster connect) {
33+
assertConnectorTaskStateEventually(connect, AbstractStatus.State.RUNNING);
34+
}
35+
36+
static void assertConnectorTaskFailedEventually(EmbeddedConnectCluster connect) {
37+
assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
38+
}
39+
40+
static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, AbstractStatus.State expectedState) {
41+
Awaitility.await().atMost(CONNECTOR_START_TIMEOUT_MS, MILLISECONDS).untilAsserted(() -> assertConnectorTaskState(connect, CONNECTOR_NAME, expectedState));
42+
}
43+
44+
static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContainer, String topicName) {
45+
Map<String, String> props = new HashMap<>();
46+
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName());
47+
props.put("topics", topicName);
48+
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
49+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
50+
props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT));
51+
return props;
52+
}
53+
54+
static void assertConnectorTaskState(EmbeddedConnectCluster connect, String connectorName, AbstractStatus.State expectedState) {
55+
ConnectorStateInfo info = connect.connectorStatus(connectorName);
56+
if (info == null) {
57+
fail("Connector " + connectorName + " not found");
58+
}
59+
List<ConnectorStateInfo.TaskState> taskStates = info.tasks();
60+
if (taskStates.size() == 0) {
61+
fail("No tasks found for connector " + connectorName);
62+
}
63+
for (ConnectorStateInfo.TaskState taskState : taskStates) {
64+
if (!Objects.equals(taskState.state(), expectedState.toString())) {
65+
fail("Task " + taskState.id() + " for connector " + connectorName + " is in state " + taskState.state() + " but expected " + expectedState);
66+
}
67+
}
68+
}
69+
70+
static String newTopicName() {
71+
return "topic" + ID_GEN.getAndIncrement();
72+
}
73+
74+
static String newTableName() {
75+
return "table" + ID_GEN.getAndIncrement();
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package io.questdb.kafka;
2+
3+
import org.apache.kafka.connect.data.Schema;
4+
import org.apache.kafka.connect.data.SchemaBuilder;
5+
import org.apache.kafka.connect.data.Struct;
6+
import org.apache.kafka.connect.json.JsonConverter;
7+
import org.apache.kafka.connect.storage.Converter;
8+
import org.apache.kafka.connect.storage.ConverterConfig;
9+
import org.apache.kafka.connect.storage.ConverterType;
10+
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.slf4j.LoggerFactory;
14+
import org.testcontainers.containers.FixedHostPortGenericContainer;
15+
import org.testcontainers.containers.GenericContainer;
16+
import org.testcontainers.containers.output.Slf4jLogConsumer;
17+
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
18+
import org.testcontainers.junit.jupiter.Container;
19+
import org.testcontainers.junit.jupiter.Testcontainers;
20+
import org.testcontainers.utility.MountableFile;
21+
22+
import java.util.Map;
23+
24+
import static java.util.Collections.singletonMap;
25+
26+
@Testcontainers
27+
public class QuestDBSinkConnectorEmbeddedAuthTest {
28+
private EmbeddedConnectCluster connect;
29+
private Converter converter;
30+
private String topicName;
31+
32+
// must match the user in authDb.txt
33+
private static final String TEST_USER_TOKEN = "UvuVb1USHGRRT08gEnwN2zGZrvM4MsLQ5brgF6SVkAw=";
34+
private static final String TEST_USER_NAME = "testUser1";
35+
36+
@Container
37+
private static GenericContainer<?> questDBContainer = newQuestDbConnector();
38+
39+
private static GenericContainer<?> newQuestDbConnector() {
40+
FixedHostPortGenericContainer<?> container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3");
41+
container.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
42+
container.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT);
43+
container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*"));
44+
container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt");
45+
container.withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt");
46+
return container.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));
47+
}
48+
49+
@BeforeEach
50+
public void setUp() {
51+
topicName = ConnectTestUtils.newTopicName();
52+
JsonConverter jsonConverter = new JsonConverter();
53+
jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()));
54+
converter = jsonConverter;
55+
56+
connect = new EmbeddedConnectCluster.Builder()
57+
.name("questdb-connect-cluster")
58+
.build();
59+
60+
connect.start();
61+
}
62+
63+
@Test
64+
public void testSmoke() {
65+
connect.kafka().createTopic(topicName, 1);
66+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName);
67+
props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME);
68+
props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN);
69+
70+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
71+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
72+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
73+
.field("firstname", Schema.STRING_SCHEMA)
74+
.field("lastname", Schema.STRING_SCHEMA)
75+
.field("age", Schema.INT8_SCHEMA)
76+
.build();
77+
78+
Struct struct = new Struct(schema)
79+
.put("firstname", "John")
80+
.put("lastname", "Doe")
81+
.put("age", (byte) 42);
82+
83+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
84+
85+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n"
86+
+ "\"John\",\"Doe\",42\r\n",
87+
"select firstname,lastname,age from " + topicName);
88+
}
89+
}

0 commit comments

Comments
 (0)