diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 0c93103f..1d27fc63 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -89,12 +89,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu InterfaceIcebergTableOperator icebergTableOperator; @PostConstruct - void connect() throws InterruptedException { + void connect() { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { - throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!"); + throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!"); } if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { - throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!"); + throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!"); } // pass iceberg properties to iceberg and hadoop diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index ae0617e8..e6f0bdd6 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -109,13 +109,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D @PostConstruct - void connect() throws InterruptedException { + void connect() { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { - throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported, " + + throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " + "Supported (debezium.format.value=*) formats are {json,}!"); } if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { - throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported, " + + throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " + "Supported (debezium.format.key=*) formats are {json,}!"); } @@ -222,7 +222,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList } catch (IOException e) { LOGGER.error("Failed committing events to iceberg table!", e); - throw new InterruptedException(e.getMessage()); + throw new RuntimeException("Failed commiting events to iceberg table!", e); } DataFile dataFile = DataFiles.builder(eventTable.spec()) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 414fd9dd..50ce9e33 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -103,11 +103,11 @@ public static boolean hasSchema(JsonNode jsonNode) { && jsonNode.get("schema").get("fields").isArray(); } - public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) throws InterruptedException { + public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) { return IcebergUtil.getIcebergRecord(schema.asStruct(), data); } - public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) throws InterruptedException { + public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) { Map mappedResult = new HashMap<>(); LOGGER.debug("Processing nested field:{}", tableFields); @@ -123,7 +123,7 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN } private static Object jsonToGenericRecordVal(Types.NestedField field, - JsonNode node) throws InterruptedException { + JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -150,7 +150,7 @@ private static Object jsonToGenericRecordVal(Types.NestedField field, val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()); } catch (IOException e) { LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e); - throw new InterruptedException("Failed Processing Event!" + e.getMessage()); + throw new RuntimeException("Failed Processing Event!", e); } break; case LIST: diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index 098ae3e5..3016ae74 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -73,7 +73,7 @@ protected String unsupportedTypeMessage(Object object) { return "Unexpected data type '" + type + "'"; } - protected ArrayList toIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { + protected ArrayList toIcebergRecords(Schema schema, ArrayList> events) { ArrayList icebergRecords = new ArrayList<>(); for (ChangeEvent e : events) { @@ -85,7 +85,7 @@ protected ArrayList toIcebergRecords(Schema schema, ArrayList icebergRecords) throws InterruptedException { + protected DataFile getDataFile(Table icebergTable, ArrayList icebergRecords) { final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); @@ -105,7 +105,7 @@ protected DataFile getDataFile(Table icebergTable, ArrayList icebergReco } } catch (IOException e) { - throw new InterruptedException(e.getMessage()); + throw new RuntimeException(e); } LOGGER.debug("Creating iceberg DataFile!"); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java index 851dc284..2fc3bc51 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java @@ -28,7 +28,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); @Override - public void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { + public void addToTable(Table icebergTable, ArrayList> events) { ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); DataFile dataFile = getDataFile(icebergTable, icebergRecords); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index f7b4d787..c488deb4 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -63,7 +63,7 @@ public void initialize() { icebergTableAppend.initialize(); } - private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { + private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) { final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); @@ -105,7 +105,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRe } } catch (IOException e) { - throw new InterruptedException(e.getMessage()); + throw new RuntimeException(e); } LOGGER.debug("Creating iceberg equality delete file!"); @@ -122,7 +122,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRe .build(); } - private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { + private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) { ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); for (ChangeEvent e : events) { @@ -159,7 +159,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { } @Override - public void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { + public void addToTable(Table icebergTable, ArrayList> events) { if (icebergTable.sortOrder().isUnsorted()) { LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java index de49a96b..ff02f9a2 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java @@ -20,7 +20,7 @@ public interface InterfaceIcebergTableOperator { void initialize(); - void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException; + void addToTable(Table icebergTable, ArrayList> events); Predicate filterEvents(); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index aeb34fd1..3a1fb0e0 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -30,13 +30,13 @@ class TestIcebergUtil { final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json"); @Test - public void testNestedJsonRecord() throws JsonProcessingException { + public void testNestedJsonRecord() { Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema"))); assertTrue(exception.getMessage().contains("nested data type")); } @Test - public void testUnwrapJsonRecord() throws IOException, InterruptedException { + public void testUnwrapJsonRecord() throws IOException { JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload"); List fileds = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema) .get("schema"));