diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 390ba0de12ae..8cc64f0787a4 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -251,7 +252,7 @@ public void testManifestsPartitionSummary() throws IOException {
   }
 
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = Files.localOutput(temp.newFile());
+    OutputFile manifestList = new InMemoryOutputFile();
     try (FileAppender<ManifestFile> writer = ManifestLists.write(
         formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQ_NUM : 0)) {
       writer.add(manifest);
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index c9230315e48b..79aa35db974c 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -24,6 +24,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -235,7 +236,7 @@ void checkRewrittenManifest(ManifestFile manifest, long expectedSequenceNumber,
   }
 
   private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
-    OutputFile manifestList = Files.localOutput(temp.newFile());
+    OutputFile manifestList = new InMemoryOutputFile();
     try (FileAppender<ManifestFile> writer = ManifestLists.write(
         formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQUENCE_NUMBER : 0)) {
       writer.add(manifest);
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 0ae5944013ea..5a60ece7b0a3 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,8 @@
 import org.apache.iceberg.avro.RandomAvroData;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
@@ -65,11 +67,11 @@ public TestScansAndSchemaEvolution(int formatVersion) {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private DataFile createDataFile(File dataPath, String partValue) throws IOException {
+  private DataFile createDataFile(String partValue) throws IOException {
     List expected = RandomAvroData.generate(SCHEMA, 100, 0L);
 
-    File dataFile = new File(dataPath, FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
-    try (FileAppender writer = Avro.write(Files.localOutput(dataFile))
+    OutputFile dataFile = new InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
+    try (FileAppender writer = Avro.write(dataFile)
         .schema(SCHEMA)
         .named("test")
         .build()) {
@@ -82,7 +84,7 @@ private DataFile createDataFile(File dataPath, String partValue) throws IOExcept
     PartitionData partition = new PartitionData(SPEC.partitionType());
     partition.set(0, partValue);
     return DataFiles.builder(SPEC)
-        .withInputFile(Files.localInput(dataFile))
+        .withInputFile(dataFile.toInputFile())
        .withPartition(partition)
         .withRecordCount(100)
         .build();
@@ -96,13 +98,12 @@ public void cleanupTables() {
   @Test
   public void testPartitionSourceRename() throws IOException {
     File location = temp.newFolder();
-    File dataLocation = new File(location, "data");
     Assert.assertTrue(location.delete()); // should be created by table create
 
     Table table = TestTables.create(location, "test", SCHEMA, SPEC, formatVersion);
 
-    DataFile fileOne = createDataFile(dataLocation, "one");
-    DataFile fileTwo = createDataFile(dataLocation, "two");
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
 
     table.newAppend()
         .appendFile(fileOne)
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
index dc6113922119..d772f54dd626 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.data.avro.DataWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.InMemoryOutputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -73,9 +74,7 @@ public void createDeleteRecords() {
 
   @Test
   public void testEqualityDeleteWriter() throws IOException {
-    File deleteFile = temp.newFile();
-
-    OutputFile out = Files.localOutput(deleteFile);
+    OutputFile out = new InMemoryOutputFile();
     EqualityDeleteWriter deleteWriter = Avro.writeDeletes(out)
         .createWriterFunc(DataWriter::create)
         .overwrite()
@@ -108,8 +107,6 @@ public void testEqualityDeleteWriter() throws IOException {
 
   @Test
   public void testPositionDeleteWriter() throws IOException {
-    File deleteFile = temp.newFile();
-
     Schema deleteSchema = new Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,
@@ -119,7 +116,7 @@ public void testPositionDeleteWriter() throws IOException {
     GenericRecord posDelete = GenericRecord.create(deleteSchema);
     List expectedDeleteRecords = Lists.newArrayList();
 
-    OutputFile out = Files.localOutput(deleteFile);
+    OutputFile out = new InMemoryOutputFile();
     PositionDeleteWriter deleteWriter = Avro.writeDeletes(out)
         .createWriterFunc(DataWriter::create)
         .overwrite()
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
index 6849657ddf33..f88c89127c88 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java
@@ -19,25 +19,22 @@
 
 package org.apache.iceberg.avro;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestGenericAvro extends AvroDataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expected = RandomAvroData.generate(schema, 100, 0L);
 
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
-
-    try (FileAppender<Record> writer = Avro.write(Files.localOutput(testFile))
+    OutputFile outputFile = new InMemoryOutputFile();
+    try (FileAppender<Record> writer = Avro.write(outputFile)
         .schema(schema)
         .named("test")
         .build()) {
@@ -47,7 +44,7 @@ protected void writeAndValidate(Schema schema) throws IOException {
     }
 
     List<Record> rows;
-    try (AvroIterable<Record> reader = Avro.read(Files.localInput(testFile))
+    try (AvroIterable<Record> reader = Avro.read(outputFile.toInputFile())
         .project(schema)
         .build()) {
       rows = Lists.newArrayList(reader);
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
new file mode 100644
index 000000000000..f25b21bac7e9
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryInputFile implements InputFile {
+
+  private final String location;
+  private final byte[] contents;
+
+  public InMemoryInputFile(byte[] contents) {
+    this("memory:" + UUID.randomUUID(), contents);
+  }
+
+  public InMemoryInputFile(String location, byte[] contents) {
+    Preconditions.checkNotNull(location, "location is null");
+    Preconditions.checkNotNull(contents, "contents is null");
+    this.location = location;
+    this.contents = contents.clone();
+  }
+
+  @Override
+  public long getLength() {
+    return contents.length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+    return new InMemorySeekableInputStream(contents);
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public boolean exists() {
+    return true;
+  }
+
+  private static class InMemorySeekableInputStream extends SeekableInputStream {
+
+    private final int length;
+    private final ByteArrayInputStream delegate;
+
+    InMemorySeekableInputStream(byte[] contents) {
+      this.length = contents.length;
+      this.delegate = new ByteArrayInputStream(contents);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return length - delegate.available();
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+      delegate.reset(); // resets to a marked position
+      Preconditions.checkState(delegate.skip(newPos) == newPos,
+          "Invalid position %s within stream of length %s", newPos, length);
+    }
+
+    @Override
+    public int read() {
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public int available() {
+      return delegate.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    @Override
+    public void mark(int readAheadLimit) {
+      // The delegate's mark is used to implement seek
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      delegate.reset();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
new file mode 100644
index 000000000000..c64ec00c6e5e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryOutputFile implements OutputFile {
+
+  private final String location;
+
+  private boolean exists = false;
+  private ByteArrayOutputStream contents;
+
+  public InMemoryOutputFile() {
+    this("memory:" + UUID.randomUUID());
+  }
+
+  public InMemoryOutputFile(String location) {
+    Preconditions.checkNotNull(location, "location is null");
+    this.location = location;
+  }
+
+  @Override
+  public PositionOutputStream create() {
+    if (exists) {
+      throw new AlreadyExistsException("Already exists");
+    }
+    return createOrOverwrite();
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    exists = true;
+    contents = new ByteArrayOutputStream();
+    return new InMemoryPositionOutputStream(contents);
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public InputFile toInputFile() {
+    Preconditions.checkState(exists, "Cannot convert a file that has not been written yet");
+    return new InMemoryInputFile(location(), toByteArray());
+  }
+
+  public byte[] toByteArray() {
+    return contents.toByteArray();
+  }
+
+  private static class InMemoryPositionOutputStream extends PositionOutputStream {
+    private final ByteArrayOutputStream delegate;
+
+    InMemoryPositionOutputStream(ByteArrayOutputStream delegate) {
+      Preconditions.checkNotNull(delegate, "delegate is null");
+      this.delegate = delegate;
+    }
+
+    @Override
+    public long getPos() {
+      return delegate.size();
+    }
+
+    @Override
+    public void write(int b) {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      delegate.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      delegate.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      delegate.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+  }
+}
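
Note (not part of the patch): a minimal sketch of how the two new test utilities compose, limited to methods introduced above (createOrOverwrite, toInputFile, getLength, newStream). The InMemoryFileRoundTrip class name and its main driver are illustrative only; it is placed in the org.apache.iceberg.io package purely so the sketch needs no extra imports for the Iceberg types.

    package org.apache.iceberg.io;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class InMemoryFileRoundTrip {
      public static void main(String[] args) throws IOException {
        // Write through the OutputFile side; no temp directory is involved.
        InMemoryOutputFile outputFile = new InMemoryOutputFile();
        byte[] payload = "hello iceberg".getBytes(StandardCharsets.UTF_8);
        try (PositionOutputStream out = outputFile.createOrOverwrite()) {
          out.write(payload);
        }

        // Read back through the InputFile produced by toInputFile().
        InputFile inputFile = outputFile.toInputFile();
        byte[] readBack = new byte[(int) inputFile.getLength()];
        try (SeekableInputStream in = inputFile.newStream()) {
          int read = in.read(readBack);
          if (read != payload.length || !Arrays.equals(readBack, payload)) {
            throw new AssertionError("in-memory round trip failed");
          }
        }
      }
    }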