diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java index f25b21bac7e9..fb4d52a52115 100644 --- a/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java +++ b/core/src/test/java/org/apache/iceberg/io/InMemoryInputFile.java @@ -64,6 +64,7 @@ private static class InMemorySeekableInputStream extends SeekableInputStream { private final int length; private final ByteArrayInputStream delegate; + private boolean closed = false; InMemorySeekableInputStream(byte[] contents) { this.length = contents.length; @@ -72,11 +73,13 @@ private static class InMemorySeekableInputStream extends SeekableInputStream { @Override public long getPos() throws IOException { + checkOpen(); return length - delegate.available(); } @Override public void seek(long newPos) throws IOException { + checkOpen(); delegate.reset(); // resets to a marked position Preconditions.checkState(delegate.skip(newPos) == newPos, "Invalid position %s within stream of length %s", newPos, length); @@ -84,26 +87,31 @@ public void seek(long newPos) throws IOException { @Override public int read() { + checkOpen(); return delegate.read(); } @Override public int read(byte[] b) throws IOException { + checkOpen(); return delegate.read(b); } @Override public int read(byte[] b, int off, int len) { + checkOpen(); return delegate.read(b, off, len); } @Override public long skip(long n) { + checkOpen(); return delegate.skip(n); } @Override public int available() { + checkOpen(); return delegate.available(); } @@ -120,12 +128,19 @@ public void mark(int readAheadLimit) { @Override public void reset() { + checkOpen(); delegate.reset(); } @Override public void close() throws IOException { delegate.close(); + closed = true; + } + + private void checkOpen() { + // ByteArrayInputStream can be used even after close, so for test purposes disallow such use explicitly + Preconditions.checkState(!closed, "Stream is closed"); } } } diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java index c64ec00c6e5e..3f03470d64ab 100644 --- a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java +++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java @@ -73,6 +73,7 @@ public byte[] toByteArray() { private static class InMemoryPositionOutputStream extends PositionOutputStream { private final ByteArrayOutputStream delegate; + private boolean closed = false; InMemoryPositionOutputStream(ByteArrayOutputStream delegate) { Preconditions.checkNotNull(delegate, "delegate is null"); @@ -86,27 +87,37 @@ public long getPos() { @Override public void write(int b) { + checkOpen(); delegate.write(b); } @Override public void write(byte[] b) throws IOException { + checkOpen(); delegate.write(b); } @Override public void write(byte[] b, int off, int len) { + checkOpen(); delegate.write(b, off, len); } @Override public void flush() throws IOException { + checkOpen(); delegate.flush(); } @Override public void close() throws IOException { delegate.close(); + closed = true; + } + + private void checkOpen() { + // ByteArrayOutputStream can be used even after close, so for test purposes disallow such use explicitly + Preconditions.checkState(!closed, "Stream is closed"); } } } diff --git a/core/src/test/java/org/apache/iceberg/io/TestInMemoryInputFile.java b/core/src/test/java/org/apache/iceberg/io/TestInMemoryInputFile.java new file mode 100644 index 000000000000..95a10fcf4bb8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestInMemoryInputFile.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; + +public class TestInMemoryInputFile { + @Test + public void testReadAfterClose() throws IOException { + InMemoryInputFile inputFile = new InMemoryInputFile("abc".getBytes(StandardCharsets.ISO_8859_1)); + InputStream inputStream = inputFile.newStream(); + Assert.assertEquals('a', inputStream.read()); + inputStream.close(); + Assertions.assertThatThrownBy(inputStream::read) + .hasMessage("Stream is closed"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestInMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/TestInMemoryOutputFile.java new file mode 100644 index 000000000000..1c9c4b8e4c39 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestInMemoryOutputFile.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestInMemoryOutputFile { + @Test + public void testWriteAfterClose() throws IOException { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + OutputStream outputStream = outputFile.create(); + outputStream.write('a'); + outputStream.write('b'); + outputStream.close(); + Assertions.assertThatThrownBy(() -> outputStream.write('c')) + .hasMessage("Stream is closed"); + Assertions.assertThat(outputFile.toByteArray()) + .isEqualTo("ab".getBytes(StandardCharsets.ISO_8859_1)); + } +}