Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deleted initialNumReaders paramter. #24355

Merged
merged 3 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,6 @@ public abstract static class ReadWithPartitions<T> extends PTransform<PBegin, PC

abstract @Nullable RowMapper<T> getRowMapper();

abstract @Nullable Integer getInitialNumReaders();

abstract Builder<T> toBuilder();

@AutoValue.Builder
Expand All @@ -560,8 +558,6 @@ abstract Builder<T> setDataSourceConfiguration(

abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);

abstract Builder<T> setInitialNumReaders(Integer initialNumReaders);

abstract ReadWithPartitions<T> build();
}

Expand All @@ -585,12 +581,6 @@ public ReadWithPartitions<T> withRowMapper(RowMapper<T> rowMapper) {
return toBuilder().setRowMapper(rowMapper).build();
}

/** Pre-split initial restriction and start initialNumReaders reading at the very beginning. */
public ReadWithPartitions<T> withInitialNumReaders(Integer initialNumReaders) {
checkNotNull(initialNumReaders, "initialNumReaders can not be null");
return toBuilder().setInitialNumReaders(initialNumReaders).build();
}

@Override
public PCollection<T> expand(PBegin input) {
DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration();
Expand All @@ -603,10 +593,6 @@ public PCollection<T> expand(PBegin input) {
RowMapper<T> rowMapper = getRowMapper();
Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is required");

int initialNumReaders = SingleStoreUtil.getArgumentWithDefault(getInitialNumReaders(), 1);
checkArgument(
initialNumReaders >= 1, "withInitialNumReaders() should be greater or equal to 1");

String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery());

Coder<T> coder =
Expand All @@ -621,11 +607,7 @@ public PCollection<T> expand(PBegin input) {
.apply(
ParDo.of(
new ReadWithPartitions.ReadWithPartitionsFn<>(
dataSourceConfiguration,
actualQuery,
database,
rowMapper,
initialNumReaders)))
dataSourceConfiguration, actualQuery, database, rowMapper)))
.setCoder(coder);
}

Expand All @@ -635,19 +617,16 @@ private static class ReadWithPartitionsFn<ParameterT, OutputT>
String query;
String database;
RowMapper<OutputT> rowMapper;
int initialNumReaders;

ReadWithPartitionsFn(
DataSourceConfiguration dataSourceConfiguration,
String query,
String database,
RowMapper<OutputT> rowMapper,
int initialNumReaders) {
RowMapper<OutputT> rowMapper) {
this.dataSourceConfiguration = dataSourceConfiguration;
this.query = query;
this.database = database;
this.rowMapper = rowMapper;
this.initialNumReaders = initialNumReaders;
}

@ProcessElement
Expand Down Expand Up @@ -690,19 +669,8 @@ public void splitRange(
@Element ParameterT element,
@Restriction OffsetRange range,
OutputReceiver<OffsetRange> receiver) {
long numPartitions = range.getTo() - range.getFrom();
checkArgument(
initialNumReaders <= numPartitions,
"withInitialNumReaders() should not be greater then number of partitions in the database.\n"
+ String.format(
"InitialNumReaders is %d, number of partitions in the database is %d",
initialNumReaders, range.getTo()));

for (int i = 0; i < initialNumReaders; i++) {
receiver.output(
new OffsetRange(
range.getFrom() + numPartitions * i / initialNumReaders,
range.getFrom() + numPartitions * (i + 1) / initialNumReaders));
for (long i = range.getFrom(); i < range.getTo(); i++) {
receiver.output(new OffsetRange(i, i + 1));
}
}

Expand Down Expand Up @@ -744,7 +712,6 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder.addIfNotNull(DisplayData.item("table", getTable()));
builder.addIfNotNull(
DisplayData.item("rowMapper", SingleStoreUtil.getClassNameOrNull(getRowMapper())));
builder.addIfNotNull(DisplayData.item("initialNumReaders", getInitialNumReaders()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,81 +148,29 @@ public void testReadWithPartitionsWithTable() {
pipeline.run();
}

@Test
public void testReadWithPartitionsWithInitialNumReaders() {
PCollection<TestRow> rows =
pipeline.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withQuery("SELECT * FROM `t`")
.withRowMapper(new TestHelper.TestRowMapper())
.withInitialNumReaders(2));

PAssert.thatSingleton(rows.apply("Count All", Count.globally()))
.isEqualTo((long) EXPECTED_ROW_COUNT);

Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT);
PAssert.that(rows).containsInAnyOrder(expectedValues);

pipeline.run();
}

@Test
public void testReadWithPartitionsZeroInitialNumReaders() {
assertThrows(
"withInitialNumReaders() should be greater or equal to 1",
IllegalArgumentException.class,
() -> {
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable("t")
.withInitialNumReaders(0)
.withRowMapper(new TestHelper.TestRowMapper()));
});
}

@Test
public void testReadWithPartitionsTooBigInitialNumReaders() {
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable("t")
.withInitialNumReaders(100)
.withRowMapper(new TestHelper.TestRowMapper()));

assertThrows(
"withInitialNumReaders() should not be greater then number of partitions in the database.\n"
+ "InitialNumReaders is 100, number of partitions in the database is 2",
Pipeline.PipelineExecutionException.class,
() -> pipelineForErrorChecks.run().waitUntilFinish());
}

@Test
public void testReadWithPartitionsNoTableAndQuery() {
assertThrows(
"One of withTable() or withQuery() is required",
IllegalArgumentException.class,
() -> {
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withRowMapper(new TestHelper.TestRowMapper()));
});
() ->
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withRowMapper(new TestHelper.TestRowMapper())));
}

@Test
public void testReadWithPartitionsBothTableAndQuery() {
assertThrows(
"withTable() can not be used together with withQuery()",
IllegalArgumentException.class,
() -> {
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable("t")
.withQuery("SELECT * FROM `t`")
.withRowMapper(new TestHelper.TestRowMapper()));
});
() ->
pipelineForErrorChecks.apply(
SingleStoreIO.<TestRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable("t")
.withQuery("SELECT * FROM `t`")
.withRowMapper(new TestHelper.TestRowMapper())));
}
}