Skip to content

Commit

Permalink
Fix __source_ts_ms values with 0 conversion and enable mongodb test (#…
Browse files Browse the repository at this point in the history
…292)

* Update IcebergChangeConsumerMangodbTest.java

* Update SourceMangoDB.java

* Update IcebergChangeEvent.java

* Update SourceMangoDB.java
  • Loading branch information
ismailsimsek committed Mar 13, 2024
1 parent dee8d05 commit 1581e59
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
case TIMESTAMP:
if (node.isLong() && TS_MS_FIELDS.contains(field.name())) {
if ((node.isLong() || node.isNumber()) && TS_MS_FIELDS.contains(field.name())) {
val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC);
} else if (node.isTextual()) {
val = OffsetDateTime.parse(node.asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* @author Ismail Simsek
*/
@QuarkusTest
@Disabled // @TODO fix
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -31,6 +32,7 @@ public class SourceMangoDB implements QuarkusTestResourceLifecycleManager {

@Override
public Map<String, String> start() {
container.setPortBindings(List.of(MONGODB_PORT+":"+MONGODB_PORT));
container.withExposedPorts(MONGODB_PORT).start();

Map<String, String> params = new ConcurrentHashMap<>();
Expand Down

0 comments on commit 1581e59

Please sign in to comment.