Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.util.Map;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class InMemoryFileIO implements FileIO {

private Map<String, byte[]> inMemoryFiles = Maps.newConcurrentMap();
private boolean closed = false;

public void addFile(String location, byte[] contents) {
Preconditions.checkState(!closed, "Cannot call addFile after calling close()");
inMemoryFiles.put(location, contents);
}

public boolean fileExists(String location) {
return inMemoryFiles.containsKey(location);
}

@Override
public InputFile newInputFile(String location) {
Preconditions.checkState(!closed, "Cannot call newInputFile after calling close()");
byte[] contents = inMemoryFiles.get(location);
if (null == contents) {
throw new NotFoundException("No in-memory file found for location: %s", location);
}
return new InMemoryInputFile(location, contents);
}

@Override
public OutputFile newOutputFile(String location) {
Preconditions.checkState(!closed, "Cannot call newOutputFile after calling close()");
return new InMemoryOutputFile(location, this);
}

@Override
public void deleteFile(String location) {
Preconditions.checkState(!closed, "Cannot call deleteFile after calling close()");
if (null == inMemoryFiles.remove(location)) {
throw new NotFoundException("No in-memory file found for location: %s", location);
}
}

public boolean isClosed() {
return closed;
}

@Override
public void close() {
closed = true;
}
}
26 changes: 24 additions & 2 deletions core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,38 @@ public class InMemoryOutputFile implements OutputFile {

private boolean exists = false;
private ByteArrayOutputStream contents;
private InMemoryFileIO parentFileIO;

public InMemoryOutputFile() {
this("memory:" + UUID.randomUUID());
}

public InMemoryOutputFile(String location) {
this(location, null);
}

/**
* If the optional parentFileIO is provided, file-existence behaves similarly to S3FileIO;
* existence checks are performed up-front if creating without overwrite, but files only exist in
* the parentFileIO if close() has been called on the associated output streams (or pre-existing
* files are populated into the parentFileIO through other means).
*
* @param location the location returned by location() of this OutputFile, the InputFile obtained
* from calling toInputFile(), and the location for looking up the associated InputFile from a
* parentFileIO, if non-null.
* @param parentFileIO if non-null, commits an associated InMemoryInputFile on close() into the
* parentFileIO, and uses the parentFileIO for "already exists" checks if creating without
* overwriting.
*/
public InMemoryOutputFile(String location, InMemoryFileIO parentFileIO) {
Preconditions.checkNotNull(location, "location is null");
this.location = location;
this.parentFileIO = parentFileIO;
}

@Override
public PositionOutputStream create() {
if (exists) {
if (exists || (parentFileIO != null && parentFileIO.fileExists(location))) {
throw new AlreadyExistsException("Already exists");
}
return createOrOverwrite();
Expand Down Expand Up @@ -70,7 +89,7 @@ public byte[] toByteArray() {
return contents.toByteArray();
}

private static class InMemoryPositionOutputStream extends PositionOutputStream {
private class InMemoryPositionOutputStream extends PositionOutputStream {
private final ByteArrayOutputStream delegate;
private boolean closed = false;

Expand Down Expand Up @@ -112,6 +131,9 @@ public void flush() throws IOException {
public void close() throws IOException {
delegate.close();
closed = true;
if (parentFileIO != null) {
parentFileIO.addFile(location(), toByteArray());
}
}

private void checkOpen() {
Expand Down
111 changes: 111 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class TestInMemoryFileIO {
String location = "s3://foo/bar.txt";

@Test
public void testBasicEndToEnd() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThat(fileIO.fileExists(location)).isFalse();

OutputStream outputStream = fileIO.newOutputFile(location).create();
byte[] data = "hello world".getBytes();
outputStream.write(data);
outputStream.close();
Assertions.assertThat(fileIO.fileExists(location)).isTrue();

InputStream inputStream = fileIO.newInputFile(location).newStream();
byte[] buf = new byte[data.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("hello world");

fileIO.deleteFile(location);
Assertions.assertThat(fileIO.fileExists(location)).isFalse();
}

@Test
public void testNewInputFileNotFound() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThatExceptionOfType(NotFoundException.class)
.isThrownBy(() -> fileIO.newInputFile("s3://nonexistent/file"));
}

@Test
public void testDeleteFileNotFound() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThatExceptionOfType(NotFoundException.class)
.isThrownBy(() -> fileIO.deleteFile("s3://nonexistent/file"));
}

@Test
public void testCreateNoOverwrite() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
fileIO.addFile(location, "hello world".getBytes());
Assertions.assertThatExceptionOfType(AlreadyExistsException.class)
.isThrownBy(() -> fileIO.newOutputFile(location).create());
}

@Test
public void testOverwriteBeforeAndAfterClose() throws IOException {
byte[] oldData = "old data".getBytes();
byte[] newData = "new data".getBytes();

InMemoryFileIO fileIO = new InMemoryFileIO();
OutputStream outputStream = fileIO.newOutputFile(location).create();
outputStream.write(oldData);

// Even though we've called create() and started writing data, this file won't yet exist
// in the parentFileIO before we've closed it.
Assertions.assertThat(fileIO.fileExists(location)).isFalse();

// File appears after closing it.
outputStream.close();
Assertions.assertThat(fileIO.fileExists(location)).isTrue();

// Start a new OutputFile and write new data but don't close() it yet.
outputStream = fileIO.newOutputFile(location).createOrOverwrite();
outputStream.write(newData);

// We'll still read old data.
InputStream inputStream = fileIO.newInputFile(location).newStream();
byte[] buf = new byte[oldData.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("old data");

// Finally, close the new output stream; data should be overwritten with new data now.
outputStream.close();
inputStream = fileIO.newInputFile(location).newStream();
buf = new byte[newData.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("new data");
}
}