Skip to content

Commit b229a92

Browse files
committed
fix: handle j.u.Date in schemaless payloads
this can happen e.g. when a transform is used with schemaless payloads
1 parent 6f29315 commit b229a92

File tree

2 files changed

+87
-1
lines changed

2 files changed

+87
-1
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

+3
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,9 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
276276
sender.doubleColumn(actualName, (Double) value);
277277
} else if (value instanceof Map) {
278278
handleMap(name, (Map<?, ?>) value, fallbackName);
279+
} else if (value instanceof java.util.Date) {
280+
long epochMillis = ((java.util.Date) value).getTime();
281+
sender.timestampColumn(actualName, TimeUnit.MILLISECONDS.toMicros(epochMillis));
279282
} else {
280283
onUnsupportedType(actualName, value.getClass().getName());
281284
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

+84-1
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,91 @@ private void testTimestampUnitResolution0(String mode) {
457457
"select firstname,lastname,timestamp from " + topicName);
458458
}
459459

460+
@Test
461+
public void testTimestampSMT_parseMicroseconds_schemaLess() {
462+
connect.kafka().createTopic(topicName, 1);
463+
Map<String, String> props = baseConnectorProps(topicName);
464+
props.put("value.converter.schemas.enable", "false");
465+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
466+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
467+
468+
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z";
469+
props.put("transforms", "Timestamp-born,Timestamp-death");
470+
props.put("transforms.Timestamp-born.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
471+
props.put("transforms.Timestamp-born.field", "born");
472+
props.put("transforms.Timestamp-born.format", timestampFormat);
473+
props.put("transforms.Timestamp-born.target.type", "Timestamp");
474+
475+
props.put("transforms.Timestamp-death.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
476+
props.put("transforms.Timestamp-death.field", "death");
477+
props.put("transforms.Timestamp-death.target.type", "Timestamp");
478+
props.put("transforms.Timestamp-death.format", timestampFormat);
479+
480+
connect.configureConnector(CONNECTOR_NAME, props);
481+
assertConnectorTaskRunningEventually();
482+
483+
QuestDBUtils.assertSql(questDBContainer,
484+
"{\"ddl\":\"OK\"}\n",
485+
"create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)",
486+
QuestDBUtils.Endpoint.EXEC);
487+
488+
String birthTimestamp = "1985-08-02 16:41:55.402095 UTC";
489+
String deadTimestamp = "2023-08-02 16:41:55.402095 UTC";
490+
connect.kafka().produce(topicName, "foo",
491+
"{\"firstname\":\"John\""
492+
+ ",\"lastname\":\"Doe\""
493+
+ ",\"death\":\"" + deadTimestamp + "\""
494+
+ ",\"born\":\"" + birthTimestamp + "\"}"
495+
);
496+
497+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" +
498+
"\"John\",\"Doe\",\"2023-08-02T16:48:37.095000Z\",\"1985-08-02T16:48:37.095000Z\"\r\n",
499+
"select * from " + topicName);
500+
}
501+
502+
@Test
503+
public void testTimestampSMT_parseMicroseconds_withSchema() {
504+
connect.kafka().createTopic(topicName, 1);
505+
Map<String, String> props = baseConnectorProps(topicName);
506+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
507+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
508+
509+
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z";
510+
props.put("transforms", "Timestamp-born,Timestamp-death");
511+
props.put("transforms.Timestamp-born.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
512+
props.put("transforms.Timestamp-born.field", "born");
513+
props.put("transforms.Timestamp-born.format", timestampFormat);
514+
props.put("transforms.Timestamp-born.target.type", "Timestamp");
515+
props.put("transforms.Timestamp-death.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
516+
props.put("transforms.Timestamp-death.field", "death");
517+
props.put("transforms.Timestamp-death.target.type", "Timestamp");
518+
props.put("transforms.Timestamp-death.format", timestampFormat);
519+
520+
connect.configureConnector(CONNECTOR_NAME, props);
521+
assertConnectorTaskRunningEventually();
460522

461-
@Test
523+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
524+
.field("firstname", Schema.STRING_SCHEMA)
525+
.field("lastname", Schema.STRING_SCHEMA)
526+
.field("born", Schema.STRING_SCHEMA)
527+
.field("death", Schema.STRING_SCHEMA)
528+
.build();
529+
530+
Struct struct = new Struct(schema)
531+
.put("firstname", "John")
532+
.put("lastname", "Doe")
533+
.put("born", "1985-08-02 16:41:55.402095 UTC")
534+
.put("death", "2023-08-02 16:41:55.402095 UTC");
535+
536+
537+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
538+
539+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" +
540+
"\"John\",\"Doe\",\"2023-08-02T16:48:37.095000Z\",\"1985-08-02T16:48:37.095000Z\"\r\n",
541+
"select * from " + topicName);
542+
}
543+
544+
@Test
462545
public void testUpfrontTable() {
463546
connect.kafka().createTopic(topicName, 1);
464547
Map<String, String> props = baseConnectorProps(topicName);

0 commit comments

Comments
 (0)