4242import java .io .UncheckedIOException ;
4343import java .nio .file .Files ;
4444import java .nio .file .Path ;
45+ import java .util .ArrayList ;
4546import java .util .Iterator ;
4647import java .util .List ;
4748import java .util .Optional ;
4849import java .util .concurrent .atomic .AtomicBoolean ;
4950import java .util .concurrent .atomic .AtomicLong ;
5051
5152import static com .google .common .base .Preconditions .checkState ;
53+ import static com .google .common .collect .Iterators .transform ;
5254import static com .google .common .util .concurrent .Futures .immediateFuture ;
5355import static io .trino .spi .StandardErrorCode .GENERIC_INTERNAL_ERROR ;
5456import static io .trino .spiller .FileSingleStreamSpillerFactory .SPILL_FILE_PREFIX ;
@@ -63,7 +65,9 @@ public class FileSingleStreamSpiller
6365 @ VisibleForTesting
6466 static final int BUFFER_SIZE = 4 * 1024 ;
6567
66- private final FileHolder targetFile ;
68+ private final List <FileHolder > targetFiles ;
69+ private volatile int currentFileIndex ;
70+
6771 private final Closer closer = Closer .create ();
6872 private final PagesSerdeFactory serdeFactory ;
6973 private volatile Optional <SecretKey > encryptionKey ;
@@ -84,7 +88,7 @@ public FileSingleStreamSpiller(
8488 PagesSerdeFactory serdeFactory ,
8589 Optional <SecretKey > encryptionKey ,
8690 ListeningExecutorService executor ,
87- Path spillPath ,
91+ List < Path > spillPaths ,
8892 SpillerStats spillerStats ,
8993 SpillContext spillContext ,
9094 LocalMemoryContext memoryContext ,
@@ -109,8 +113,14 @@ public FileSingleStreamSpiller(
109113 // middle of execution when close() is called (note that this applies to both readPages() and writePages() methods).
110114 this .memoryContext .setBytes (BUFFER_SIZE );
111115 this .fileSystemErrorHandler = requireNonNull (fileSystemErrorHandler , "filesystemErrorHandler is null" );
116+ requireNonNull (spillPaths , "spillPaths is null" );
117+ checkState (!spillPaths .isEmpty (), "spillPaths is empty" );
112118 try {
113- this .targetFile = closer .register (new FileHolder (Files .createTempFile (spillPath , SPILL_FILE_PREFIX , SPILL_FILE_SUFFIX )));
119+ ImmutableList .Builder <FileHolder > builder = ImmutableList .builder ();
120+ for (Path path : spillPaths ) {
121+ builder .add (closer .register (new FileHolder (Files .createTempFile (path , SPILL_FILE_PREFIX , SPILL_FILE_SUFFIX ))));
122+ }
123+ this .targetFiles = builder .build ();
114124 }
115125 catch (IOException e ) {
116126 this .fileSystemErrorHandler .run ();
@@ -137,61 +147,129 @@ public long getSpilledPagesInMemorySize()
137147 public Iterator <Page > getSpilledPages ()
138148 {
139149 checkNoSpillInProgress ();
140- return readPages ();
150+ checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
151+
152+ try {
153+ Optional <SecretKey > encryptionKey = this .encryptionKey ;
154+ checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
155+ PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
156+ this .encryptionKey = Optional .empty ();
157+
158+ int fileCount = targetFiles .size ();
159+ List <Iterator <Page >> iterators = new ArrayList <>(targetFiles .size ());
160+ for (FileHolder file : targetFiles ) {
161+ iterators .add (readFilePages (deserializer , file , closer ));
162+ }
163+
164+ return new AbstractIterator <>()
165+ {
166+ int fileIndex ;
167+
168+ @ Override
169+ protected Page computeNext ()
170+ {
171+ Iterator <Page > iterator = iterators .get (fileIndex );
172+ if (!iterator .hasNext ()) {
173+ return endOfData ();
174+ }
175+
176+ Page page = iterator .next ();
177+ fileIndex = (fileIndex + 1 ) % fileCount ;
178+ return page ;
179+ }
180+ };
181+ }
182+ catch (IOException e ) {
183+ fileSystemErrorHandler .run ();
184+ throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to read spilled pages" , e );
185+ }
141186 }
142187
143188 @ Override
144189 public ListenableFuture <List <Page >> getAllSpilledPages ()
145190 {
146- return executor .submit (() -> ImmutableList .copyOf (getSpilledPages ()));
191+ checkNoSpillInProgress ();
192+ checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
193+
194+ Optional <SecretKey > encryptionKey = this .encryptionKey ;
195+ checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
196+ this .encryptionKey = Optional .empty ();
197+
198+ List <ListenableFuture <List <Page >>> futures = new ArrayList <>();
199+ for (FileHolder file : targetFiles ) {
200+ futures .add (executor .submit (() -> {
201+ PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
202+ ImmutableList .Builder <Page > pages = ImmutableList .builder ();
203+ try (Closer closer = Closer .create ()) {
204+ readFilePages (deserializer , file , closer ).forEachRemaining (pages ::add );
205+ }
206+ return pages .build ();
207+ }));
208+ }
209+
210+ // Combine pages from all spill files according to the round-robin order.
211+ return Futures .transform (Futures .allAsList (futures ), pagesPerFile -> {
212+ ImmutableList .Builder <Page > builder = ImmutableList .builder ();
213+ int fileCount = targetFiles .size ();
214+
215+ List <Iterator <Page >> iterators = new ArrayList <>(fileCount );
216+ for (List <Page > pages : pagesPerFile ) {
217+ iterators .add (pages .iterator ());
218+ }
219+
220+ int fileIndex = 0 ;
221+ while (true ) {
222+ Iterator <Page > iterator = iterators .get (fileIndex );
223+ if (!iterator .hasNext ()) {
224+ break ;
225+ }
226+ builder .add (iterator .next ());
227+ fileIndex = (fileIndex + 1 ) % fileCount ;
228+ }
229+ return builder .build ();
230+ }, executor );
147231 }
148232
149- private DataSize writePages (Iterator <Page > pageIterator )
233+ private DataSize writePages (Iterator <Page > pages )
150234 {
151235 checkState (writable .get (), "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent" );
152236
153237 Optional <SecretKey > encryptionKey = this .encryptionKey ;
154238 checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
155239 PageSerializer serializer = serdeFactory .createSerializer (encryptionKey );
240+
156241 long spilledPagesBytes = 0 ;
157- try (SliceOutput output = new OutputStreamSliceOutput (targetFile .newOutputStream (APPEND ), BUFFER_SIZE )) {
158- while (pageIterator .hasNext ()) {
159- Page page = pageIterator .next ();
242+ int fileIndex = currentFileIndex ;
243+ int fileCount = targetFiles .size ();
244+
245+ try {
246+ while (pages .hasNext ()) {
247+ Page page = pages .next ();
160248 long pageSizeInBytes = page .getSizeInBytes ();
249+ Slice serialized = serializer .serialize (page );
250+ long serializedPageSize = serialized .length ();
251+
252+ try (SliceOutput out = newSliceOutput (fileIndex )) {
253+ out .writeBytes (serialized );
254+ }
255+
161256 spilledPagesBytes += pageSizeInBytes ;
257+
162258 spilledPagesInMemorySize .addAndGet (pageSizeInBytes );
163- Slice serializedPage = serializer .serialize (page );
164- long pageSize = serializedPage .length ();
165- localSpillContext .updateBytes (pageSize );
166- spillerStats .addToTotalSpilledBytes (pageSize );
167- output .writeBytes (serializedPage );
259+ localSpillContext .updateBytes (serializedPageSize );
260+ spillerStats .addToTotalSpilledBytes (serializedPageSize );
261+
262+ fileIndex = (fileIndex + 1 ) % fileCount ;
168263 }
264+
265+ currentFileIndex = fileIndex ;
169266 }
170267 catch (UncheckedIOException | IOException e ) {
171268 fileSystemErrorHandler .run ();
172269 throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to spill pages" , e );
173270 }
174- return DataSize .ofBytes (spilledPagesBytes );
175- }
176271
177- private Iterator <Page > readPages ()
178- {
179- checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
180-
181- try {
182- Optional <SecretKey > encryptionKey = this .encryptionKey ;
183- checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
184- PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
185- // encryption key is safe to discard since it now belongs to the PageDeserializer and repeated reads are disallowed
186- this .encryptionKey = Optional .empty ();
187- InputStream input = closer .register (targetFile .newInputStream ());
188- Iterator <Page > pages = PagesSerdeUtil .readPages (deserializer , input );
189- return closeWhenExhausted (pages , input );
190- }
191- catch (IOException e ) {
192- fileSystemErrorHandler .run ();
193- throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to read spilled pages" , e );
194- }
272+ return DataSize .ofBytes (spilledPagesBytes );
195273 }
196274
197275 @ Override
@@ -215,6 +293,23 @@ private void checkNoSpillInProgress()
215293 checkState (spillInProgress .isDone (), "spill in progress" );
216294 }
217295
296+ private SliceOutput newSliceOutput (int fileIndex )
297+ throws IOException
298+ {
299+ return new OutputStreamSliceOutput (targetFiles .get (fileIndex ).newOutputStream (APPEND ), BUFFER_SIZE );
300+ }
301+
302+ /**
303+ * Returns an iterator that exposes all pages stored in the given file.
304+ * Pages are lazily deserialized as the iterator is consumed.
305+ */
306+ private Iterator <Page > readFilePages (PageDeserializer deserializer , FileHolder file , Closer closer )
307+ throws IOException
308+ {
309+ InputStream input = closer .register (file .newInputStream ());
310+ return transform (closeWhenExhausted (PagesSerdeUtil .readSerializedPages (input ), input ), deserializer ::deserialize );
311+ }
312+
218313 private static <T > Iterator <T > closeWhenExhausted (Iterator <T > iterator , Closeable resource )
219314 {
220315 requireNonNull (iterator , "iterator is null" );
0 commit comments