refactor: avoid throwing InterruptedException directly (#38)

InterruptedException is meant to be thrown when a thread is interrupted.
This commit changes the places where it was being used to signal
configuration errors or IO errors so that they throw more appropriate
exceptions instead.
racevedoo committed Oct 20, 2021
1 parent 772f01d commit 6dbee1a
Showing 8 changed files with 22 additions and 22 deletions.
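The pattern the commit adopts, as a minimal sketch: configuration problems surface as Debezium's unchecked DebeziumException, and checked IO failures are wrapped in a RuntimeException that keeps the original exception as the cause. The helper names below are illustrative, not from the repository.

import java.io.Closeable;
import java.io.IOException;
import io.debezium.DebeziumException;

public class ExceptionChoiceSketch {

    // Config error: unchecked, and named for what actually went wrong,
    // instead of an InterruptedException that callers would read as a
    // thread-interruption signal.
    static void validateValueFormat(String valueFormat) {
        if (!"json".equalsIgnoreCase(valueFormat)) {
            throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported!");
        }
    }

    // IO error: wrap with the cause attached, rather than
    // new InterruptedException(e.getMessage()), which dropped the
    // stack trace and mislabeled the failure.
    static void closeWriter(Closeable writer) {
        try {
            writer.close();
        } catch (IOException e) {
            throw new RuntimeException("Failed committing events to iceberg table!", e);
        }
    }
}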
@@ -89,12 +89,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
InterfaceIcebergTableOperator icebergTableOperator;

@PostConstruct
- void connect() throws InterruptedException {
+ void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

// pass iceberg properties to iceberg and hadoop
@@ -109,13 +109,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D


@PostConstruct
- void connect() throws InterruptedException {
+ void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported, " +
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported, " +
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
}

@@ -222,7 +222,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList

} catch (IOException e) {
LOGGER.error("Failed committing events to iceberg table!", e);
- throw new InterruptedException(e.getMessage());
+ throw new RuntimeException("Failed committing events to iceberg table!", e);
}

DataFile dataFile = DataFiles.builder(eventTable.spec())
@@ -103,11 +103,11 @@ public static boolean hasSchema(JsonNode jsonNode) {
&& jsonNode.get("schema").get("fields").isArray();
}

- public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) throws InterruptedException {
+ public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

- public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) throws InterruptedException {
+ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
LOGGER.debug("Processing nested field:{}", tableFields);

@@ -123,7 +123,7 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN
}

private static Object jsonToGenericRecordVal(Types.NestedField field,
- JsonNode node) throws InterruptedException {
+ JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -150,7 +150,7 @@ private static Object jsonToGenericRecordVal(Types.NestedField field,
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
throw new InterruptedException("Failed Processing Event!" + e.getMessage());
throw new RuntimeException("Failed Processing Event!", e);
}
break;
case LIST:
@@ -73,7 +73,7 @@ protected String unsupportedTypeMessage(Object object) {
return "Unexpected data type '" + type + "'";
}

- protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = new ArrayList<>();
for (ChangeEvent<Object, Object> e : events) {
@@ -85,7 +85,7 @@ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEven
return icebergRecords;
}

- protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
+ protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

@@ -105,7 +105,7 @@ protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergReco
}

} catch (IOException e) {
- throw new InterruptedException(e.getMessage());
+ throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg DataFile!");
@@ -28,7 +28,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

@Override
- public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+ public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
@@ -63,7 +63,7 @@ public void initialize() {
icebergTableAppend.initialize();
}

- private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
+ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
@@ -105,7 +105,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
}

} catch (IOException e) {
- throw new InterruptedException(e.getMessage());
+ throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg equality delete file!");
@@ -122,7 +122,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
.build();
}

- private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+ private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

for (ChangeEvent<Object, Object> e : events) {
@@ -159,7 +159,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
}

@Override
- public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+ public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {

if (icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
@@ -20,7 +20,7 @@ public interface InterfaceIcebergTableOperator {

void initialize();

- void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;
+ void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events);

Predicate<Record> filterEvents();
}
@@ -30,13 +30,13 @@ class TestIcebergUtil {
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");

@Test
- public void testNestedJsonRecord() throws JsonProcessingException {
+ public void testNestedJsonRecord() {
Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
assertTrue(exception.getMessage().contains("nested data type"));
}

@Test
- public void testUnwrapJsonRecord() throws IOException, InterruptedException {
+ public void testUnwrapJsonRecord() throws IOException {
JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
List<Types.NestedField> fileds = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema)
.get("schema"));
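For contrast, a minimal sketch of the situation InterruptedException is actually meant for: a blocking call cut short because another thread interrupted this one. Nothing below comes from the repository; the queue and method are hypothetical.

import java.util.concurrent.BlockingQueue;

class InterruptHandlingSketch {
    static Object takeNext(BlockingQueue<Object> queue) {
        try {
            // take() blocks and throws InterruptedException if the thread is interrupted.
            return queue.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the next event", e);
        }
    }
}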
