Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 11, 2024
1 parent 4c97c1c commit 2878397
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
Expand Down Expand Up @@ -329,7 +329,7 @@ public int hashCode() {

private Schema icebergSchema() {

if (this.valueSchema == null) {
if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,22 @@

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.inject.Inject;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
*
* @author Ismail Simsek
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,14 @@ public IcebergChangeEventBuilder addKeyField(String name, String val) {
public IcebergChangeEvent build() {
return new IcebergChangeEvent(
this.destination,
payload.asText().getBytes(StandardCharsets.UTF_8),
keyPayload.asText().getBytes(StandardCharsets.UTF_8)
("{" +
"\"schema\":" + this.valueSchema() + "," +
"\"payload\":" + payload.toString() +
"} ").getBytes(StandardCharsets.UTF_8),
("{" +
"\"schema\":" + this.keySchema() + "," +
"\"payload\":" + keyPayload.toString() +
"} ").getBytes(StandardCharsets.UTF_8)
);
}

Expand Down

0 comments on commit 2878397

Please sign in to comment.