diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index 9e51dea3f1dc..d5ccb3e37953 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -27,7 +27,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.JsonUtil; -class ContentFileParser { +public class ContentFileParser { private static final String SPEC_ID = "spec-id"; private static final String CONTENT = "content"; private static final String FILE_PATH = "file-path"; @@ -48,17 +48,8 @@ class ContentFileParser { private ContentFileParser() {} - private static boolean hasPartitionData(StructLike partitionData) { - return partitionData != null && partitionData.size() > 0; - } - - static String toJson(ContentFile contentFile, PartitionSpec spec) { - return JsonUtil.generate( - generator -> ContentFileParser.toJson(contentFile, spec, generator), false); - } - - static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) - throws IOException { + public static void unboundContentFileToJson( + ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException { Preconditions.checkArgument(contentFile != null, "Invalid content file: null"); Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); @@ -67,14 +58,8 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator "Invalid partition spec id from content file: expected = %s, actual = %s", spec.specId(), contentFile.specId()); - Preconditions.checkArgument( - spec.isPartitioned() == hasPartitionData(contentFile.partition()), - "Invalid partition data from content file: expected = %s, actual = %s", - spec.isPartitioned() ? "partitioned" : "unpartitioned", - hasPartitionData(contentFile.partition()) ? "partitioned" : "unpartitioned"); generator.writeStartObject(); - // ignore the ordinal position (ContentFile#pos) of the file in a manifest, // as it isn't used and BaseFile constructor doesn't support it. @@ -112,33 +97,14 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator.writeEndObject(); } - static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { + public static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null"); - Preconditions.checkArgument( - jsonNode.isObject(), "Invalid JSON node for content file: non-object (%s)", jsonNode); - Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); int specId = JsonUtil.getInt(SPEC_ID, jsonNode); FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode)); String filePath = JsonUtil.getString(FILE_PATH, jsonNode); FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode)); - PartitionData partitionData = null; - if (jsonNode.has(PARTITION)) { - partitionData = new PartitionData(spec.partitionType()); - StructLike structLike = - (StructLike) SingleValueParser.fromJson(spec.partitionType(), jsonNode.get(PARTITION)); - Preconditions.checkState( - partitionData.size() == structLike.size(), - "Invalid partition data size: expected = %s, actual = %s", - partitionData.size(), - structLike.size()); - for (int pos = 0; pos < partitionData.size(); ++pos) { - Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); - partitionData.set(pos, structLike.get(pos, javaClass)); - } - } - long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); Metrics metrics = metricsFromJson(jsonNode); ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode); @@ -151,7 +117,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { specId, filePath, fileFormat, - partitionData, + null, fileSizeInBytes, metrics, keyMetadata, @@ -163,7 +129,7 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { fileContent, filePath, fileFormat, - partitionData, + null, fileSizeInBytes, metrics, equalityFieldIds, @@ -173,8 +139,17 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { } } - static void unboundContentFileToJson( - ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException { + private static boolean hasPartitionData(StructLike partitionData) { + return partitionData != null && partitionData.size() > 0; + } + + static String toJson(ContentFile contentFile, PartitionSpec spec) { + return JsonUtil.generate( + generator -> ContentFileParser.toJson(contentFile, spec, generator), false); + } + + static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator generator) + throws IOException { Preconditions.checkArgument(contentFile != null, "Invalid content file: null"); Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); @@ -183,8 +158,14 @@ static void unboundContentFileToJson( "Invalid partition spec id from content file: expected = %s, actual = %s", spec.specId(), contentFile.specId()); + Preconditions.checkArgument( + spec.isPartitioned() == hasPartitionData(contentFile.partition()), + "Invalid partition data from content file: expected = %s, actual = %s", + spec.isPartitioned() ? "partitioned" : "unpartitioned", + hasPartitionData(contentFile.partition()) ? "partitioned" : "unpartitioned"); generator.writeStartObject(); + // ignore the ordinal position (ContentFile#pos) of the file in a manifest, // as it isn't used and BaseFile constructor doesn't support it. @@ -222,17 +203,33 @@ static void unboundContentFileToJson( generator.writeEndObject(); } - static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { - // TODO this does not contain ParitionSpec at the time of serialization - // we will need to bind the correct ParitionData to the file at a later point in the protocol. - + static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for content file: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for content file: non-object (%s)", jsonNode); + Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); int specId = JsonUtil.getInt(SPEC_ID, jsonNode); FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode)); String filePath = JsonUtil.getString(FILE_PATH, jsonNode); FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode)); + PartitionData partitionData = null; + if (jsonNode.has(PARTITION)) { + partitionData = new PartitionData(spec.partitionType()); + StructLike structLike = + (StructLike) SingleValueParser.fromJson(spec.partitionType(), jsonNode.get(PARTITION)); + Preconditions.checkState( + partitionData.size() == structLike.size(), + "Invalid partition data size: expected = %s, actual = %s", + partitionData.size(), + structLike.size()); + for (int pos = 0; pos < partitionData.size(); ++pos) { + Class javaClass = spec.partitionType().fields().get(pos).type().typeId().javaClass(); + partitionData.set(pos, structLike.get(pos, javaClass)); + } + } + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode); Metrics metrics = metricsFromJson(jsonNode); ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode); @@ -245,7 +242,7 @@ static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { specId, filePath, fileFormat, - null, + partitionData, fileSizeInBytes, metrics, keyMetadata, @@ -257,7 +254,7 @@ static ContentFile unboundContentFileFromJson(JsonNode jsonNode) { fileContent, filePath, fileFormat, - null, + partitionData, fileSizeInBytes, metrics, equalityFieldIds, diff --git a/core/src/main/java/org/apache/iceberg/FetchPlanningResultResponseParser.java b/core/src/main/java/org/apache/iceberg/FetchPlanningResultResponseParser.java deleted file mode 100644 index bac7367d1928..000000000000 --- a/core/src/main/java/org/apache/iceberg/FetchPlanningResultResponseParser.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.rest.PlanStatus; -import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; -import org.apache.iceberg.util.JsonUtil; - -public class FetchPlanningResultResponseParser { - private static final String PLAN_STATUS = "plan-status"; - private static final String PLAN_TASKS = "plan-tasks"; - private static final String FILE_SCAN_TASKS = "file-scan-tasks"; - private static final String DELETE_FILES = "delete-files"; - - private FetchPlanningResultResponseParser() {} - - public static String toJson(FetchPlanningResultResponse response) { - return toJson(response, false); - } - - public static String toJson(FetchPlanningResultResponse response, boolean pretty) { - return JsonUtil.generate(gen -> toJson(response, gen), pretty); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static void toJson(FetchPlanningResultResponse response, JsonGenerator gen) - throws IOException { - Preconditions.checkArgument( - null != response, "Invalid response: fetchPanningResultResponse null"); - - if (response.planStatus() != PlanStatus.COMPLETED - && (response.planTasks() != null || response.fileScanTasks() != null)) { - throw new IllegalArgumentException( - "Invalid response: tasks can only be returned in a 'completed' status"); - } - - if ((response.deleteFiles() != null && !response.deleteFiles().isEmpty()) - && (response.fileScanTasks() == null || response.fileScanTasks().isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - gen.writeStartObject(); - if (response.planStatus() != null) { - gen.writeStringField(PLAN_STATUS, response.planStatus().status()); - } - - if (response.planTasks() != null) { - gen.writeArrayFieldStart(PLAN_TASKS); - for (String planTask : response.planTasks()) { - gen.writeString(planTask); - } - gen.writeEndArray(); - } - - List deleteFiles = null; - Map deleteFilePathToIndex = Maps.newHashMap(); - if (response.deleteFiles() != null) { - deleteFiles = response.deleteFiles(); - gen.writeArrayFieldStart(DELETE_FILES); - for (int i = 0; i < deleteFiles.size(); i++) { - DeleteFile deleteFile = deleteFiles.get(i); - deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); - ContentFileParser.unboundContentFileToJson( - deleteFiles.get(i), response.specsById().get(deleteFile.specId()), gen); - } - gen.writeEndArray(); - } - - if (response.fileScanTasks() != null) { - Set deleteFileReferences = Sets.newHashSet(); - gen.writeArrayFieldStart(FILE_SCAN_TASKS); - for (FileScanTask fileScanTask : response.fileScanTasks()) { - if (deleteFiles != null) { - for (DeleteFile taskDelete : fileScanTask.deletes()) { - deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); - } - } - PartitionSpec partitionSpec = response.specsById().get(fileScanTask.file().specId()); - RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, partitionSpec, gen); - } - gen.writeEndArray(); - } - - gen.writeEndObject(); - } - - public static FetchPlanningResultResponse fromJson(String json) { - Preconditions.checkArgument(json != null, "Invalid response: fetchPanningResultResponse null"); - return JsonUtil.parse(json, FetchPlanningResultResponseParser::fromJson); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static FetchPlanningResultResponse fromJson(JsonNode json) { - Preconditions.checkArgument( - json != null && !json.isEmpty(), - "Invalid response: fetchPanningResultResponse null or empty"); - - PlanStatus planStatus = null; - if (json.has(PLAN_STATUS)) { - String status = JsonUtil.getString(PLAN_STATUS, json); - planStatus = PlanStatus.fromName(status); - } - - List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); - - List allDeleteFiles = null; - if (json.has(DELETE_FILES)) { - JsonNode deletesArray = json.get(DELETE_FILES); - ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); - for (JsonNode deleteFileNode : deletesArray) { - DeleteFile deleteFile = - (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); - deleteFilesBuilder.add(deleteFile); - } - allDeleteFiles = deleteFilesBuilder.build(); - } - - List fileScanTasks = null; - if (json.has(FILE_SCAN_TASKS)) { - JsonNode fileScanTasksArray = json.get(FILE_SCAN_TASKS); - ImmutableList.Builder fileScanTaskBuilder = ImmutableList.builder(); - for (JsonNode fileScanTaskNode : fileScanTasksArray) { - - // TODO we dont have caseSensitive flag at serial/deserial time - FileScanTask fileScanTask = - (FileScanTask) RESTFileScanTaskParser.fromJson(fileScanTaskNode, allDeleteFiles); - fileScanTaskBuilder.add(fileScanTask); - } - fileScanTasks = fileScanTaskBuilder.build(); - } - - if (planStatus != PlanStatus.COMPLETED && (planTasks != null || fileScanTasks != null)) { - throw new IllegalArgumentException( - "Invalid response: tasks can only be returned in a 'completed' status"); - } - - if ((allDeleteFiles != null && !allDeleteFiles.isEmpty()) - && (fileScanTasks == null || fileScanTasks.isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - return new FetchPlanningResultResponse.Builder() - .withPlanStatus(planStatus) - .withPlanTasks(planTasks) - .withFileScanTasks(fileScanTasks) - .withDeleteFiles(allDeleteFiles) - .build(); - } -} diff --git a/core/src/main/java/org/apache/iceberg/FetchScanTasksResponseParser.java b/core/src/main/java/org/apache/iceberg/FetchScanTasksResponseParser.java deleted file mode 100644 index 06960979c9ce..000000000000 --- a/core/src/main/java/org/apache/iceberg/FetchScanTasksResponseParser.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.rest.responses.FetchScanTasksResponse; -import org.apache.iceberg.util.JsonUtil; - -public class FetchScanTasksResponseParser { - private static final String PLAN_TASKS = "plan-tasks"; - private static final String FILE_SCAN_TASKS = "file-scan-tasks"; - private static final String DELETE_FILES = "delete-files"; - - private FetchScanTasksResponseParser() {} - - public static String toJson(FetchScanTasksResponse response) { - // TODO need to pass specByIds - return toJson(response, false); - } - - public static String toJson(FetchScanTasksResponse response, boolean pretty) { - return JsonUtil.generate(gen -> toJson(response, gen), pretty); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException { - Preconditions.checkArgument(null != response, "Invalid response: fetchScanTasksResponse null"); - - if (response.planTasks() == null && response.fileScanTasks() == null) { - throw new IllegalArgumentException( - "Invalid response: planTasks and fileScanTask can not both be null"); - } - - if ((response.deleteFiles() != null && !response.deleteFiles().isEmpty()) - && (response.fileScanTasks() == null || response.fileScanTasks().isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - gen.writeStartObject(); - if (response.planTasks() != null) { - gen.writeArrayFieldStart(PLAN_TASKS); - for (String planTask : response.planTasks()) { - gen.writeString(planTask); - } - gen.writeEndArray(); - } - - List deleteFiles = null; - Map deleteFilePathToIndex = Maps.newHashMap(); - if (response.deleteFiles() != null) { - deleteFiles = response.deleteFiles(); - gen.writeArrayFieldStart(DELETE_FILES); - for (int i = 0; i < deleteFiles.size(); i++) { - DeleteFile deleteFile = deleteFiles.get(i); - deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); - ContentFileParser.unboundContentFileToJson( - deleteFiles.get(i), response.specsById().get(deleteFile.specId()), gen); - } - gen.writeEndArray(); - } - - if (response.fileScanTasks() != null) { - Set deleteFileReferences = Sets.newHashSet(); - gen.writeArrayFieldStart(FILE_SCAN_TASKS); - for (FileScanTask fileScanTask : response.fileScanTasks()) { - if (deleteFiles != null) { - for (DeleteFile taskDelete : fileScanTask.deletes()) { - deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); - } - } - PartitionSpec partitionSpec = response.specsById().get(fileScanTask.file().specId()); - RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, partitionSpec, gen); - } - gen.writeEndArray(); - } - - gen.writeEndObject(); - } - - public static FetchScanTasksResponse fromJson(String json) { - Preconditions.checkArgument(json != null, "Cannot parse fetchScanTasks response from null"); - return JsonUtil.parse(json, FetchScanTasksResponseParser::fromJson); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static FetchScanTasksResponse fromJson(JsonNode json) { - Preconditions.checkArgument( - json != null && !json.isEmpty(), "Invalid response: fetchScanTasksResponse null"); - - List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); - - List allDeleteFiles = null; - if (json.has(DELETE_FILES)) { - JsonNode deletesArray = json.get(DELETE_FILES); - ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); - for (JsonNode deleteFileNode : deletesArray) { - DeleteFile deleteFile = - (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); - deleteFilesBuilder.add(deleteFile); - } - allDeleteFiles = deleteFilesBuilder.build(); - } - - List fileScanTasks = null; - if (json.has(FILE_SCAN_TASKS)) { - JsonNode fileScanTasksArray = json.get(FILE_SCAN_TASKS); - ImmutableList.Builder fileScanTaskBuilder = ImmutableList.builder(); - for (JsonNode fileScanTaskNode : fileScanTasksArray) { - // TODO we dont have caseSensitive flag at serial/deserialize time - FileScanTask fileScanTask = - (FileScanTask) RESTFileScanTaskParser.fromJson(fileScanTaskNode, allDeleteFiles); - fileScanTaskBuilder.add(fileScanTask); - } - fileScanTasks = fileScanTaskBuilder.build(); - } - - if (planTasks == null && fileScanTasks == null) { - throw new IllegalArgumentException( - "Invalid response: planTasks and fileScanTask can not both be null"); - } - - if ((allDeleteFiles != null && !allDeleteFiles.isEmpty()) - && (fileScanTasks == null || fileScanTasks.isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - return new FetchScanTasksResponse.Builder() - .withPlanTasks(planTasks) - .withFileScanTasks(fileScanTasks) - .withDeleteFiles(allDeleteFiles) - .build(); - } -} diff --git a/core/src/main/java/org/apache/iceberg/PlanTableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/PlanTableScanResponseParser.java deleted file mode 100644 index b1675c10163b..000000000000 --- a/core/src/main/java/org/apache/iceberg/PlanTableScanResponseParser.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.rest.PlanStatus; -import org.apache.iceberg.rest.responses.PlanTableScanResponse; -import org.apache.iceberg.util.JsonUtil; - -public class PlanTableScanResponseParser { - private static final String PLAN_STATUS = "plan-status"; - private static final String PLAN_ID = "plan-id"; - private static final String PLAN_TASKS = "plan-tasks"; - private static final String FILE_SCAN_TASKS = "file-scan-tasks"; - private static final String DELETE_FILES = "delete-files"; - - private PlanTableScanResponseParser() {} - - public static String toJson(PlanTableScanResponse response) { - return toJson(response, false); - } - - public static String toJson(PlanTableScanResponse response, boolean pretty) { - return JsonUtil.generate(gen -> toJson(response, gen), pretty); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static void toJson(PlanTableScanResponse response, JsonGenerator gen) throws IOException { - Preconditions.checkArgument(null != response, "Invalid response: planTableScanResponse null"); - Preconditions.checkArgument( - response.planStatus() != null, "Invalid response: status can not be null"); - - if (response.planStatus() == PlanStatus.SUBMITTED && response.planId() == null) { - throw new IllegalArgumentException( - "Invalid response: planId to be non-null when status is 'submitted"); - } - - if (response.planStatus() == PlanStatus.CANCELLED) { - throw new IllegalArgumentException( - "Invalid response: 'cancelled' is not a valid status for planTableScan"); - } - - if (response.planStatus() != PlanStatus.COMPLETED - && (response.planTasks() != null || response.fileScanTasks() != null)) { - throw new IllegalArgumentException( - "Invalid response: tasks can only be returned in a 'completed' status"); - } - - if (response.planStatus() != PlanStatus.SUBMITTED && response.planId() != null) { - throw new IllegalArgumentException( - "Invalid response: plan-id can only be returned in a 'submitted' status"); - } - - if ((response.deleteFiles() != null && !response.deleteFiles().isEmpty()) - && (response.fileScanTasks() == null || response.fileScanTasks().isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - gen.writeStartObject(); - gen.writeStringField(PLAN_STATUS, response.planStatus().status()); - - if (response.planId() != null) { - gen.writeStringField(PLAN_ID, response.planId()); - } - if (response.planTasks() != null) { - gen.writeArrayFieldStart(PLAN_TASKS); - for (String planTask : response.planTasks()) { - gen.writeString(planTask); - } - gen.writeEndArray(); - } - - List deleteFiles = null; - Map deleteFilePathToIndex = Maps.newHashMap(); - if (response.deleteFiles() != null) { - deleteFiles = response.deleteFiles(); - gen.writeArrayFieldStart(DELETE_FILES); - for (int i = 0; i < deleteFiles.size(); i++) { - DeleteFile deleteFile = deleteFiles.get(i); - deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); - ContentFileParser.unboundContentFileToJson( - deleteFiles.get(i), response.specsById().get(deleteFile.specId()), gen); - } - gen.writeEndArray(); - } - - if (response.fileScanTasks() != null) { - Set deleteFileReferences = Sets.newHashSet(); - gen.writeArrayFieldStart(FILE_SCAN_TASKS); - for (FileScanTask fileScanTask : response.fileScanTasks()) { - if (deleteFiles != null) { - for (DeleteFile taskDelete : fileScanTask.deletes()) { - deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); - } - } - PartitionSpec partitionSpec = response.specsById().get(fileScanTask.file().specId()); - RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, partitionSpec, gen); - } - gen.writeEndArray(); - } - - gen.writeEndObject(); - } - - public static PlanTableScanResponse fromJson(String json) { - Preconditions.checkArgument( - json != null, "Cannot parse planTableScan response from empty or null object"); - return JsonUtil.parse(json, PlanTableScanResponseParser::fromJson); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static PlanTableScanResponse fromJson(JsonNode json) { - Preconditions.checkArgument(null != json, "Invalid response: planTableScanResponse null"); - Preconditions.checkArgument( - json != null && !json.isEmpty(), - "Cannot parse planTableScan response from empty or null object"); - - PlanStatus planStatus = null; - if (json.has(PLAN_STATUS)) { - String status = JsonUtil.getString(PLAN_STATUS, json); - planStatus = PlanStatus.fromName(status); - } - String planId = JsonUtil.getStringOrNull(PLAN_ID, json); - List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); - - List allDeleteFiles = null; - if (json.has(DELETE_FILES)) { - JsonNode deletesArray = json.get(DELETE_FILES); - ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); - for (JsonNode deleteFileNode : deletesArray) { - DeleteFile deleteFile = - (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); - deleteFilesBuilder.add(deleteFile); - } - allDeleteFiles = deleteFilesBuilder.build(); - } - - List fileScanTasks = null; - if (json.has(FILE_SCAN_TASKS)) { - JsonNode fileScanTasksArray = json.get(FILE_SCAN_TASKS); - ImmutableList.Builder fileScanTaskBuilder = ImmutableList.builder(); - for (JsonNode fileScanTaskNode : fileScanTasksArray) { - - // TODO we dont have caseSensitive flag at serial/deserial time - FileScanTask fileScanTask = - (FileScanTask) RESTFileScanTaskParser.fromJson(fileScanTaskNode, allDeleteFiles); - fileScanTaskBuilder.add(fileScanTask); - } - fileScanTasks = fileScanTaskBuilder.build(); - } - - if (planStatus == PlanStatus.SUBMITTED && planId == null) { - throw new IllegalArgumentException( - "Invalid response: planId to be non-null when status is 'submitted"); - } - - if (planStatus == PlanStatus.CANCELLED) { - throw new IllegalArgumentException( - "Invalid response: 'cancelled' is not a valid status for planTableScan"); - } - - if (planStatus != PlanStatus.COMPLETED && (planTasks != null || fileScanTasks != null)) { - throw new IllegalArgumentException( - "Invalid response: tasks can only be returned in a 'completed' status"); - } - - if (planStatus != PlanStatus.SUBMITTED && planId != null) { - throw new IllegalArgumentException( - "Invalid response: plan-id can only be returned in a 'submitted' status"); - } - - if ((allDeleteFiles != null && !allDeleteFiles.isEmpty()) - && (fileScanTasks == null || fileScanTasks.isEmpty())) { - throw new IllegalArgumentException( - "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); - } - - return new PlanTableScanResponse.Builder() - .withPlanId(planId) - .withPlanStatus(planStatus) - .withPlanTasks(planTasks) - .withFileScanTasks(fileScanTasks) - .withDeleteFiles(allDeleteFiles) - .build(); - } -} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 51b660d096f7..02bd59657570 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -28,12 +28,9 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import java.io.IOException; -import org.apache.iceberg.FetchPlanningResultResponseParser; -import org.apache.iceberg.FetchScanTasksResponseParser; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.MetadataUpdateParser; import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.PlanTableScanResponseParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.SortOrderParser; @@ -67,7 +64,9 @@ import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.ErrorResponseParser; import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; +import org.apache.iceberg.rest.responses.FetchPlanningResultResponseParser; import org.apache.iceberg.rest.responses.FetchScanTasksResponse; +import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser; import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadTableResponseParser; @@ -75,6 +74,7 @@ import org.apache.iceberg.rest.responses.LoadViewResponseParser; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.apache.iceberg.rest.responses.PlanTableScanResponseParser; import org.apache.iceberg.util.JsonUtil; public class RESTSerializers { diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java index 7cfc023c32a6..34acbcfc4820 100644 --- a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java @@ -71,13 +71,25 @@ public Map specsById() { return specsById; } + public static Builder builder() { + return new Builder(); + } + @Override public void validate() { Preconditions.checkArgument(planStatus() != null, "Invalid status: null"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } } public static class Builder { - public Builder() {} + private Builder() {} private PlanStatus planStatus; diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java new file mode 100644 index 000000000000..fb8dcf657e8a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponseParser.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class FetchPlanningResultResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchPlanningResultResponseParser() {} + + public static String toJson(FetchPlanningResultResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchPlanningResultResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchPlanningResultResponse response, JsonGenerator gen) + throws IOException { + Preconditions.checkArgument( + null != response, "Invalid response: fetchPanningResultResponse null"); + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchPlanningResultResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Invalid response: fetchPanningResultResponse null"); + return JsonUtil.parse(json, FetchPlanningResultResponseParser::fromJson); + } + + public static FetchPlanningResultResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Invalid response: fetchPanningResultResponse null or empty"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java index 8a4f67da6d0f..3413c67ffe99 100644 --- a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java @@ -23,6 +23,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.rest.RESTResponse; public class FetchScanTasksResponse implements RESTResponse { @@ -40,6 +41,7 @@ private FetchScanTasksResponse( this.fileScanTasks = fileScanTasks; this.deleteFiles = deleteFiles; this.specsById = specsById; + validate(); } public List planTasks() { @@ -58,13 +60,25 @@ public Map specsById() { return specsById; } + public static Builder builder() { + return new Builder(); + } + @Override public void validate() { - // validation logic done in FetchScanTasksResponseParser + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + + Preconditions.checkArgument( + planTasks() != null || fileScanTasks() != null, + "Invalid response: planTasks and fileScanTask cannot both be null"); } public static class Builder { - public Builder() {} + private Builder() {} private List planTasks; private List fileScanTasks; diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java new file mode 100644 index 000000000000..60ebea6c85db --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponseParser.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class FetchScanTasksResponseParser { + private static final String PLAN_TASKS = "plan-tasks"; + + private FetchScanTasksResponseParser() {} + + public static String toJson(FetchScanTasksResponse response) { + return toJson(response, false); + } + + public static String toJson(FetchScanTasksResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(FetchScanTasksResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(response != null, "Invalid response: fetchScanTasksResponse null"); + gen.writeStartObject(); + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + gen.writeEndObject(); + } + + public static FetchScanTasksResponse fromJson(String json) { + Preconditions.checkArgument(json != null, "Cannot parse fetchScanTasks response from null"); + return JsonUtil.parse(json, FetchScanTasksResponseParser::fromJson); + } + + public static FetchScanTasksResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), "Invalid response: fetchScanTasksResponse null"); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + return FetchScanTasksResponse.builder() + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java index 2439471191a6..d754bbcfe03c 100644 --- a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java @@ -24,6 +24,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.rest.PlanStatus; import org.apache.iceberg.rest.RESTResponse; @@ -48,6 +49,7 @@ private PlanTableScanResponse( this.fileScanTasks = fileScanTasks; this.deleteFiles = deleteFiles; this.specsById = specsById; + validate(); } public PlanStatus planStatus() { @@ -79,20 +81,38 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("planStatus", planStatus) .add("planId", planId) - .add("planTasks", planTasks) - .add("fileScanTasks", fileScanTasks) - .add("deleteFiles", deleteFiles) - .add("specsById", specsById) .toString(); } @Override public void validate() { - // validation logic to be performed in PlanTableScanResponseParser + Preconditions.checkArgument( + planStatus() != null, "Invalid response: plan status must be defined"); + Preconditions.checkArgument( + planStatus() != PlanStatus.SUBMITTED || planId() != null, + "Invalid response: plan id should be defined when status is 'submitted'"); + Preconditions.checkArgument( + planStatus() != PlanStatus.CANCELLED, + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + Preconditions.checkArgument( + planStatus() == PlanStatus.COMPLETED || (planTasks() == null && fileScanTasks() == null), + "Invalid response: tasks can only be returned in a 'completed' status"); + Preconditions.checkArgument( + planStatus() == PlanStatus.SUBMITTED || planId() == null, + "Invalid response: plan id can only be returned in a 'submitted' status"); + if (fileScanTasks() == null || fileScanTasks.isEmpty()) { + Preconditions.checkArgument( + (deleteFiles() == null || deleteFiles().isEmpty()), + "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); + } + } + + public static Builder builder() { + return new Builder(); } public static class Builder { - public Builder() {} + private Builder() {} private PlanStatus planStatus; private String planId; diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java new file mode 100644 index 000000000000..9b1bea734465 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.util.JsonUtil; + +public class PlanTableScanResponseParser { + private static final String PLAN_STATUS = "plan-status"; + private static final String PLAN_ID = "plan-id"; + private static final String PLAN_TASKS = "plan-tasks"; + + private PlanTableScanResponseParser() {} + + public static String toJson(PlanTableScanResponse response) { + return toJson(response, false); + } + + public static String toJson(PlanTableScanResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(PlanTableScanResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != response, "Invalid response: planTableScanResponse null"); + Preconditions.checkArgument( + response.planStatus() != null, "Invalid response: status can not be null"); + + gen.writeStartObject(); + gen.writeStringField(PLAN_STATUS, response.planStatus().status()); + + if (response.planId() != null) { + gen.writeStringField(PLAN_ID, response.planId()); + } + if (response.planTasks() != null) { + JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen); + } + + TableScanResponseParser.serializeScanTasks( + response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen); + + gen.writeEndObject(); + } + + public static PlanTableScanResponse fromJson(String json) { + Preconditions.checkArgument( + json != null, "Cannot parse planTableScan response from empty or null object"); + return JsonUtil.parse(json, PlanTableScanResponseParser::fromJson); + } + + public static PlanTableScanResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + json != null && !json.isEmpty(), + "Cannot parse planTableScan response from empty or null object"); + + PlanStatus planStatus = PlanStatus.fromName(JsonUtil.getString(PLAN_STATUS, json)); + String planId = JsonUtil.getStringOrNull(PLAN_ID, json); + List planTasks = JsonUtil.getStringListOrNull(PLAN_TASKS, json); + List deleteFiles = TableScanResponseParser.parseDeleteFiles(json); + List fileScanTasks = + TableScanResponseParser.parseFileScanTasks(json, deleteFiles); + + return PlanTableScanResponse.builder() + .withPlanId(planId) + .withPlanStatus(planStatus) + .withPlanTasks(planTasks) + .withFileScanTasks(fileScanTasks) + .withDeleteFiles(deleteFiles) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java new file mode 100644 index 000000000000..619e8f993863 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/TableScanResponseParser.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ContentFileParser; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RESTFileScanTaskParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.JsonUtil; + +class TableScanResponseParser { + + private TableScanResponseParser() {} + + static final String FILE_SCAN_TASKS = "file-scan-tasks"; + static final String DELETE_FILES = "delete-files"; + + public static List parseDeleteFiles(JsonNode node) { + if (node.has(DELETE_FILES)) { + JsonNode deleteFiles = JsonUtil.get(DELETE_FILES, node); + Preconditions.checkArgument( + deleteFiles.isArray(), "Cannot parse delete files from non-array: %s", deleteFiles); + ImmutableList.Builder deleteFilesBuilder = ImmutableList.builder(); + for (JsonNode deleteFileNode : deleteFiles) { + DeleteFile deleteFile = + (DeleteFile) ContentFileParser.unboundContentFileFromJson(deleteFileNode); + deleteFilesBuilder.add(deleteFile); + } + return deleteFilesBuilder.build(); + } + + return null; + } + + public static List parseFileScanTasks(JsonNode node, List deleteFiles) { + if (node.has(FILE_SCAN_TASKS)) { + JsonNode scanTasks = JsonUtil.get(FILE_SCAN_TASKS, node); + Preconditions.checkArgument( + scanTasks.isArray(), "Cannot parse file scan tasks from non-array: %s", scanTasks); + ImmutableList.Builder fileScanTaskBuilder = ImmutableList.builder(); + for (JsonNode fileScanTaskNode : scanTasks) { + FileScanTask fileScanTask = RESTFileScanTaskParser.fromJson(fileScanTaskNode, deleteFiles); + fileScanTaskBuilder.add(fileScanTask); + } + + return fileScanTaskBuilder.build(); + } + + return null; + } + + public static void serializeScanTasks( + List fileScanTasks, + List deleteFiles, + Map specsById, + JsonGenerator gen) + throws IOException { + Map deleteFilePathToIndex = Maps.newHashMap(); + if (deleteFiles != null) { + Preconditions.checkArgument( + specsById != null, "Cannot serialize response without specs by ID defined"); + gen.writeArrayFieldStart(DELETE_FILES); + for (int i = 0; i < deleteFiles.size(); i++) { + DeleteFile deleteFile = deleteFiles.get(i); + deleteFilePathToIndex.put(String.valueOf(deleteFile.path()), i); + ContentFileParser.unboundContentFileToJson( + deleteFiles.get(i), specsById.get(deleteFile.specId()), gen); + } + gen.writeEndArray(); + } + + if (fileScanTasks != null) { + gen.writeArrayFieldStart(FILE_SCAN_TASKS); + Set deleteFileReferences = Sets.newHashSet(); + for (FileScanTask fileScanTask : fileScanTasks) { + if (deleteFiles != null) { + for (DeleteFile taskDelete : fileScanTask.deletes()) { + deleteFileReferences.add(deleteFilePathToIndex.get(taskDelete.path().toString())); + } + } + + PartitionSpec partitionSpec = specsById.get(fileScanTask.file().specId()); + RESTFileScanTaskParser.toJson(fileScanTask, deleteFileReferences, partitionSpec, gen); + } + gen.writeEndArray(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java similarity index 89% rename from core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponse.java rename to core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java index 32c2ea633dde..215ea1481da0 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponse.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchPlanningResultResponseParser.java @@ -30,7 +30,6 @@ import java.util.List; import org.apache.iceberg.BaseFileScanTask; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FetchPlanningResultResponseParser; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.SchemaParser; @@ -39,7 +38,7 @@ import org.apache.iceberg.rest.PlanStatus; import org.junit.jupiter.api.Test; -public class TestFetchPlanningResultResponse { +public class TestFetchPlanningResultResponseParser { @Test public void nullAndEmptyCheck() { @@ -57,7 +56,7 @@ public void roundTripSerdeWithEmptyObject() { assertThatThrownBy( () -> FetchPlanningResultResponseParser.toJson( - new FetchPlanningResultResponse.Builder().build())) + FetchPlanningResultResponse.builder().build())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid status: null"); @@ -79,7 +78,7 @@ public void roundTripSerdeWithInvalidPlanStatus() { public void roundTripSerdeWithValidSubmittedStatus() { PlanStatus planStatus = PlanStatus.fromName("submitted"); FetchPlanningResultResponse response = - new FetchPlanningResultResponse.Builder().withPlanStatus(planStatus).build(); + FetchPlanningResultResponse.builder().withPlanStatus(planStatus).build(); String expectedJson = "{\"plan-status\":\"submitted\"}"; String json = FetchPlanningResultResponseParser.toJson(response); @@ -92,13 +91,12 @@ public void roundTripSerdeWithValidSubmittedStatus() { @Test public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { PlanStatus planStatus = PlanStatus.fromName("submitted"); - FetchPlanningResultResponse response = - new FetchPlanningResultResponse.Builder() - .withPlanStatus(planStatus) - .withPlanTasks(List.of("task1", "task2")) - .build(); - - assertThatThrownBy(() -> FetchPlanningResultResponseParser.toJson(response)) + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withPlanTasks(List.of("task1", "task2")) + .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); @@ -113,13 +111,12 @@ public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { @Test public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { PlanStatus planStatus = PlanStatus.fromName("submitted"); - FetchPlanningResultResponse response = - new FetchPlanningResultResponse.Builder() - .withPlanStatus(planStatus) - .withDeleteFiles(List.of(FILE_A_DELETES)) - .build(); - - assertThatThrownBy(() -> FetchPlanningResultResponseParser.toJson(response)) + assertThatThrownBy( + () -> + FetchPlanningResultResponse.builder() + .withPlanStatus(planStatus) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); @@ -151,7 +148,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { PlanStatus planStatus = PlanStatus.fromName("completed"); FetchPlanningResultResponse response = - new FetchPlanningResultResponse.Builder() + FetchPlanningResultResponse.builder() .withPlanStatus(planStatus) .withFileScanTasks(List.of(fileScanTask)) .withDeleteFiles(List.of(FILE_A_DELETES)) @@ -176,7 +173,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { assertThat(json).isEqualTo(expectedToJson); // make an unbound json where you expect to not have partitions for the data file, - // delete files as service does not send parition spec + // delete files as service does not send partition spec String expectedFromJson = "{\"plan-status\":\"completed\"," + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," @@ -193,7 +190,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { FetchPlanningResultResponse fromResponse = FetchPlanningResultResponseParser.fromJson(json); // Need to make a new response with partitionSpec set FetchPlanningResultResponse copyResponse = - new FetchPlanningResultResponse.Builder() + FetchPlanningResultResponse.builder() .withPlanStatus(fromResponse.planStatus()) .withPlanTasks(fromResponse.planTasks()) .withDeleteFiles(fromResponse.deleteFiles()) diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java similarity index 89% rename from core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponse.java rename to core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java index 46d0006b358e..4ddfbd1fcced 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponse.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestFetchScanTasksResponseParser.java @@ -30,7 +30,6 @@ import java.util.List; import org.apache.iceberg.BaseFileScanTask; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FetchScanTasksResponseParser; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.SchemaParser; @@ -38,7 +37,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.junit.jupiter.api.Test; -public class TestFetchScanTasksResponse { +public class TestFetchScanTasksResponseParser { @Test public void nullAndEmptyCheck() { @@ -54,9 +53,9 @@ public void nullAndEmptyCheck() { @Test public void roundTripSerdeWithEmptyObject() { assertThatThrownBy( - () -> FetchScanTasksResponseParser.toJson(new FetchScanTasksResponse.Builder().build())) + () -> FetchScanTasksResponseParser.toJson(FetchScanTasksResponse.builder().build())) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: planTasks and fileScanTask can not both be null"); + .hasMessage("Invalid response: planTasks and fileScanTask cannot both be null"); String emptyJson = "{ }"; assertThatThrownBy(() -> FetchScanTasksResponseParser.fromJson(emptyJson)) @@ -66,11 +65,10 @@ public void roundTripSerdeWithEmptyObject() { @Test public void roundTripSerdeWithPlanTasks() { - FetchScanTasksResponse response = - new FetchScanTasksResponse.Builder().withPlanTasks(List.of("task1", "task2")).build(); - String expectedJson = "{\"plan-tasks\":[\"task1\",\"task2\"]}"; - String json = FetchScanTasksResponseParser.toJson(response); + String json = + FetchScanTasksResponseParser.toJson( + FetchScanTasksResponse.builder().withPlanTasks(List.of("task1", "task2")).build()); assertThat(json).isEqualTo(expectedJson); FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json); @@ -82,13 +80,12 @@ public void roundTripSerdeWithPlanTasks() { @Test public void roundTripSerdeWithDeleteFilesNoFileScanTasksPresent() { - FetchScanTasksResponse response = - new FetchScanTasksResponse.Builder() - .withPlanTasks(List.of("task1", "task2")) - .withDeleteFiles(List.of(FILE_A_DELETES)) - .build(); - - assertThatThrownBy(() -> FetchScanTasksResponseParser.toJson(response)) + assertThatThrownBy( + () -> + FetchScanTasksResponse.builder() + .withPlanTasks(List.of("task1", "task2")) + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); @@ -119,7 +116,7 @@ public void roundTripSerdeWithFileScanTasks() { residualEvaluator); FetchScanTasksResponse response = - new FetchScanTasksResponse.Builder() + FetchScanTasksResponse.builder() .withFileScanTasks(List.of(fileScanTask)) .withDeleteFiles(List.of(FILE_A_DELETES)) // assume you have set this already @@ -160,7 +157,7 @@ public void roundTripSerdeWithFileScanTasks() { FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json); // Need to make a new response with partitionSpec set FetchScanTasksResponse copyResponse = - new FetchScanTasksResponse.Builder() + FetchScanTasksResponse.builder() .withPlanTasks(fromResponse.planTasks()) .withDeleteFiles(fromResponse.deleteFiles()) .withFileScanTasks(fromResponse.fileScanTasks()) diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java similarity index 81% rename from core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponse.java rename to core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java index 843436c3ab7d..893ad5d5de09 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponse.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java @@ -32,14 +32,13 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.PlanTableScanResponseParser; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.rest.PlanStatus; import org.junit.jupiter.api.Test; -public class TestPlanTableScanResponse { +public class TestPlanTableScanResponseParser { @Test public void nullAndEmptyCheck() { assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(null)) @@ -48,16 +47,15 @@ public void nullAndEmptyCheck() { assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson((JsonNode) null)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: planTableScanResponse null"); + .hasMessage("Cannot parse planTableScan response from empty or null object"); } @Test public void roundTripSerdeWithEmptyObject() { - PlanTableScanResponse response = new PlanTableScanResponse.Builder().build(); - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy(() -> PlanTableScanResponse.builder().build()) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: status can not be null"); + .hasMessage("Invalid response: plan status must be defined"); String emptyJson = "{ }"; assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(emptyJson)) @@ -76,26 +74,21 @@ public void roundTripSerdeWithInvalidPlanStatus() { @Test public void roundTripSerdeWithInvalidPlanStatusSubmittedWithoutPlanId() { PlanStatus planStatus = PlanStatus.fromName("submitted"); - PlanTableScanResponse response = - new PlanTableScanResponse.Builder().withPlanStatus(planStatus).build(); - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: planId to be non-null when status is 'submitted"); + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); String invalidJson = "{\"plan-status\":\"submitted\"}"; assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: planId to be non-null when status is 'submitted"); + .hasMessage("Invalid response: plan id should be defined when status is 'submitted'"); } @Test public void roundTripSerdeWithInvalidPlanStatusCancelled() { PlanStatus planStatus = PlanStatus.fromName("cancelled"); - PlanTableScanResponse response = - new PlanTableScanResponse.Builder().withPlanStatus(planStatus).build(); - - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy(() -> PlanTableScanResponse.builder().withPlanStatus(planStatus).build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid response: 'cancelled' is not a valid status for planTableScan"); @@ -108,14 +101,13 @@ public void roundTripSerdeWithInvalidPlanStatusCancelled() { @Test public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { PlanStatus planStatus = PlanStatus.fromName("submitted"); - PlanTableScanResponse response = - new PlanTableScanResponse.Builder() - .withPlanStatus(planStatus) - .withPlanId("somePlanId") - .withPlanTasks(List.of("task1", "task2")) - .build(); - - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withPlanTasks(List.of("task1", "task2")) + .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid response: tasks can only be returned in a 'completed' status"); @@ -132,34 +124,32 @@ public void roundTripSerdeWithInvalidPlanStatusSubmittedWithTasksPresent() { @Test public void roundTripSerdeWithInvalidPlanIdWithIncorrectStatus() { PlanStatus planStatus = PlanStatus.fromName("failed"); - PlanTableScanResponse response = - new PlanTableScanResponse.Builder() - .withPlanStatus(planStatus) - .withPlanId("somePlanId") - .build(); - - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .build()) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: plan-id can only be returned in a 'submitted' status"); + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); String invalidJson = "{\"plan-status\":\"failed\"," + "\"plan-id\":\"somePlanId\"}"; assertThatThrownBy(() -> PlanTableScanResponseParser.fromJson(invalidJson)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid response: plan-id can only be returned in a 'submitted' status"); + .hasMessage("Invalid response: plan id can only be returned in a 'submitted' status"); } @Test public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() { PlanStatus planStatus = PlanStatus.fromName("submitted"); - PlanTableScanResponse response = - new PlanTableScanResponse.Builder() - .withPlanStatus(planStatus) - .withPlanId("somePlanId") - .withDeleteFiles(List.of(FILE_A_DELETES)) - .build(); - - assertThatThrownBy(() -> PlanTableScanResponseParser.toJson(response)) + assertThatThrownBy( + () -> + PlanTableScanResponse.builder() + .withPlanStatus(planStatus) + .withPlanId("somePlanId") + .withDeleteFiles(List.of(FILE_A_DELETES)) + .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Invalid response: deleteFiles should only be returned with fileScanTasks that reference them"); @@ -192,7 +182,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { PlanStatus planStatus = PlanStatus.fromName("completed"); PlanTableScanResponse response = - new PlanTableScanResponse.Builder() + PlanTableScanResponse.builder() .withPlanStatus(planStatus) .withFileScanTasks(List.of(fileScanTask)) .withDeleteFiles(List.of(FILE_A_DELETES)) @@ -217,7 +207,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { assertThat(json).isEqualTo(expectedToJson); // make an unbound json where you expect to not have partitions for the data file, - // delete files as service does not send parition spec + // delete files as service does not send partition spec String expectedFromJson = "{\"plan-status\":\"completed\"," + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," @@ -234,7 +224,7 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() { PlanTableScanResponse fromResponse = PlanTableScanResponseParser.fromJson(json); // Need to make a new response with partitionSpec set PlanTableScanResponse copyResponse = - new PlanTableScanResponse.Builder() + PlanTableScanResponse.builder() .withPlanStatus(fromResponse.planStatus()) .withPlanId(fromResponse.planId()) .withPlanTasks(fromResponse.planTasks())