Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,8 @@ public static void toJson(
generator.writeStartObject();
generator.writeFieldName(DATA_FILE);
ContentFileParser.unboundContentFileToJson(fileScanTask.file(), partitionSpec, generator);

if (deleteFileReferences != null) {
generator.writeArrayFieldStart(DELETE_FILE_REFERENCES);
deleteFileReferences.forEach(
delIdx -> {
try {
generator.writeNumber(delIdx);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
generator.writeEndArray();
JsonUtil.writeIntegerArray(DELETE_FILE_REFERENCES, deleteFileReferences, generator);
}

if (fileScanTask.residual() != null) {
Expand Down Expand Up @@ -92,8 +82,6 @@ public static FileScanTask fromJson(JsonNode jsonNode, List<DeleteFile> allDelet
filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL));
}

// TODO at the time of creation we dont have the schemaString, specString, and residual so will
// need to bind later
return new UnboundBaseFileScanTask(dataFile, deleteFiles, filter);
}
}
15 changes: 0 additions & 15 deletions core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

public class UnboundBaseFileScanTask extends BaseFileScanTask {
private DataFile unboundDataFile;
Expand All @@ -45,21 +44,7 @@ public PartitionSpec spec() {
throw new UnsupportedOperationException("spec() is not supported in UnboundBaseFileScanTask");
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("unboundDataFile", unboundDataFile)
.add("unboundDeleteFiles", unboundDeleteFiles)
.add("filter", filter)
.toString();
}

public FileScanTask bind(PartitionSpec spec, boolean caseSensitive) {
// TODO before creating a new task
// need to ensure that dataFile is refreshed with correct partitionData using spec
// need to ensure deleteFiles is refreshed with spec info
// need to ensure residual refreshed with spec.

Metrics dataFileMetrics =
new Metrics(
unboundDataFile.recordCount(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class FetchScanTasksRequest implements RESTRequest {

private String planTask;
private final String planTask;

public FetchScanTasksRequest(String planTask) {
this.planTask = planTask;
Expand All @@ -37,7 +37,7 @@ public String planTask() {

@Override
public void validate() {
Preconditions.checkArgument(planTask != null, "Invalid request: planTask null");
Preconditions.checkArgument(planTask != null, "Invalid planTask: null");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
import java.util.List;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.RESTRequest;

public class PlanTableScanRequest implements RESTRequest {
private Long snapshotId;
private List<String> select;
private Expression filter;
private Boolean caseSensitive;
private Boolean useSnapshotSchema;
private Long startSnapshotId;
private Long endSnapshotId;
private List<String> statsFields;
private final Long snapshotId;
private final List<String> select;
private final Expression filter;
private final boolean caseSensitive;
private final boolean useSnapshotSchema;
private final Long startSnapshotId;
private final Long endSnapshotId;
private final List<String> statsFields;

public Long snapshotId() {
return snapshotId;
Expand Down Expand Up @@ -86,20 +87,21 @@ private PlanTableScanRequest(

@Override
public void validate() {
// validation logic to be performed in PlanTableScanRequestParser
if (snapshotId != null || startSnapshotId != null || endSnapshotId != null) {
Preconditions.checkArgument(
snapshotId != null ^ (startSnapshotId != null && endSnapshotId != null),
"Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided");
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("snapshotId", snapshotId)
.add("select", select)
.add("filter", filter)
.add("caseSensitive", caseSensitive)
.add("useSnapshotSchema", useSnapshotSchema)
.add("startSnapshotId", startSnapshotId)
.add("endSnapshotId", endSnapshotId)
.add("statsFields", statsFields)
.toString();
}

Expand All @@ -120,38 +122,38 @@ public Builder withSnapshotId(Long withSnapshotId) {
return this;
}

public Builder withSelect(List<String> withSelect) {
this.select = withSelect;
public Builder withSelect(List<String> projection) {
this.select = projection;
return this;
}

public Builder withFilter(Expression withFilter) {
this.filter = withFilter;
public Builder withFilter(Expression expression) {
this.filter = expression;
return this;
}

public Builder withCaseSensitive(boolean withCaseSensitive) {
this.caseSensitive = withCaseSensitive;
public Builder withCaseSensitive(boolean value) {
this.caseSensitive = value;
return this;
}

public Builder withUseSnapshotSchema(boolean withUseSnapshotSchema) {
this.useSnapshotSchema = withUseSnapshotSchema;
public Builder withUseSnapshotSchema(boolean snapshotSchema) {
this.useSnapshotSchema = snapshotSchema;
return this;
}

public Builder withStartSnapshotId(Long withStartSnapshotId) {
this.startSnapshotId = withStartSnapshotId;
public Builder withStartSnapshotId(Long startingSnapshotId) {
this.startSnapshotId = startingSnapshotId;
return this;
}

public Builder withEndSnapshotId(Long withEndSnapshotId) {
this.endSnapshotId = withEndSnapshotId;
public Builder withEndSnapshotId(Long endingSnapshotId) {
this.endSnapshotId = endingSnapshotId;
return this;
}

public Builder withStatsFields(List<String> withStatsFields) {
this.statsFields = withStatsFields;
public Builder withStatsFields(List<String> fields) {
this.statsFields = fields;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,19 @@ public static PlanTableScanRequest fromJson(JsonNode json) {
Long startSnapshotId = JsonUtil.getLongOrNull(START_SNAPSHOT_ID, json);
Long endSnapshotId = JsonUtil.getLongOrNull(END_SNAPSHOT_ID, json);

if (snapshotId != null || startSnapshotId != null || endSnapshotId != null) {
Preconditions.checkArgument(
snapshotId != null ^ (startSnapshotId != null && endSnapshotId != null),
"Either snapshotId must be provided or both startSnapshotId and endSnapshotId must be provided");
}

List<String> select = JsonUtil.getStringListOrNull(SELECT, json);

Expression filter = null;
if (json.has(FILTER)) {
// TODO without text value it adds another " "
filter = ExpressionParser.fromJson(json.get(FILTER).textValue());
}

Boolean caseSensitive = true;
boolean caseSensitive = true;
if (json.has(CASE_SENSITIVE)) {
caseSensitive = JsonUtil.getBool(CASE_SENSITIVE, json);
}

Boolean useSnapshotSchema = false;
boolean useSnapshotSchema = false;
if (json.has(USE_SNAPSHOT_SCHEMA)) {
useSnapshotSchema = JsonUtil.getBool(USE_SNAPSHOT_SCHEMA, json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@
import org.apache.iceberg.rest.RESTResponse;

public class FetchPlanningResultResponse implements RESTResponse {
private PlanStatus planStatus;

private List<String> planTasks;

private List<FileScanTask> fileScanTasks;

private List<DeleteFile> deleteFiles;
private Map<Integer, PartitionSpec> specsById;
private final PlanStatus planStatus;
private final List<String> planTasks;
private final List<FileScanTask> fileScanTasks;
private final List<DeleteFile> deleteFiles;
private final Map<Integer, PartitionSpec> specsById;

private FetchPlanningResultResponse(
PlanStatus planStatus,
Expand Down Expand Up @@ -92,36 +89,33 @@ public static class Builder {
private Builder() {}

private PlanStatus planStatus;

private List<String> planTasks;

private List<FileScanTask> fileScanTasks;

private List<DeleteFile> deleteFiles;
private Map<Integer, PartitionSpec> specsById;

public Builder withPlanStatus(PlanStatus withPlanStatus) {
this.planStatus = withPlanStatus;
public Builder withPlanStatus(PlanStatus status) {
this.planStatus = status;
return this;
}

public Builder withPlanTasks(List<String> withPlanTasks) {
this.planTasks = withPlanTasks;
public Builder withPlanTasks(List<String> tasks) {
this.planTasks = tasks;
return this;
}

public Builder withFileScanTasks(List<FileScanTask> withFileScanTasks) {
this.fileScanTasks = withFileScanTasks;
public Builder withFileScanTasks(List<FileScanTask> tasks) {
this.fileScanTasks = tasks;
return this;
}

public Builder withDeleteFiles(List<DeleteFile> withDeleteFiles) {
this.deleteFiles = withDeleteFiles;
public Builder withDeleteFiles(List<DeleteFile> deletes) {
this.deleteFiles = deletes;
return this;
}

public Builder withSpecsById(Map<Integer, PartitionSpec> withSpecsById) {
this.specsById = withSpecsById;
public Builder withSpecsById(Map<Integer, PartitionSpec> specs) {
this.specsById = specs;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public static void toJson(FetchPlanningResultResponse response, JsonGenerator ge
throws IOException {
Preconditions.checkArgument(
null != response, "Invalid response: fetchPanningResultResponse null");
Preconditions.checkArgument(
response.specsById() != null,
"Cannot serialize fetchingPlanningResultResponse without specsById");
gen.writeStartObject();
gen.writeStringField(PLAN_STATUS, response.planStatus().status());
if (response.planTasks() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.apache.iceberg.rest.RESTResponse;

public class FetchScanTasksResponse implements RESTResponse {
private List<String> planTasks;
private List<FileScanTask> fileScanTasks;
private List<DeleteFile> deleteFiles;
private Map<Integer, PartitionSpec> specsById;
private final List<String> planTasks;
private final List<FileScanTask> fileScanTasks;
private final List<DeleteFile> deleteFiles;
private final Map<Integer, PartitionSpec> specsById;

private FetchScanTasksResponse(
List<String> planTasks,
Expand Down Expand Up @@ -85,23 +85,23 @@ private Builder() {}
private List<DeleteFile> deleteFiles;
private Map<Integer, PartitionSpec> specsById;

public Builder withPlanTasks(List<String> withPlanTasks) {
this.planTasks = withPlanTasks;
public Builder withPlanTasks(List<String> tasks) {
this.planTasks = tasks;
return this;
}

public Builder withFileScanTasks(List<FileScanTask> withFileScanTasks) {
this.fileScanTasks = withFileScanTasks;
public Builder withFileScanTasks(List<FileScanTask> tasks) {
this.fileScanTasks = tasks;
return this;
}

public Builder withDeleteFiles(List<DeleteFile> withDeleteFiles) {
this.deleteFiles = withDeleteFiles;
public Builder withDeleteFiles(List<DeleteFile> deletes) {
this.deleteFiles = deletes;
return this;
}

public Builder withSpecsById(Map<Integer, PartitionSpec> withSpecsById) {
this.specsById = withSpecsById;
public Builder withSpecsById(Map<Integer, PartitionSpec> specs) {
this.specsById = specs;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public static String toJson(FetchScanTasksResponse response, boolean pretty) {

public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException {
Preconditions.checkArgument(response != null, "Invalid response: fetchScanTasksResponse null");
Preconditions.checkArgument(
response.specsById() != null, "Cannot serialize fetchScanTasksResponse without specsById");
gen.writeStartObject();
if (response.planTasks() != null) {
JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen);
Expand Down
Loading