Skip to content

Commit b50ddfd

Browse files
haoyuanpwendell
authored andcommitted
SPARK-1305: Support persisting RDD's directly to Tachyon
Move the PR#468 of apache-incubator-spark to the apache-spark "Adding an option to persist Spark RDD blocks into Tachyon." Author: Haoyuan Li <[email protected]> Author: RongGu <[email protected]> Closes #158 from RongGu/master and squashes the following commits: 72b7768 [Haoyuan Li] merge master 9f7fa1b [Haoyuan Li] fix code style ae7834b [Haoyuan Li] minor cleanup a8b3ec6 [Haoyuan Li] merge master branch e0f4891 [Haoyuan Li] better check offheap. 55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel 7cd4600 [RongGu] remove some logic code for tachyonstore's replication 51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore 8adfcfa [RongGu] address arron's comment on inTachyonSize 120e48a [RongGu] changed the root-level dir name in Tachyon 5cc041c [Haoyuan Li] address aaron's comments 9b97935 [Haoyuan Li] address aaron's comments d9a6438 [Haoyuan Li] fix for pspark 77d2703 [Haoyuan Li] change python api.git status 3dcace4 [Haoyuan Li] address matei's comments 91fa09d [Haoyuan Li] address patrick's comments 589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE 64348b2 [Haoyuan Li] update conf docs. ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1 619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler 49cc724 [Haoyuan Li] update docs with off_headp option 4572f9f [RongGu] reserving the old apply function API of StorageLevel 04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP 76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix 939e467 [Haoyuan Li] 0.4.1-thrift from maven central 86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1 16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem 6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 d827250 [RongGu] fix JsonProtocolSuie test failure 716e93b [Haoyuan Li] revert the version ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift 2825a13 [RongGu] up-merging to the current master branch of the apache spark 6a22c1a [Haoyuan Li] fix scalastyle 8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client. 77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice. 1dcadf9 [Haoyuan Li] typo bf278fa [Haoyuan Li] fix python tests e82909c [Haoyuan Li] minor cleanup 776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR 8859371 [Haoyuan Li] various minor fixes and clean up e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode. fcaeab2 [Haoyuan Li] address Aaron's comment e554b1e [Haoyuan Li] add python code 47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels. dc8ef24 [Haoyuan Li] add old storelevel constructor e01a271 [Haoyuan Li] update tachyon 0.4.1 8011a96 [RongGu] fix a brought-in mistake in StorageLevel 70ca182 [RongGu] a bit change in comment 556978b [RongGu] fix the scalastyle errors 791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
1 parent 1347ebd commit b50ddfd

29 files changed

+976
-169
lines changed

core/pom.xml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,53 @@
200200
<artifactId>derby</artifactId>
201201
<scope>test</scope>
202202
</dependency>
203+
<dependency>
204+
<groupId>org.tachyonproject</groupId>
205+
<artifactId>tachyon</artifactId>
206+
<version>0.4.1-thrift</version>
207+
<exclusions>
208+
<exclusion>
209+
<groupId>org.apache.hadoop</groupId>
210+
<artifactId>hadoop-client</artifactId>
211+
</exclusion>
212+
<exclusion>
213+
<groupId>org.apache.curator</groupId>
214+
<artifactId>curator-recipes</artifactId>
215+
</exclusion>
216+
<exclusion>
217+
<groupId>org.eclipse.jetty</groupId>
218+
<artifactId>jetty-jsp</artifactId>
219+
</exclusion>
220+
<exclusion>
221+
<groupId>org.eclipse.jetty</groupId>
222+
<artifactId>jetty-webapp</artifactId>
223+
</exclusion>
224+
<exclusion>
225+
<groupId>org.eclipse.jetty</groupId>
226+
<artifactId>jetty-server</artifactId>
227+
</exclusion>
228+
<exclusion>
229+
<groupId>org.eclipse.jetty</groupId>
230+
<artifactId>jetty-servlet</artifactId>
231+
</exclusion>
232+
<exclusion>
233+
<groupId>junit</groupId>
234+
<artifactId>junit</artifactId>
235+
</exclusion>
236+
<exclusion>
237+
<groupId>org.powermock</groupId>
238+
<artifactId>powermock-module-junit4</artifactId>
239+
</exclusion>
240+
<exclusion>
241+
<groupId>org.powermock</groupId>
242+
<artifactId>powermock-api-mockito</artifactId>
243+
</exclusion>
244+
<exclusion>
245+
<groupId>org.apache.curator</groupId>
246+
<artifactId>curator-test</artifactId>
247+
</exclusion>
248+
</exclusions>
249+
</dependency>
203250
<dependency>
204251
<groupId>org.scalatest</groupId>
205252
<artifactId>scalatest_${scala.binary.version}</artifactId>

core/src/main/java/org/apache/spark/api/java/StorageLevels.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,18 @@
2323
* Expose some commonly useful storage level constants.
2424
*/
2525
public class StorageLevels {
26-
public static final StorageLevel NONE = create(false, false, false, 1);
27-
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
28-
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
29-
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
30-
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
31-
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
32-
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
33-
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
34-
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
35-
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
36-
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
26+
public static final StorageLevel NONE = create(false, false, false, false, 1);
27+
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
28+
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
29+
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
30+
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
31+
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
32+
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
33+
public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
34+
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
35+
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
36+
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
37+
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);
3738

3839
/**
3940
* Create a new StorageLevel object.
@@ -42,7 +43,26 @@ public class StorageLevels {
4243
* @param deserialized saved as deserialized objects, if true
4344
* @param replication replication factor
4445
*/
45-
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
46-
return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
46+
@Deprecated
47+
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
48+
int replication) {
49+
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
50+
}
51+
52+
/**
53+
* Create a new StorageLevel object.
54+
* @param useDisk saved to disk, if true
55+
* @param useMemory saved to memory, if true
56+
* @param useOffHeap saved to Tachyon, if true
57+
* @param deserialized saved as deserialized objects, if true
58+
* @param replication replication factor
59+
*/
60+
public static StorageLevel create(
61+
boolean useDisk,
62+
boolean useMemory,
63+
boolean useOffHeap,
64+
boolean deserialized,
65+
int replication) {
66+
return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
4767
}
4868
}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ package org.apache.spark
1919

2020
import java.io._
2121
import java.net.URI
22-
import java.util.{Properties, UUID}
2322
import java.util.concurrent.atomic.AtomicInteger
24-
23+
import java.util.{Properties, UUID}
24+
import java.util.UUID.randomUUID
2525
import scala.collection.{Map, Set}
2626
import scala.collection.generic.Growable
2727
import scala.collection.mutable.{ArrayBuffer, HashMap}
2828
import scala.reflect.{ClassTag, classTag}
29-
3029
import org.apache.hadoop.conf.Configuration
3130
import org.apache.hadoop.fs.Path
3231
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -130,6 +129,11 @@ class SparkContext(
130129
val master = conf.get("spark.master")
131130
val appName = conf.get("spark.app.name")
132131

132+
// Generate the random name for a temp folder in Tachyon
133+
// Add a timestamp as the suffix here to make it more safe
134+
val tachyonFolderName = "spark-" + randomUUID.toString()
135+
conf.set("spark.tachyonStore.folderName", tachyonFolderName)
136+
133137
val isLocal = (master == "local" || master.startsWith("local["))
134138

135139
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
5353
case RegisteredExecutor(sparkProperties) =>
5454
logInfo("Successfully registered with driver")
5555
// Make this host instead of hostPort ?
56-
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
56+
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
57+
false)
5758

5859
case RegisterExecutorFailed(message) =>
5960
logError("Slave registration failed: " + message)
@@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend {
105106
// set it
106107
val sparkHostPort = hostname + ":" + boundPort
107108
actorSystem.actorOf(
108-
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
109+
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
110+
sparkHostPort, cores),
109111
name = "Executor")
110112
workerUrl.foreach{ url =>
111113
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,22 @@ object ExecutorExitCode {
4141
/** DiskStore failed to create a local temporary directory after many attempts. */
4242
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
4343

44+
/** TachyonStore failed to initialize after many attempts. */
45+
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
46+
47+
/** TachyonStore failed to create a local temporary directory after many attempts. */
48+
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
49+
4450
def explainExitCode(exitCode: Int): String = {
4551
exitCode match {
4652
case UNCAUGHT_EXCEPTION => "Uncaught exception"
4753
case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
4854
case OOM => "OutOfMemoryError"
4955
case DISK_STORE_FAILED_TO_CREATE_DIR =>
5056
"Failed to create local directory (bad spark.local.dir?)"
57+
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
58+
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
59+
"TachyonStore failed to create a local temporary directory."
5160
case _ =>
5261
"Unknown executor exit code (" + exitCode + ")" + (
5362
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,20 @@ package org.apache.spark.storage
1919

2020
import java.io.{File, InputStream, OutputStream}
2121
import java.nio.{ByteBuffer, MappedByteBuffer}
22-
2322
import scala.collection.mutable.{ArrayBuffer, HashMap}
2423
import scala.concurrent.{Await, Future}
2524
import scala.concurrent.duration._
2625
import scala.util.Random
27-
2826
import akka.actor.{ActorSystem, Cancellable, Props}
2927
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
3028
import sun.nio.ch.DirectBuffer
31-
3229
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
3330
import org.apache.spark.io.CompressionCodec
3431
import org.apache.spark.network._
3532
import org.apache.spark.serializer.Serializer
3633
import org.apache.spark.util._
3734

35+
3836
sealed trait Values
3937

4038
case class ByteBufferValues(buffer: ByteBuffer) extends Values
@@ -59,6 +57,17 @@ private[spark] class BlockManager(
5957

6058
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
6159
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
60+
var tachyonInitialized = false
61+
private[storage] lazy val tachyonStore: TachyonStore = {
62+
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
63+
val appFolderName = conf.get("spark.tachyonStore.folderName")
64+
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
65+
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
66+
val tachyonBlockManager = new TachyonBlockManager(
67+
shuffleBlockManager, tachyonStorePath, tachyonMaster)
68+
tachyonInitialized = true
69+
new TachyonStore(this, tachyonBlockManager)
70+
}
6271

6372
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
6473
private val nettyPort: Int = {
@@ -248,8 +257,10 @@ private[spark] class BlockManager(
248257
if (info.tellMaster) {
249258
val storageLevel = status.storageLevel
250259
val inMemSize = Math.max(status.memSize, droppedMemorySize)
260+
val inTachyonSize = status.tachyonSize
251261
val onDiskSize = status.diskSize
252-
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
262+
master.updateBlockInfo(
263+
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
253264
} else true
254265
}
255266

@@ -259,22 +270,24 @@ private[spark] class BlockManager(
259270
* and the updated in-memory and on-disk sizes.
260271
*/
261272
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
262-
val (newLevel, inMemSize, onDiskSize) = info.synchronized {
273+
val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
263274
info.level match {
264275
case null =>
265-
(StorageLevel.NONE, 0L, 0L)
276+
(StorageLevel.NONE, 0L, 0L, 0L)
266277
case level =>
267278
val inMem = level.useMemory && memoryStore.contains(blockId)
279+
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
268280
val onDisk = level.useDisk && diskStore.contains(blockId)
269281
val deserialized = if (inMem) level.deserialized else false
270-
val replication = if (inMem || onDisk) level.replication else 1
271-
val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
282+
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
283+
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
272284
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
285+
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
273286
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
274-
(storageLevel, memSize, diskSize)
287+
(storageLevel, memSize, diskSize, tachyonSize)
275288
}
276289
}
277-
BlockStatus(newLevel, inMemSize, onDiskSize)
290+
BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
278291
}
279292

280293
/**
@@ -354,6 +367,24 @@ private[spark] class BlockManager(
354367
logDebug("Block " + blockId + " not found in memory")
355368
}
356369
}
370+
371+
// Look for the block in Tachyon
372+
if (level.useOffHeap) {
373+
logDebug("Getting block " + blockId + " from tachyon")
374+
if (tachyonStore.contains(blockId)) {
375+
tachyonStore.getBytes(blockId) match {
376+
case Some(bytes) => {
377+
if (!asValues) {
378+
return Some(bytes)
379+
} else {
380+
return Some(dataDeserialize(blockId, bytes))
381+
}
382+
}
383+
case None =>
384+
logDebug("Block " + blockId + " not found in tachyon")
385+
}
386+
}
387+
}
357388

358389
// Look for block on disk, potentially storing it back into memory if required:
359390
if (level.useDisk) {
@@ -620,6 +651,23 @@ private[spark] class BlockManager(
620651
}
621652
// Keep track of which blocks are dropped from memory
622653
res.droppedBlocks.foreach { block => updatedBlocks += block }
654+
} else if (level.useOffHeap) {
655+
// Save to Tachyon.
656+
val res = data match {
657+
case IteratorValues(iterator) =>
658+
tachyonStore.putValues(blockId, iterator, level, false)
659+
case ArrayBufferValues(array) =>
660+
tachyonStore.putValues(blockId, array, level, false)
661+
case ByteBufferValues(bytes) => {
662+
bytes.rewind();
663+
tachyonStore.putBytes(blockId, bytes, level)
664+
}
665+
}
666+
size = res.size
667+
res.data match {
668+
case Right(newBytes) => bytesAfterPut = newBytes
669+
case _ =>
670+
}
623671
} else {
624672
// Save directly to disk.
625673
// Don't get back the bytes unless we replicate them.
@@ -644,8 +692,8 @@ private[spark] class BlockManager(
644692

645693
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
646694
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
647-
// Now that the block is in either the memory or disk store, let other threads read it,
648-
// and tell the master about it.
695+
// Now that the block is in either the memory, tachyon, or disk store,
696+
// let other threads read it, and tell the master about it.
649697
marked = true
650698
putBlockInfo.markReady(size)
651699
if (tellMaster) {
@@ -707,7 +755,8 @@ private[spark] class BlockManager(
707755
*/
708756
var cachedPeers: Seq[BlockManagerId] = null
709757
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
710-
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
758+
val tLevel = StorageLevel(
759+
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
711760
if (cachedPeers == null) {
712761
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
713762
}
@@ -832,9 +881,10 @@ private[spark] class BlockManager(
832881
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
833882
val removedFromMemory = memoryStore.remove(blockId)
834883
val removedFromDisk = diskStore.remove(blockId)
835-
if (!removedFromMemory && !removedFromDisk) {
884+
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
885+
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
836886
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
837-
"the disk or memory store")
887+
"the disk, memory, or tachyon store")
838888
}
839889
blockInfo.remove(blockId)
840890
if (tellMaster && info.tellMaster) {
@@ -871,6 +921,9 @@ private[spark] class BlockManager(
871921
if (level.useDisk) {
872922
diskStore.remove(id)
873923
}
924+
if (level.useOffHeap) {
925+
tachyonStore.remove(id)
926+
}
874927
iterator.remove()
875928
logInfo("Dropped block " + id)
876929
}
@@ -946,6 +999,9 @@ private[spark] class BlockManager(
946999
blockInfo.clear()
9471000
memoryStore.clear()
9481001
diskStore.clear()
1002+
if (tachyonInitialized) {
1003+
tachyonStore.clear()
1004+
}
9491005
metadataCleaner.cancel()
9501006
broadcastCleaner.cancel()
9511007
logInfo("BlockManager stopped")

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
6363
blockId: BlockId,
6464
storageLevel: StorageLevel,
6565
memSize: Long,
66-
diskSize: Long): Boolean = {
66+
diskSize: Long,
67+
tachyonSize: Long): Boolean = {
6768
val res = askDriverWithReply[Boolean](
68-
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
69+
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
6970
logInfo("Updated info of block " + blockId)
7071
res
7172
}

0 commit comments

Comments
 (0)