Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
&& !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
checkPartitionsLoaded();
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
Expand Down Expand Up @@ -211,7 +212,6 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
checkPartitionsLoaded();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -87,9 +86,12 @@ public class StreamWriteFunctionTest {
@TempDir
File tempFile;

private String tempFilePath;

@BeforeEach
public void before() throws Exception {
this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath());
tempFilePath = tempFile.getAbsolutePath();
this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFilePath);
}

@AfterEach
Expand Down Expand Up @@ -232,9 +234,9 @@ public void testInsert() throws Exception {
@Test
public void testInsertDuplicates() throws Exception {
// reset the config option
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
Configuration conf = TestConfigurations.getDefaultConf(tempFilePath);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFilePath, conf);

// open the function and ingest data
funcWrapper.openFunction();
Expand Down Expand Up @@ -319,9 +321,9 @@ public void testUpsert() throws Exception {
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
Configuration conf = TestConfigurations.getDefaultConf(tempFilePath);
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFilePath, conf);

// open the function and ingest data
funcWrapper.openFunction();
Expand Down Expand Up @@ -407,6 +409,7 @@ public void testIndexStateBootstrap() throws Exception {
// Mark the index state as not fully loaded to trigger re-load from the filesystem.
funcWrapper.clearIndexState();

funcWrapper.openFunction();
// upsert another data buffer
for (RowData rowData : TestData.DATA_SET_TWO) {
funcWrapper.invoke(rowData);
Expand Down Expand Up @@ -436,7 +439,7 @@ public void testIndexStateBootstrap() throws Exception {
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");

checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
assertFalse(funcWrapper.isAllPartitionsLoaded(),
assertTrue(funcWrapper.isAllPartitionsLoaded(),
"All partitions assume to be loaded into the index state");
funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public MockFunctionInitializationContext() {

@Override
public boolean isRestored() {
return false;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void openFunction() throws Exception {

bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf);
bucketAssignerFunction.initializeState(this.functionInitializationContext);
bucketAssignerFunction.open(conf);

writeFunction = new StreamWriteFunction<>(conf);
writeFunction.setRuntimeContext(runtimeContext);
Expand Down