diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index bb741198d794..6b6010b19b7f 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -21,10 +21,11 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.Serializable; +import java.util.Collections; import java.util.List; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData.SchemaConstructable; @@ -153,7 +154,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { this.existingRowsCount = toCopy.existingRowsCount; this.deletedFilesCount = toCopy.deletedFilesCount; this.deletedRowsCount = toCopy.deletedRowsCount; - this.partitions = ImmutableList.copyOf(Iterables.transform(toCopy.partitions, PartitionFieldSummary::copy)); + this.partitions = copyList(toCopy.partitions, PartitionFieldSummary::copy); this.fromProjectionPos = toCopy.fromProjectionPos; } @@ -387,7 +388,7 @@ private CopyBuilder(ManifestFile toCopy) { toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.snapshotId(), toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), - toCopy.partitions()); + copyList(toCopy.partitions(), PartitionFieldSummary::copy)); } } @@ -400,4 +401,15 @@ public ManifestFile build() { return manifestFile; } } + + private static <E, R> List<R> copyList(List<E> list, Function<E, R> transform) { if (list != null) { List<R> copy = Lists.newArrayListWithExpectedSize(list.size()); for (E element : list) { copy.add(transform.apply(element)); } return 
Collections.unmodifiableList(copy); + } + return null; + } } diff --git a/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java new file mode 100644 index 000000000000..df1a7ea20126 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ManifestFile.PartitionFieldSummary; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.types.Types; +import org.apache.spark.SparkConf; +import org.apache.spark.serializer.KryoSerializer; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestManifestFileSerialization { + + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec + .builderFor(SCHEMA) + .identity("date") + .build(); + + private static final DataFile FILE_A = DataFiles.builder(SPEC) + .withPath("/path/to/data-1.parquet") + .withFileSizeInBytes(0) + .withPartitionPath("date=2018-06-08") + .withMetrics(new Metrics(5L, + null, // no column sizes + ImmutableMap.of(1, 5L, 2, 3L), // value count + ImmutableMap.of(1, 0L, 2, 2L), // null count + ImmutableMap.of(1, longToBuffer(0L)), // lower bounds + ImmutableMap.of(1, longToBuffer(4L)) // upper bounds + )) + .build(); + + private static final FileIO 
FILE_IO = new HadoopFileIO(new Configuration()); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testManifestFileKryoSerialization() throws IOException { + File data = temp.newFile(); + Assert.assertTrue(data.delete()); + + Kryo kryo = new KryoSerializer(new SparkConf()).newKryo(); + + ManifestFile manifest = writeManifest(FILE_A); + + try (Output out = new Output(new FileOutputStream(data))) { + kryo.writeClassAndObject(out, manifest); + kryo.writeClassAndObject(out, manifest.copy()); + kryo.writeClassAndObject(out, GenericManifestFile.copyOf(manifest).build()); + } + + try (Input in = new Input(new FileInputStream(data))) { + for (int i = 0; i < 3; i += 1) { + Object obj = kryo.readClassAndObject(in); + Assert.assertTrue("Should be a ManifestFile", obj instanceof ManifestFile); + checkManifestFile(manifest, (ManifestFile) obj); + } + } + } + + @Test + public void testManifestFileJavaSerialization() throws Exception { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + ManifestFile manifest = writeManifest(FILE_A); + + try (ObjectOutputStream out = new ObjectOutputStream(bytes)) { + out.writeObject(manifest); + out.writeObject(manifest.copy()); + out.writeObject(GenericManifestFile.copyOf(manifest).build()); + } + + try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) { + for (int i = 0; i < 3; i += 1) { + Object obj = in.readObject(); + Assert.assertTrue("Should be a ManifestFile", obj instanceof ManifestFile); + checkManifestFile(manifest, (ManifestFile) obj); + } + } + } + + private void checkManifestFile(ManifestFile expected, ManifestFile actual) { + Assert.assertEquals("Path must match", expected.path(), actual.path()); + Assert.assertEquals("Length must match", expected.length(), actual.length()); + Assert.assertEquals("Spec id must match", expected.partitionSpecId(), actual.partitionSpecId()); + Assert.assertEquals("Snapshot id must match", 
expected.snapshotId(), actual.snapshotId()); + Assert.assertEquals("Added files flag must match", expected.hasAddedFiles(), actual.hasAddedFiles()); + Assert.assertEquals("Added files count must match", expected.addedFilesCount(), actual.addedFilesCount()); + Assert.assertEquals("Added rows count must match", expected.addedRowsCount(), actual.addedRowsCount()); + Assert.assertEquals("Existing files flag must match", expected.hasExistingFiles(), actual.hasExistingFiles()); + Assert.assertEquals("Existing files count must match", expected.existingFilesCount(), actual.existingFilesCount()); + Assert.assertEquals("Existing rows count must match", expected.existingRowsCount(), actual.existingRowsCount()); + Assert.assertEquals("Deleted files flag must match", expected.hasDeletedFiles(), actual.hasDeletedFiles()); + Assert.assertEquals("Deleted files count must match", expected.deletedFilesCount(), actual.deletedFilesCount()); + Assert.assertEquals("Deleted rows count must match", expected.deletedRowsCount(), actual.deletedRowsCount()); + + PartitionFieldSummary expectedPartition = expected.partitions().get(0); + PartitionFieldSummary actualPartition = actual.partitions().get(0); + + Assert.assertEquals("Null flag in partition must match", + expectedPartition.containsNull(), actualPartition.containsNull()); + Assert.assertEquals("Lower bounds in partition must match", + expectedPartition.lowerBound(), actualPartition.lowerBound()); + Assert.assertEquals("Upper bounds in partition must match", + expectedPartition.upperBound(), actualPartition.upperBound()); + } + + private ManifestFile writeManifest(DataFile... 
files) throws IOException { + File manifestFile = temp.newFile("input.m0.avro"); + Assert.assertTrue(manifestFile.delete()); + OutputFile outputFile = FILE_IO.newOutputFile(manifestFile.getCanonicalPath()); + + ManifestWriter writer = ManifestWriter.write(SPEC, outputFile); + try { + for (DataFile file : files) { + writer.add(file); + } + } finally { + writer.close(); + } + + return writer.toManifestFile(); + } + + private static ByteBuffer longToBuffer(long value) { + return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); + } +}