Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
*/
public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {

private final RowType rowType;
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
Expand All @@ -62,8 +61,9 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State

private boolean batchCommitted = false;
private BucketMode bucketMode;
private RowType writeType;
private int[] notNullFieldIndex;

private final int[] notNullFieldIndex;
private final @Nullable DefaultValueRow defaultValueRow;

public TableWriteImpl(
Expand All @@ -73,7 +73,7 @@ public TableWriteImpl(
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
@Nullable RowKindFilter rowKindFilter) {
this.rowType = rowType;
this.writeType = rowType;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
Expand Down Expand Up @@ -114,6 +114,13 @@ public TableWriteImpl<T> withIOManager(IOManager ioManager) {
@Override
public TableWriteImpl<T> withWriteType(RowType writeType) {
write.withWriteType(writeType);
this.writeType = writeType;
List<String> notNullColumnNames =
writeType.getFields().stream()
.filter(field -> !field.type().isNullable())
.map(DataField::name)
.collect(Collectors.toList());
this.notNullFieldIndex = writeType.getFieldIndices(notNullColumnNames);
return this;
}

Expand Down Expand Up @@ -188,7 +195,7 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
private void checkNullability(InternalRow row) {
for (int idx : notNullFieldIndex) {
if (row.isNullAt(idx)) {
String columnName = rowType.getFields().get(idx).name();
String columnName = writeType.getFields().get(idx).name();
throw new RuntimeException(
String.format("Cannot write null to non-null column(%s)", columnName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,47 @@ public void testWithRowIds() throws Exception {
assertThat(i.get()).isEqualTo(2);
}

@Test
public void testNonNullColumn() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.STRING().copy(false));
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");

Schema schema = schemaBuilder.build();

catalog.createTable(identifier(), schema, true);
Table table = catalog.getTable(identifier());
BatchWriteBuilder builder = table.newBatchWriteBuilder();
BatchTableWrite write = builder.newWrite();
write.write(GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
BatchTableCommit commit = builder.newCommit();
List<CommitMessage> commitables = write.prepareCommit();
commit.commit(commitables);

write =
builder.newWrite()
.withWriteType(schema.rowType().project(Collections.singletonList("f2")));
write.write(GenericRow.of(BinaryString.fromString("c")));
commit = builder.newCommit();
commitables = write.prepareCommit();
setFirstRowId(commitables, 0L);
commit.commit(commitables);

ReadBuilder readBuilder = table.newReadBuilder();
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
reader.forEachRemaining(
r -> {
assertThat(r.getInt(0)).isEqualTo(1);
assertThat(r.getString(1).toString()).isEqualTo("a");
assertThat(r.getString(2).toString()).isEqualTo("c");
});
}

protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
Expand Down