Skip to content

Conversation

@LuciferYang
Copy link
Contributor

@LuciferYang LuciferYang commented Aug 24, 2022

What changes were proposed in this pull request?

This is a followup of #36200, the main change of this pr as follows:

  • Make DB and DBProvider as @Private to make the API intent clearer
  • Delete remove method from DBProvider witch is unnecessary @Override
  • Remove the useless null check condition from ExternalShuffleBlockResolver and RemoteBlockPushResolver, fix related sutes
  • Correction log print content

Why are the changes needed?

Fix new comments after #36200 merged.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass All GitHub Actions

@LuciferYang LuciferYang marked this pull request as draft August 24, 2022 13:23
@LuciferYang LuciferYang changed the title [SPARK-38909][CORE][YARN][FOLLOWUP] Name later [WIP][SPARK-38909][CORE][YARN][FOLLOWUP] Name later Aug 24, 2022
@LuciferYang
Copy link
Contributor Author

cc @Ngone51 this pr used to fix the comments you left in #36200,

@LuciferYang LuciferYang changed the title [WIP][SPARK-38909][CORE][YARN][FOLLOWUP] Name later [WIP][SPARK-38909][CORE][YARN][FOLLOWUP] Aug 24, 2022
@github-actions github-actions bot added the BUILD label Aug 25, 2022
@LuciferYang LuciferYang changed the title [WIP][SPARK-38909][CORE][YARN][FOLLOWUP] [WIP][SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] Aug 25, 2022
.build(indexCacheLoader);
DBBackend dbBackend = null;
DBBackend dbBackend;
if (registeredExecutorFile != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this condition? I feel like it's useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... registeredExecutorFile may be null, for example, the following code path:

protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = {
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
val dBBackend = DBBackend.byName(shuffleDBName)
logInfo(s"Configured ${config.SHUFFLE_SERVICE_DB_BACKEND.key} as $shuffleDBName " +
s"and actually used value ${dBBackend.name()} ")
new ExternalBlockHandler(conf,
findRegisteredExecutorsDBFile(dBBackend.fileName(registeredExecutorsDB)))
} else {
new ExternalBlockHandler(conf, null)
}
}

public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
new NoOpMergedShuffleFileManager(conf, null));
}

However, without adding this condition, the result will not change

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT And the log will look strange without this condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e3500cf remove the condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait CI

String dbBackendName =
conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
DBBackend dbBackend = DBBackend.byName(dbBackendName);
db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper);
Copy link
Contributor

@mridulm mridulm Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review note: Recovery need not be enabled for node managers - in which case registeredExecutorFile will be null (in addition to tests).

DBProvider.initDB does handle null input though.

So the main change in this file and RemoteBlockPushResolver is moving the log message into if (db != null)

@mridulm
Copy link
Contributor

mridulm commented Aug 26, 2022

Is this still WIP @LuciferYang ?

@LuciferYang LuciferYang changed the title [WIP][SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] [WIP][SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] Make some code cleanup related to shuffle state db Aug 27, 2022
@LuciferYang LuciferYang marked this pull request as ready for review August 27, 2022 02:05
@LuciferYang LuciferYang changed the title [WIP][SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] Make some code cleanup related to shuffle state db [SPARK-38909][BUILD][CORE][YARN][FOLLOWUP] Make some code cleanup related to shuffle state db Aug 27, 2022
@LuciferYang
Copy link
Contributor Author

Removed [WIP], and need @Ngone51 confirm whether there are new comments of #36200.

And I have a point to discuss:

There are two initLevelDB method in LevelDBProvider, one of them is only for testing code and only used by org.apache.spark.network.shuffle.ShuffleTestAccessor now, so should we move the test only initLevelDB method to ShuffleTestAccessor? The implementation of RocksDB will also encounter this issue

@LuciferYang
Copy link
Contributor Author

Removed [WIP], and need @Ngone51 confirm whether there are new comments of #36200.

And I have a point to discuss:

There are two initLevelDB method in LevelDBProvider, one of them is only for testing code and only used by org.apache.spark.network.shuffle.ShuffleTestAccessor now, so should we move the test only initLevelDB method to ShuffleTestAccessor? The implementation of RocksDB will also encounter this issue

@Ngone51 @mridulm 7a22aeb Move initDB method used only by test back to ShuffleTestAccessor

@LuciferYang
Copy link
Contributor Author

Do you have time to further review this one? Thanks @mridulm

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple of comments - but since this is to mostly address @Ngone51's feedback; would be good if you can review it !

}
} else {
null
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will end up duplicating logic between DBProvider and ShuffleTestAccessor - let us consolidate it in DBProvider.
This also impacts the integration test anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let me revert this change first and then see if there is a better way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do some check, I think we can directly use another initDB and delete this one.

import org.apache.spark.network.yarn.util.HadoopConfigProvider
import org.apache.spark.tags.ExtendedLevelDBTest
import org.apache.spark.util.Utils

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@LuciferYang
Copy link
Contributor Author

should we merge this one ? @Ngone51 @mridulm

version: StoreVersion,
mapper: ObjectMapper)
: ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
val db = DBProvider.initDB(dbBackend, file, version, mapper)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a behavior change for test, right (this ?)
Essentially, YarnShuffleIntegrationSuite would expect it to fail with an exception and not do error handling (which initDB does).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try {
val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
collect().toSet
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet)
result = "success"
// only one process can open a leveldb file at a time, so we copy the files
if (registeredExecFile != null && execStateCopy != null) {
val dbBackendName = conf.get(SHUFFLE_SERVICE_DB_BACKEND.key)
val dbBackend = DBBackend.byName(dbBackendName)
logWarning(s"Configured ${SHUFFLE_SERVICE_DB_BACKEND.key} as $dbBackendName " +
s"and actually used value ${dbBackend.name()}")
FileUtils.copyDirectory(registeredExecFile, execStateCopy)
assert(!ShuffleTestAccessor
.reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
}

Do you mean the behavior change is execStateCopy exists but open failed? The original method will throw an exception directly, and the new method will return a new instance? It seems that the case just to test whether an existing execStateCopy can be loaded again?

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the original case does not test the execStateCopy exists but open failed scenario, it seems that there is no change.

For the initDB(dbBackend, file) method:

@VisibleForTesting
static DB initLevelDB(File file) throws IOException {
Options options = new Options();
options.createIfMissing(true);
JniDBFactory factory = new JniDBFactory();
return factory.open(file, options);
}

  • if execStateCopy exists, it will re-open it and load data from execStateCopy
  • else if execStateCopy not exists, it will open a new db and return empty data due to options.createIfMissing(true), then assert failed

For the initDB(dbBackend, dbFile, version, mapper) method:

  • if execStateCopy exists, it will re-open it and load data from execStateCopy
  • else if execStateCopy not exists, it will open a new db and return empty data, then assert failed

For execStateCopy exists but open failed scenario, use initDB(dbBackend, file) will throw an unhandled exception, use initDB(dbBackend, dbFile, version, mapper) will assert failed. Is this unacceptable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squito or @tgravescs should have more context about this, and comment better - looking at the test, I would expect this test code path to additionally test if the DB is missing/invalid/etc and fail in case there is any issues (due to lack of error handling/fallback in LevelDBProvider.initLevelDB) - which changes with in PR, that no longer happens; right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm add SPARK-40364 to tracking this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will give a separately pr to continue to explore the feasibility about this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry didn't get a chance to look before your filed the other issue, happy to look if you make the change under the other issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tgravescs , I will give another pr later ~

@LuciferYang
Copy link
Contributor Author

Run / Base image build failed as follows:

2022-09-07T04:35:50.8451091Z #32 exporting to image
2022-09-07T04:35:50.8451363Z #32 exporting layers done
2022-09-07T04:35:50.8452103Z #32 exporting manifest sha256:1738df8ea1b7b1d839a5fdaa47f4a3d414d44c331688eddf60df5e9529e1fad5 done
2022-09-07T04:35:50.8452943Z #32 exporting config sha256:9e4187b41de14caaad1f643e02cb8ac50ec0dbb65d3dea2dfa4397b86addfb1d
2022-09-07T04:35:50.9955812Z #32 exporting config sha256:9e4187b41de14caaad1f643e02cb8ac50ec0dbb65d3dea2dfa4397b86addfb1d done
2022-09-07T04:35:50.9956540Z #32 pushing layers
2022-09-07T04:35:51.2964868Z #32 ...
2022-09-07T04:35:51.2965252Z 
2022-09-07T04:35:51.2966013Z #33 [auth] luciferyang/apache-spark-ci-image:pull,push token for ghcr.io
2022-09-07T04:35:51.2966363Z #33 DONE 0.0s
2022-09-07T04:35:51.4476109Z 
2022-09-07T04:35:51.4479341Z #32 exporting to image
2022-09-07T04:35:51.8931704Z #32 ...
2022-09-07T04:35:51.8932307Z 
2022-09-07T04:35:51.8933413Z #34 [auth] apache/spark/apache-spark-github-action-image-cache:pull luciferyang/apache-spark-ci-image:pull,push token for ghcr.io
2022-09-07T04:35:51.8934022Z #34 DONE 0.0s
2022-09-07T04:35:52.0437234Z 
2022-09-07T04:35:52.0437900Z #32 exporting to image
2022-09-07T04:35:52.4041282Z #32 pushing layers 1.5s done
2022-09-07T04:35:52.4041635Z #32 ERROR: unexpected status: 403 Forbidden
2022-09-07T04:35:52.4042165Z ------
2022-09-07T04:35:52.4042409Z  > exporting to image:
2022-09-07T04:35:52.4042677Z ------
2022-09-07T04:35:52.4081024Z ERROR: failed to solve: unexpected status: 403 Forbidden
2022-09-07T04:35:52.4215414Z ##[error]buildx failed with: ERROR: failed to solve: unexpected status: 403 Forbidden

friendly ping @Yikun for help, I should not have changed GitHub's configuration

@Yikun
Copy link
Member

Yikun commented Sep 7, 2022

pls rebase this PR, after #37815 merged.

Related github incident: https://www.githubstatus.com/incidents/d181frs643d4

@LuciferYang
Copy link
Contributor Author

pls rebase this PR, after #37815 merged.

Thanks ~

@Yikun
Copy link
Member

Yikun commented Sep 7, 2022

You can retrigger the ci, the issue github already fix the https://www.githubstatus.com/incidents/d181frs643d4

@mridulm mridulm closed this in 3566657 Sep 8, 2022
@mridulm
Copy link
Contributor

mridulm commented Sep 8, 2022

Merged to master.
Thanks for working on this @LuciferYang !
Thanks for flagging this issue and the review @Ngone51 :-)

@LuciferYang
Copy link
Contributor Author

Thanks for your review @mridulm @tgravescs ~
Thanks for helping fix GA @Yikun ~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants