Skip to content

Support distribution of broadcast table using disk storage in Presto-on-Spark#15669

Merged
arhimondr merged 1 commit intoprestodb:masterfrom
pgupta2:broadcast_using_checkpoint
Mar 1, 2021
Merged

Support distribution of broadcast table using disk storage in Presto-on-Spark#15669
arhimondr merged 1 commit intoprestodb:masterfrom
pgupta2:broadcast_using_checkpoint

Conversation

@pgupta2
Copy link
Contributor

@pgupta2 pgupta2 commented Feb 3, 2021

== Test plan ==

  • Added unit tests to trigger broadcast join using disk.
  • Ran verifier for around 100 broadcast join queries.

== RELEASE NOTES ==
General Changes

  • Add support for distributing broadcast table using permanent storage, thereby removing spark driver from the distribution flow
    This feature can be enabled/disabled using 'distribute_broadcast_table_using_disk' session property

Hive Changes

  • None

@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Feb 3, 2021

CLA Signed

The committers are authorized under a signed CLA.

  • ✅ Arjun Gupta (c477f84eb68e2be06fe5c3b51e99db8b87b946cd, c62a7b323446bcd4af8c9a07392b5c0591cf2e3b, f9d6568e0033836c23df7681ec780cf3c5dc5313, 755d0482f85f3135c4c659eda7995bb1c2956b7c)

@pgupta2 pgupta2 requested a review from arhimondr February 3, 2021 01:54
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be disabled by default.

@pgupta2 pgupta2 force-pushed the broadcast_using_checkpoint branch 2 times, most recently from 5677060 to f77c9d3 Compare February 5, 2021 00:31
Copy link
Member

@arhimondr arhimondr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good to me, some comments

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LocalTempStorage is expected to be used to store files locally, and generally local files are not expected to be remotely accessible and are not expected to survive the restart.

For broadcast only a remote storage service can be used, and files should survive the restart.

What do you think about adding a getCapabilities method to the StorageService interface that would return a Set<StorageCapabilities>, where capabilities could be an enum, e.g.:

StorageCapabilities {
REMOTELY_ACCESSIBLE,
PERSISTANT_BEETWEEN_RESTARTS,
}

Then the broadcast implementation should check if the configured storage provides required capabilities.

CC: @wenleix @viczhang861

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally the api of this interface used to use URI or Path as an identifier. But then we realized that some storages (e.g.: Manifold) may not have primary keys in a form of a URL. That's why we decided to go with an opaque StorageHandle. But as you have noticed the StorageHandle is not serializable. We thought about adding a serialization methods later (as for the purpose of spilling serialization wasn't needed).

What do you think about adding 2 methods to the storage interface:

byte[] serializeHandle(StorageHandle storageHandle)
StorageHandle deserialize(byte[] serializedStorageHandle)

?

Copy link
Contributor Author

@pgupta2 pgupta2 Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.. The problem with storage handle was that it cannot be serialized. I fixed it by adding a new API that accepts URI which is serializable (under the assumption that that every storage will have URI). Ideal option was to make StorageHandle serializable which will work for any storage implementations

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StorageHandle is just a marker interface. It might be hard to enforce that every implementation is Java serializable. Though I suggest to add 2 explicit methods to the StorageService interface to serialize / deserialize handles.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please back that by a configuration property as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract the RddAndMore into a standalone class, since not it is used not only by the PrestoSparkQueryExecutionFactory

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes pages could be very small. From what I remember the storage implementation is not required to buffer. If that's the case - we should buffer up to some amount here. For PageFile format we buffer up to 24mb by default (that's an optimal block size for tempfs, but it is configurable).

CC: @wenleix @viczhang861

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: UncheckedIOException

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ditto about wrapping

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the broadcastId a String and use a UUID, that is simpler and more reliable. After you use a local temporary file as a cache the requirement for the ID to be a long will no longer be there

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This broadcastId is being used to cache HT in spark's blockmanager. The blockmanager API requires us to pass a BlockID object which could be of different type. BroadcastBlockId class accepts a long identifier. There is another blockId called TestBlockId that accepts a string. We can use it but its sounded counter-intuitive since it is meant for testing purposes.

We cannot extend BlockId class since its sealed and we cannot add a new class since we depend on OSS spark artifacts. So, either we use what we have now or we can use TestBlockId which will accept string as input.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's wrap remove in a try-catch

Copy link
Member

@arhimondr arhimondr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simply do uri.toString().getBytes(UTF_8)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Presto we prefer using airlift.json. You can have a look at JsonMapper. But for the purpose of serialization for this class we can simply use uri.toString()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe storageBasedBroadcastJoinEnabled (also change the config names)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe storageBasedBroadcastJoinWriteBufferSize (also change the config names)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you decide to rename config names please don't forget to rename the session properties to keep the naming consistent

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now we have control over caching we need to reserve the memory used by the cache using the Presto memory reservation mechanism. It might be somehow tricky though. Presto offers LocalMemoryContext that is "per operator". There will be several instances of the PrestoSparkRemoteSourceOperator for a single plan node, though the memory for caching should be accounted only once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally it would be best to cache already deserialized pages. Though deserialization should be done outside the lock. Only reading pages from the input stream must be done under the lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't forget to change the config here (to use the broadcast one)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure it is closed if the execution terminates exceptionally (it may perform some cleanups on close)

@pgupta2 pgupta2 force-pushed the broadcast_using_checkpoint branch 2 times, most recently from 3dfc264 to ad52305 Compare February 17, 2021 07:35
Copy link
Member

@arhimondr arhimondr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good. A whole bunch of nits though. We should be good to go once the comments are resolved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a singleton class. Do we still need to make it static?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. That's a good point. Yeah, we don't have to make it static. Let's make it final though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause the ConcurrentModificationException. Create a copy of the key set before iterating.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would recommend removing this method as it is not very clear what size does it return (e.g.: number of pages, number of cached stages, size in bytes, etc.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClassLayout.parseClass(BroadcastTableCacheKey.class).instanceSize() will only include the size of the BroadcastTableCacheKey object itself, it won't include the size of the stageId and planNodeId objects. Since the memory footprint of the cache keys are rather very small I would recommend simply not count for the keys memory usage for simplicity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be called on every page. I would recommend caching the current retained size of a cache in a private static long variable, and recompute the size in the cache method

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop the String.format(, the Airlift logger can do formatting

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList.builder()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could potentially be immutable or shared, I would recommend creating a copy, and then doing shuffle

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the requireNonNull check

@pgupta2 pgupta2 force-pushed the broadcast_using_checkpoint branch from ad52305 to 755d048 Compare February 18, 2021 07:24
Copy link
Member

@arhimondr arhimondr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % comments

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like a rebase artifact

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for ioException != exception, as self suppression is not allowed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as an afterthought: Although the cache is currently used always from a single thread, since it is a singleton it might be accidentally used from different threads. I would recommend you protecting all the public methods with synchronized to be on the safe side.

@pgupta2 pgupta2 force-pushed the broadcast_using_checkpoint branch 3 times, most recently from 700f802 to 427fd03 Compare February 24, 2021 21:29
Spark provided broadcast variables are not scalable enough to reliably distribute large volumes of data (gigabytes).
Also broadcast variables cause additional memory overhead, as serialized blocks have to be stored in memory for the
torrent algorithm to function.

This implementation uses distributed storage as a medium. It stores the broadcast data into a distributed storage
and then broadcasts only the pointers (usually file names) with a Broadcast variable to let the executors know where
to read the broadcasted data.

This PR doesn't provide a StorageService implementation. Testing is done in a local mode with a local file system as a
local storage.
@pgupta2 pgupta2 force-pushed the broadcast_using_checkpoint branch from 427fd03 to ab26717 Compare February 26, 2021 19:55
@arhimondr arhimondr merged commit bd93326 into prestodb:master Mar 1, 2021
@arhimondr arhimondr mentioned this pull request Mar 12, 2021
13 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants