Skip to content

Commit

Permalink
Spark: Clean up FileIO instances on executors (#8685)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Oct 6, 2023
1 parent 0259918 commit 5a98aef
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,30 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
*/
public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation {
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
// Non-null only in the instance created directly on the driver. The field is
// transient, so it deserializes to null in broadcast copies; close() uses that
// to tell copies apart from the original.
this.serializationMarker = new Object();
}

@Override
Expand All @@ -49,6 +61,14 @@ public static Table copyOf(Table table) {
}
}

@Override
public void close() throws Exception {
// serializationMarker is transient: non-null on the original driver-side
// instance, null on any deserialized copy. Only copies release their FileIO,
// so closing a broadcast copy never tears down the driver table's resources.
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -63,6 +69,44 @@ public void initTable() throws IOException {
this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
}

@Test
public void testCloseSerializableTableKryoSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Kryo to simulate shipping the table to an executor.
  Table executorTable = spy(KryoHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testCloseSerializableTableJavaSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Java serialization to simulate an executor-side copy.
  Table executorTable = spy(TestHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testSerializableTableKryoSerialization() throws IOException {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,30 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
*/
public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation {
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
// Non-null only in the instance created directly on the driver. The field is
// transient, so it deserializes to null in broadcast copies; close() uses that
// to tell copies apart from the original.
this.serializationMarker = new Object();
}

@Override
Expand All @@ -49,6 +61,14 @@ public static Table copyOf(Table table) {
}
}

@Override
public void close() throws Exception {
// serializationMarker is transient: non-null on the original driver-side
// instance, null on any deserialized copy. Only copies release their FileIO,
// so closing a broadcast copy never tears down the driver table's resources.
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -78,6 +84,44 @@ public void initTable() throws IOException {
this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
}

@Test
public void testCloseSerializableTableKryoSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Kryo to simulate shipping the table to an executor.
  Table executorTable = spy(KryoHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testCloseSerializableTableJavaSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Java serialization to simulate an executor-side copy.
  Table executorTable = spy(TestHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testSerializableTableKryoSerialization() throws IOException {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,30 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
*/
public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation {
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
// Non-null only in the instance created directly on the driver. The field is
// transient, so it deserializes to null in broadcast copies; close() uses that
// to tell copies apart from the original.
this.serializationMarker = new Object();
}

@Override
Expand All @@ -49,6 +61,14 @@ public static Table copyOf(Table table) {
}
}

@Override
public void close() throws Exception {
// serializationMarker is transient: non-null on the original driver-side
// instance, null on any deserialized copy. Only copies release their FileIO,
// so closing a broadcast copy never tears down the driver table's resources.
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -78,6 +84,44 @@ public void initTable() throws IOException {
this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
}

@Test
public void testCloseSerializableTableKryoSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Kryo to simulate shipping the table to an executor.
  Table executorTable = spy(KryoHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testCloseSerializableTableJavaSerialization() throws Exception {
  // Wrap the table and its FileIO in spies so close() calls can be verified.
  Table mockedTable = spy(table);
  FileIO driverIO = spy(table.io());
  when(mockedTable.io()).thenReturn(driverIO);

  Table driverTable = SerializableTableWithSize.copyOf(mockedTable);

  // Round-trip through Java serialization to simulate an executor-side copy.
  Table executorTable = spy(TestHelpers.roundTripSerialize(driverTable));
  FileIO executorIO = spy(executorTable.io());
  when(executorTable.io()).thenReturn(executorIO);

  ((AutoCloseable) driverTable).close(); // mimics close on the driver
  ((AutoCloseable) executorTable).close(); // mimics close on executors

  // Only the deserialized copy may release its FileIO; the original must not.
  verify(driverIO, never()).close();
  verify(executorIO, times(1)).close();
}

@Test
public void testSerializableTableKryoSerialization() throws IOException {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,30 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
*/
public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation {
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
// Non-null only in the instance created directly on the driver. The field is
// transient, so it deserializes to null in broadcast copies; close() uses that
// to tell copies apart from the original.
this.serializationMarker = new Object();
}

@Override
Expand All @@ -49,6 +61,14 @@ public static Table copyOf(Table table) {
}
}

@Override
public void close() throws Exception {
// serializationMarker is transient: non-null on the original driver-side
// instance, null on any deserialized copy. Only copies release their FileIO,
// so closing a broadcast copy never tears down the driver table's resources.
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation {

Expand Down
Loading

0 comments on commit 5a98aef

Please sign in to comment.