Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: avoid throwing InterruptedException directly #38

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
InterfaceIcebergTableOperator icebergTableOperator;

@PostConstruct
void connect() throws InterruptedException {
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

// pass iceberg properties to iceberg and hadoop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D


@PostConstruct
void connect() throws InterruptedException {
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported, " +
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported, " +
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
}

Expand Down Expand Up @@ -222,7 +222,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList

} catch (IOException e) {
LOGGER.error("Failed committing events to iceberg table!", e);
throw new InterruptedException(e.getMessage());
throw new RuntimeException("Failed commiting events to iceberg table!", e);
}

DataFile dataFile = DataFiles.builder(eventTable.spec())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public static boolean hasSchema(JsonNode jsonNode) {
&& jsonNode.get("schema").get("fields").isArray();
}

public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) throws InterruptedException {
public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) throws InterruptedException {
public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
LOGGER.debug("Processing nested field:{}", tableFields);

Expand All @@ -123,7 +123,7 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN
}

private static Object jsonToGenericRecordVal(Types.NestedField field,
JsonNode node) throws InterruptedException {
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
Expand All @@ -150,7 +150,7 @@ private static Object jsonToGenericRecordVal(Types.NestedField field,
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
throw new InterruptedException("Failed Processing Event!" + e.getMessage());
throw new RuntimeException("Failed Processing Event!", e);
}
break;
case LIST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected String unsupportedTypeMessage(Object object) {
return "Unexpected data type '" + type + "'";
}

protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = new ArrayList<>();
for (ChangeEvent<Object, Object> e : events) {
Expand All @@ -85,7 +85,7 @@ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEven
return icebergRecords;
}

protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

Expand All @@ -105,7 +105,7 @@ protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergReco
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg DataFile!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

@Override
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void initialize() {
icebergTableAppend.initialize();
}

private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
Expand Down Expand Up @@ -105,7 +105,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg equality delete file!");
Expand All @@ -122,7 +122,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
.build();
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

for (ChangeEvent<Object, Object> e : events) {
Expand Down Expand Up @@ -159,7 +159,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
}

@Override
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {

if (icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface InterfaceIcebergTableOperator {

void initialize();

void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;
void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events);

Predicate<Record> filterEvents();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ class TestIcebergUtil {
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");

@Test
public void testNestedJsonRecord() throws JsonProcessingException {
public void testNestedJsonRecord() {
Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
assertTrue(exception.getMessage().contains("nested data type"));
}

@Test
public void testUnwrapJsonRecord() throws IOException, InterruptedException {
public void testUnwrapJsonRecord() throws IOException {
JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
List<Types.NestedField> fileds = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema)
.get("schema"));
Expand Down