1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .spark .storage
19+
20+ import java .util .concurrent .ConcurrentHashMap
21+
22+ private [storage] trait BlockInfo {
23+ def level : StorageLevel
24+ def tellMaster : Boolean
25+ // To save space, 'pending' and 'failed' are encoded as special sizes:
26+ @ volatile var size : Long = BlockInfo .BLOCK_PENDING
27+ private def pending : Boolean = size == BlockInfo .BLOCK_PENDING
28+ private def failed : Boolean = size == BlockInfo .BLOCK_FAILED
29+ private def initThread : Thread = BlockInfo .blockInfoInitThreads.get(this )
30+
31+ setInitThread()
32+
33+ private def setInitThread () {
34+ // Set current thread as init thread - waitForReady will not block this thread
35+ // (in case there is non trivial initialization which ends up calling waitForReady as part of
36+ // initialization itself)
37+ BlockInfo .blockInfoInitThreads.put(this , Thread .currentThread())
38+ }
39+
40+ /**
41+ * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
42+ * Return true if the block is available, false otherwise.
43+ */
44+ def waitForReady (): Boolean = {
45+ if (pending && initThread != Thread .currentThread()) {
46+ synchronized {
47+ while (pending) this .wait()
48+ }
49+ }
50+ ! failed
51+ }
52+
53+ /** Mark this BlockInfo as ready (i.e. block is finished writing) */
54+ def markReady (sizeInBytes : Long ) {
55+ require (sizeInBytes >= 0 , " sizeInBytes was negative: " + sizeInBytes)
56+ assert (pending)
57+ size = sizeInBytes
58+ BlockInfo .blockInfoInitThreads.remove(this )
59+ synchronized {
60+ this .notifyAll()
61+ }
62+ }
63+
64+ /** Mark this BlockInfo as ready but failed */
65+ def markFailure () {
66+ assert (pending)
67+ size = BlockInfo .BLOCK_FAILED
68+ BlockInfo .blockInfoInitThreads.remove(this )
69+ synchronized {
70+ this .notifyAll()
71+ }
72+ }
73+ }
74+
75+ private object BlockInfo {
76+ // initThread is logically a BlockInfo field, but we store it here because
77+ // it's only needed while this block is in the 'pending' state and we want
78+ // to minimize BlockInfo's memory footprint.
79+ private val blockInfoInitThreads = new ConcurrentHashMap [BlockInfo , Thread ]
80+
81+ private val BLOCK_PENDING : Long = - 1L
82+ private val BLOCK_FAILED : Long = - 2L
83+ }
84+
85+ // All shuffle blocks have the same `level` and `tellMaster` properties,
86+ // so we can save space by not storing them in each instance:
87+ private [storage] class ShuffleBlockInfo extends BlockInfo {
88+ // These need to be defined using 'def' instead of 'val' in order for
89+ // the compiler to eliminate the fields:
90+ def level : StorageLevel = StorageLevel .DISK_ONLY
91+ def tellMaster : Boolean = false
92+ }
93+
94+ private [storage] class BlockInfoImpl (val level : StorageLevel , val tellMaster : Boolean )
95+ extends BlockInfo {
96+ // Intentionally left blank
97+ }
0 commit comments