Gene.bordegaray/2026/01/improve broadcast cache#324
Conversation
…lled then partitions get executed multiple times
…on-cancellable background task
I did not have time to file an issue upstream @gabotechs , hopefully I have provided enough context for you. Feel free to file one if you have time; if not I will make it tomorrow 😄
Good catch! Filed an issue and a PR upstream here:
I think it would be worth exploring a lock-free solution to this. Do you foresee any reason why using channels for storing the record batches would be worse?
EDIT: just made a coarse attempt at what this would look like, and I think using channels can actually express this with less code while evicting already-consumed record batches and without introducing any locking contention: 8c0e5d4
My example does not work though, for some reason it's saying that "Partition X was already executed", which makes me think that there might be a problem somewhere else.
gabotechs
left a comment
👍 really good first step! I think there are opportunities for making it even better.
I would go straight away and call things what they are here: we are building an unbounded broadcasting queue backed by a Vec + Mutex.
I think if we choose to reorganize the code with different abstractions, like a BroadcastQueue struct with a Mutex + Vec inside, with APIs more or less similar to the ones for producing/consuming channels, we'll end up in a more natural place and should be able to do the same thing with less, clearer code.
What do you think about trying something like that?
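For illustration, here is a minimal sketch of the shape being suggested: an unbounded broadcast queue with channel-like produce/consume APIs. This uses std primitives (Mutex + Condvar) in place of the async ones the real code would use, and all names here are hypothetical, not the PR's actual types:

```rust
use std::sync::{Arc, Condvar, Mutex};

// Shared state: the buffered items plus a "closed" flag, guarded by one Mutex.
struct Shared<T> {
    entries: Mutex<(Vec<T>, bool)>,
    notify: Condvar,
}

pub struct BroadcastQueue<T>(Arc<Shared<T>>);

pub struct Consumer<T> {
    shared: Arc<Shared<T>>,
    index: usize, // next entry this consumer will read
}

impl<T: Clone> BroadcastQueue<T> {
    pub fn new() -> Self {
        BroadcastQueue(Arc::new(Shared {
            entries: Mutex::new((Vec::new(), false)),
            notify: Condvar::new(),
        }))
    }

    pub fn push(&self, item: T) {
        self.0.entries.lock().unwrap().0.push(item);
        self.0.notify.notify_all();
    }

    pub fn consumer(&self) -> Consumer<T> {
        Consumer { shared: Arc::clone(&self.0), index: 0 }
    }
}

// Dropping the producer closes the queue "by construction".
impl<T> Drop for BroadcastQueue<T> {
    fn drop(&mut self) {
        self.0.entries.lock().unwrap().1 = true;
        self.0.notify.notify_all();
    }
}

impl<T: Clone> Consumer<T> {
    // Blocks until a new entry arrives or the producer is dropped and drained.
    pub fn next(&mut self) -> Option<T> {
        let mut guard = self.shared.entries.lock().unwrap();
        loop {
            if let Some(item) = guard.0.get(self.index) {
                self.index += 1;
                return Some(item.clone());
            }
            if guard.1 {
                return None; // closed and fully consumed
            }
            guard = self.shared.notify.wait(guard).unwrap();
        }
    }
}
```

Each consumer keeps its own cursor, so many consumers can read the same buffered items independently of the producer's progress.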
- version = "57.1.0"
+ version = "57.2.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3"
+ checksum = "2a2b10dcb159faf30d3f81f6d56c1211a5bea2ca424eabe477648a44b993320e"
This should not be happening, we should be running with arrow ~~57.2.0~~ 57.1.0. I needed to do a PR reverting an unintended change like this recently:
I tried updating; it's still a big diff, but this should be right?
Sorry! I meant arrow 57.1.0. The diff in the Cargo.lock file should be minimal
src/execution_plans/broadcast.rs
Outdated
///
/// Represents a single cache entry in the [BroadcastExec] cache. This entry will hold the results
/// for a single "real" partition.
#[derive(Debug)]
struct CacheEntry {
How about putting all these structs at the bottom of the file (above tests)? That way, the doc comment above belongs to the correct structure.
One other thing that comes to mind: I don't think this fits the definition of a cache. A cache is temporary storage for data that gets accessed very frequently; the only thing in common here is that this stores data. In other words: following the same reasoning, anything that stores data in memory could be considered a cache.
What this is, really, is storage that gets produced by one entity and consumed by many, which fits the definition of a broadcast queue rather than a cache.
I think using the broadcast terminology here more accurately represents what this really is.
Moved, and I played around with the wording to be a better fit. Let me know what you think.
src/execution_plans/broadcast.rs
Outdated
// Use Weak to avoid a ref-cycle (CacheEntry -> task -> CacheEntry) and
// allow cleanup when all consumers drop.
let entry = Arc::downgrade(self);
🤔 this feels a bit sketchy. I think there should be cleaner ways of doing this. The fact that the current abstractions can put the code in situations where reference cycles happen makes me think that they are probably better represented with different abstractions.
At first sight, it could be a consequence of trying to put too many things onto the CacheEntry struct.
Tried separating shared state from the entry so the task could hold a strong Arc instead of a Weak. This eliminates the cycle but adds an extra indirection in the hot path (entry.shared.state vs entry.state), which showed up as a perf regression in benchmarks.
The Weak here is intentional and should be fine, I believe: it lets the entry drop early if all consumers cancel, and Weak::upgrade is cheap when the entry is still alive.
It's not so much about how expensive it is computationally; it's more that a dependency cycle in the code suggests that the chosen abstractions might not be the best ones for a human to comprehend what's happening.
In this project we are in a lot of situations where we need to do the same thing to keep a task alive, and in those other places we don't need to resort to dependency cycles.
Let me suggest an idea
src/execution_plans/broadcast.rs
Outdated
let Some(entry) = Weak::upgrade(&entry) else {
    return Ok(());
};
entry.push_batch(Arc::new(batch))?;
I see two challenges to solve with the way things are getting buffered right now:
- We need to register any retained record batch in DataFusion's dyn MemoryPool, present in the execution context. This is what DataFusion uses for tracking memory consumption and performing spilling if necessary. Note how all operators that accumulate data in DataFusion do this. Not registering appropriately can make the process OOM rather than fail gracefully.
- We should find a mechanism for freeing the memory once all the data is consumed. Right now, data will be retained even long after the build side has been fully consumed. The channel approaches had this by construction, but here we would need to do it manually.
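To illustrate the "freeing by construction" point: if each buffered entry carries a reservation that releases itself on Drop, eviction and accounting cannot drift apart. This is a hedged std-only sketch; `Pool` and `Reservation` are hypothetical stand-ins for DataFusion's MemoryPool/MemoryReservation, not the real types:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for a memory pool: a shared byte counter.
#[derive(Default)]
pub struct Pool {
    reserved: AtomicUsize,
}

// A reservation gives back its bytes when dropped, so no manual "free" call
// is needed anywhere.
pub struct Reservation {
    pool: Arc<Pool>,
    bytes: usize,
}

impl Pool {
    pub fn reserve(pool: &Arc<Pool>, bytes: usize) -> Reservation {
        pool.reserved.fetch_add(bytes, Ordering::Relaxed);
        Reservation { pool: Arc::clone(pool), bytes }
    }

    pub fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        self.pool.reserved.fetch_sub(self.bytes, Ordering::Relaxed);
    }
}

// A buffered entry owns its reservation: evicting the entry releases the
// accounted memory automatically.
pub struct Entry<T> {
    pub value: T,
    pub reservation: Reservation,
}
```

The real MemoryReservation behaves similarly (it releases on Drop), which is what makes the Drop-driven design attractive here.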
I see what you are saying; yes, that was nice about the channel approaches. Let me know if what I implemented is like what you had in mind.
I still think that the current choice of structures for modeling the problem is not the most adequate, as it's still cornering you into situations with dependency cycles, unnecessary locking, and manual operations like freeing memory or starting tasks "if necessary". It's also overall too much code for something that should be simpler.
It reminds me of situations where code is initially generated by an LLM, something goes wrong, and to make up for it further patches are added on top without questioning the initial structure, like the dependency cycle.
It would be nice to explore a more natural solution that satisfies our requirements "by construction" rather than manually.
I've tried to reflect these thoughts into code, and this is what I came up with:
The implementation is nearly half the amount of code (215 vs 380 LOC) and performance is similar or slightly better (because of less unnecessary locking):
Bench results against the head commit of this PR
broadcast_cache_scenarios/scenario/all_fast
time: [1.0182 ms 1.0227 ms 1.0288 ms]
change: [-2.8109% -1.7997% -0.8921%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 1 outliers among 10 measurements (10.00%)
1 (10.00%) high mild
broadcast_cache_scenarios/scenario/one_slow
time: [335.19 ms 335.97 ms 336.77 ms]
change: [-0.2062% +0.1015% +0.3992%] (p = 0.56 > 0.05)
No change in performance detected.
broadcast_cache_scenarios/scenario/one_cancel
time: [1.0141 ms 1.0158 ms 1.0173 ms]
change: [-2.7607% -1.8285% -0.9978%] (p = 0.00 < 0.05)
Change within noise threshold.
broadcast_cache_scenarios/scenario/unused_partition
time: [999.92 µs 1.0038 ms 1.0081 ms]
change: [-2.1000% -1.3509% -0.6288%] (p = 0.00 < 0.05)
Change within noise threshold.
broadcast_cache_scenarios/scenario/many_consumers
time: [3.1661 ms 3.1893 ms 3.2430 ms]
change: [-11.068% -8.6364% -6.2442%] (p = 0.00 < 0.05)
Performance has improved.
Found 2 outliers among 10 measurements (20.00%)
I think there's something to solve in that code about the memory reservation from DataFusion's MemoryPool getting dropped too soon, but besides that, I think it's a more natural way of modeling the problem.
EDIT: I'm realizing that I think both implementations (the one in this PR and the one I shared above) might have a race condition.
loop {
    let entry_for_notify = Arc::clone(&entry);
    let notified = entry_for_notify.notify.notified(); // Line 417: start listening to `notify_waiters()` calls.
    let result = match entry.read_at_index(idx) {
        Ok(r) => r,
        Err(err) => return Some((Err(err), StreamState::Done)),
    };
    if let Some(batch) = result.batch {
        // ... return batch
    }
    if result.done {
        // ... return None or error
    }
    notified.await; // Line 439: wait for `notify_waiters()`
    // DANGEROUS ZONE: here, and until the loop goes back to line 417, nobody is listening to `notify_waiters()`,
    // so if the producer side decides to perform the last call to `notify_waiters()` during the very small nanosecond gap
    // between this line and going back to line 417, the next call to `notified.await` will block forever.
}
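For contrast, the lost-wakeup window disappears when "check state" and "go to sleep" happen atomically under one lock. std's Condvar::wait_while shows the shape (a tokio watch channel gives async code the same guarantee, since the state lives in the channel itself); the types here are a hypothetical sketch, not the PR's code:

```rust
use std::sync::{Condvar, Mutex};

// Minimal queue state: how many entries exist, and whether the producer closed.
pub struct State {
    pub len: usize,
    pub closed: bool,
}

// Returns true if entry `index` exists, false if the queue closed first.
// wait_while releases the lock and sleeps in one atomic step, and re-checks
// the predicate on every wakeup, so a notification can never be "missed"
// between checking state and subscribing.
pub fn wait_for_entry(state: &Mutex<State>, cv: &Condvar, index: usize) -> bool {
    let guard = state.lock().unwrap();
    let guard = cv
        .wait_while(guard, |s| s.len <= index && !s.closed)
        .unwrap();
    guard.len > index
}
```

The producer's counterpart would update `State` and call `notify_all` while (or right after) holding the same Mutex, which is exactly what makes the wakeup race impossible here.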
btw, I benchmarked this against
this is getting me hyped up
Ok I am really liking this approach. This is similar to what we talked about, but there seems to be an exact structure made for managing the future. We could just use it.

I also ensured that memory is managed properly when all consumers finish. Since the MemoryReservation was created in the producer task, it would get dropped while the buffered queues would still be alive. This forced me to add some extra fields and structs, but this seemed alright in my eyes. We may still want to consider cases where consumers are never created or never dropped; maybe if a node fails? But this can be follow-up work.

The queue inner might seem a bit off-putting, but this is to have cheap copies, and I think it is intuitive.

We see good results, and these reflect in the local tpcds bench. Let me know thoughts @gabotechs
src/execution_plans/broadcast.rs
Outdated
    }
}

fn push(&self, entry: QueueEntry) {
At the moment of pushing, we have ownership of the broadcast queue, so we can afford to do fn push(&mut self, entry: QueueEntry) and avoid some locks.
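A sketch of that idea: when the caller has exclusive ownership, `&mut self` lets the producer reach the inner Vec through Mutex::get_mut, which takes no lock at runtime because the borrow checker proves exclusivity. The `Queue` type here is hypothetical, not the PR's struct:

```rust
use std::sync::Mutex;

pub struct Queue<T> {
    entries: Mutex<Vec<T>>,
}

impl<T> Queue<T> {
    pub fn new() -> Self {
        Queue { entries: Mutex::new(Vec::new()) }
    }

    // No lock acquired: exclusive access is guaranteed statically by &mut.
    pub fn push(&mut self, entry: T) {
        self.entries.get_mut().unwrap().push(entry);
    }

    // Shared access still goes through the lock as usual.
    pub fn len(&self) -> usize {
        self.entries.lock().unwrap().len()
    }
}
```

This only works while the producer truly owns the queue (no outstanding Arc clones being pushed through), which is the situation the comment describes.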
#[derive(Debug, Clone, Copy)]
struct BroadcastState {
    len: usize,
    closed: bool,
}

#[derive(Debug)]
struct BroadcastQueue {
    inner: Arc<BroadcastQueueInner>,
}

type QueueEntry = Result<datafusion::arrow::record_batch::RecordBatch, Arc<DataFusionError>>;

#[derive(Debug)]
struct BroadcastQueueInner {
    entries: Mutex<Vec<QueueEntry>>,
    state_tx: watch::Sender<BroadcastState>,
    expected_consumers: usize,
    completed_consumers: AtomicUsize,
    buffered_bytes: AtomicUsize,
    mem_reservation: Mutex<MemoryReservation>,
}
IMO I think this is getting messy again. I don't think you need this many structs or this many synchronization primitives to model this problem.
The problem to solve here is really just that we cannot use a tokio::sync::Notify because of the small time gap between the notification arrival and the re-subscription. I think you are on point with using a tokio::sync::watch channel as an alternative to tokio::sync::Notify, but it can be implemented in a simpler way.
Some drawbacks I see with this approach:
- We couple the broadcast queue to specifically a QueueEntry rather than letting it be generic. This is not an issue for this specific case, but it signals that the problem was not modeled in a general way, even though it's a general problem about broadcasting.
- We are again back to manual operations like "try_release", rather than letting things happen "by construction".
- We have extra synchronization primitives that are potentially unnecessary, like the Mutex<MemoryReservation>.
- We have overall too much code for what we are doing (double the amount of structs, and ~100 lines of extra code).
Here's one suggestion using tokio::sync::watch as the notification channel:
And here's another one using tokio::sync::broadcast as the notification channel (for some reason, this one seems to be slightly faster, even though I think they should be the same)
The extra structs in my solution are for memory accounting. The linked watch-stream solution's per-batch reservation is correct, but it clones a new MemoryConsumer per batch, which is unneeded overhead. The queue-level approach doesn't have that cost and lets me release memory exactly when the queue is closed and all consumers are done. That's why I added QueueInner, the counters, and try_release: it keeps accounting tied to data lifetime without per-batch consumer clones.
One possible compromise is:
- keep watch and the simple queue
- keep one reservation in the producer
- split() it per batch and store the Arc with each entry
Also, a note about broadcast: even if it's slightly faster, I don't prefer it here because it can drop messages on lag.
The linked watch stream solution per batch reservation is correct but clones a new MemoryConsumer per batch which is unneeded overhead
It should be negligible (you can verify it in the benchmarks) and it does simplify the code. This comes from an upstream limitation that was addressed in apache/datafusion#19759, so it will be available in v53. Until then, this is the simplest thing to do.
Note that it also avoids a Mutex lock, which should be an order of magnitude more expensive than the clone.
The queue level doesn't have that cost and lets me release memory exactly when closed and all consumers done
MemoryReservation is designed to automatically release memory on Drop, it would be nice to use it rather than doing it manually.
This is one example of what I'm referring to in doing things "by construction" rather than manually.
Also note about broadcast, even if slightly faster I don't prefer here because it can drop messages on lag
That should be fine; a "lagged" notification is also a valid notification. I do prefer the watch-based solution though, it feels more organic.
Ok there are just three things:

- I enjoy having the BroadcastState struct. It is like a single source of truth for our atomic operations. We could consolidate this into:

      struct BroadcastState {
          len: usize,
          closed: bool,
      }

  I know there was feedback on the extra structs, but this would eliminate an extra atomic, with no risk of one drifting from the other.

- Having an explicit close() method would guarantee drops when done; it would be one small method and is just defensive.

- This is a major nit: I think has_more can become stale and we can just use the WatchStream for all updates.
Consumer:
reads idx=0
sets has_more = true
returns Poll::Ready(Some)
Producer:
finishes, closes queue
Queue state:
entries.len == 1
closed == true
Next poll:
has_more is still true -> stale
consumer checks entries -> None
sees closed -> returns None
We could just use the WatchStream, which tells us exactly what we need.
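One way to picture the "single source of truth" argument: if every consumer decision is derived from one BroadcastState snapshot, no auxiliary flag like has_more can go stale independently. A hypothetical std-only sketch of that decision function:

```rust
// Snapshot of the queue, as the watch channel would deliver it.
#[derive(Clone, Copy)]
pub struct BroadcastState {
    pub len: usize,
    pub closed: bool,
}

// What a consumer should do next, derived purely from the snapshot
// plus its own cursor; there is no separate flag to drift out of sync.
#[derive(Debug)]
pub enum Step {
    Yield(usize), // read the entry at this index
    Finished,     // closed and fully drained
    Wait,         // nothing new yet; wait for the next state update
}

pub fn next_step(state: BroadcastState, index: usize) -> Step {
    if index < state.len {
        Step::Yield(index)
    } else if state.closed {
        Step::Finished
    } else {
        Step::Wait
    }
}
```

Because `closed` travels in the same snapshot as `len`, the stale-flag scenario described above (entry buffered, queue closed, consumer still thinks there is more) cannot be observed.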
There is a race condition in this implementation that causes the tpcds benchmark to hang indefinitely. Be careful when running it; I just crashed my laptop because I walked away 😅
We are using the watch::Sender just as a signal, with is_closed as separate state and has_more as a hint. This creates a convoluted timing window where the consumer just waits forever: we are only polling notify when has_more = false.
Producer:
push batch 1
push batch 2
done now
Drop(queue)
is_closed = true
notify consumers
Consumer:
subscribes
has_more = false
polls notify -> nothing changes again
returns Pending
Hang forever
I added logs to print what is happening and this happens in tpcds query 1. Here are the exact logs:
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastQueue: drop -> closed
BroadcastQueue: drop -> closed
BroadcastConsumer: closed (index=0)
BroadcastConsumer: closed (index=0)
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastQueue: drop -> closed
BroadcastConsumer: pending wait (index=0, closed=false)
BroadcastConsumer: closed (index=0)
As seen, even though the queue drops, some consumers' timing is off and they hang forever.
This is the motivation for the broadcast state. Rather than just being a signal, it can hold state, with everything derived from it, preventing timing issues. I know we want to keep the code minimal, but I view this as necessary for safely handling state and race conditions.
The struct will eliminate the stream + atomic bool and the has_more flag, creating a single source of truth.
This will also eliminate atomic checks, as suggested in the comment above.
Any chance of solving that while still simplifying the code?
I think what I am suggesting is actually simpler than what is currently suggested. The extra struct at first glance seems like more complex code, but it's really encapsulating a lot and eliminating race conditions in a clean manner.
I will draft it and send it.
What needs fixing and why
The current watch-based queue solves the notify race, but it still never releases cached batches until the entire queue drops. This seemed OK, but I found differently in the benches; it means:
- memory grows unbounded if any consumer stalls or drops
- producers keep pushing even when consumers stop polling
I have confirmed this with logging and running the benchmarks.
Our missing piece is per‑batch lifetime management tied to live consumers.
This is the minimal change I could come up with:
I’ll keep the watch‑based BroadcastState { len, closed } and add one counter per entry:
- Each entry is stored as (value, remaining_consumers)
- On push: remaining_consumers = active_consumers
- On consumer yield: decrement, if it hits zero, pop the entry
- On consumer drop: decrement remaining for all not‑yet‑consumed entries and pop any newly free entries
This keeps the code small, preserves the simple watch notification model, and makes memory release correct.
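A std-only sketch of the per-entry counter scheme described above (all names hypothetical; the real queue would also carry the watch notification and memory accounting):

```rust
use std::collections::VecDeque;

pub struct Queue<T> {
    // (value, remaining_consumers) pairs; `offset` counts already-evicted
    // entries so consumer cursors stay absolute indices.
    entries: VecDeque<(T, usize)>,
    offset: usize,
    active_consumers: usize,
}

impl<T: Clone> Queue<T> {
    pub fn new(active_consumers: usize) -> Self {
        Queue { entries: VecDeque::new(), offset: 0, active_consumers }
    }

    // On push: remaining_consumers starts at the number of live consumers.
    pub fn push(&mut self, value: T) {
        let n = self.active_consumers;
        self.entries.push_back((value, n));
    }

    // On consumer yield at absolute index: decrement, then evict any
    // fully-consumed entries at the front.
    pub fn consume(&mut self, index: usize) -> Option<T> {
        let pos = index.checked_sub(self.offset)?;
        let (value, remaining) = self.entries.get_mut(pos)?;
        let value = value.clone();
        *remaining -= 1;
        self.evict_front();
        Some(value)
    }

    // On consumer drop: decrement remaining for all entries it had not yet
    // consumed, then evict any newly free entries.
    pub fn drop_consumer(&mut self, next_index: usize) {
        self.active_consumers -= 1;
        let start = next_index.saturating_sub(self.offset);
        for (_, remaining) in self.entries.iter_mut().skip(start) {
            *remaining -= 1;
        }
        self.evict_front();
    }

    fn evict_front(&mut self) {
        while matches!(self.entries.front(), Some((_, 0))) {
            self.entries.pop_front();
            self.offset += 1;
        }
    }

    pub fn buffered(&self) -> usize {
        self.entries.len()
    }
}
```

The queue only ever buffers entries that some live consumer still needs, so memory release follows data lifetime by construction.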
I think an effective way to see this is with logging. Insert these logs and run the tpcds benchmark:
1) BroadcastQueue::push

fn push(&self, entry: T) {
    let mut entries = self.entries.lock().unwrap();
    entries.push(entry);
    eprintln!("push len={}", entries.len()); // take the lock once, not twice
    drop(entries);
    let _ = self.notify.send(());
}

2) Drop for BroadcastQueue

impl<T: Clone> Drop for BroadcastQueue<T> {
    fn drop(&mut self) {
        self.is_closed.store(true, Ordering::Release);
        eprintln!("queue drop -> closed");
        let _ = self.notify.send(());
    }
}
3) BroadcastConsumer::poll_next (yield + closed)

match entry {
    Some(v) => {
        self.index += 1;
        self.has_more = true;
        eprintln!("yield index={}", self.index);
        return Poll::Ready(Some(v));
    }
    None => {
        if self.is_closed.load(Ordering::Acquire) {
            eprintln!("closed index={}", self.index);
            return Poll::Ready(None);
        }
        self.has_more = false;
    }
}

You should see push len=... continuing to grow while yield index=... stops (and no queue drop -> closed), meaning the producer keeps caching batches but consumers are no longer progressing, so memory grows unbounded and the query hangs.
Make sure that you cancel the benchmark because your computer may die 😄
Force-pushed b4b6c53 to c7b58be (Compare)
- tokio-stream = "0.1.17"
+ tokio-stream = { version = "0.1.17", features = ["sync"] }
I just realized we have this dependency duplicated

Context and Problem
closes #319
1. Panics in tpcds
There have been panics that look like:
This was happening because some operators (like TopK) cancel once satisfied. When used with BroadcastExec, this would cause issues with the caching mechanism: since the consumer task was responsible for populating the cache, if it was cancelled, the cache would not be populated although it should be. Thus, the first consumer task would execute the partition, begin populating the cache, and abort when cancelled; a second task would then execute the same partition because the cache was not populated.

2. Streaming Cache
There was follow-up work noted to make the cache in broadcast "streaming".
Prior to this, the cache was simple: the first task would collect all batches from the input and populate the cache. This is not optimal, as there can be multiple subscribers that want the same partition data concurrently. Requiring all other consumers to wait until all batches have been fetched causes slow-down, since consumers could instead receive partial data as it arrives.
Solution
This PR solves these two problems by:
CacheEntry and EntryState
Some queries that are using broadcast see speedups 😄
And we definitively fixed the panic about re-executing the same partition. Here is a before and after on a query that had this panic:
Before: we see the panic about partitions being executed multiple times

After: we no longer see it but see the arrow error as explained below

IMPORTANT NOTE - DataFusion Regression
Some tpcds queries panic with:
due to upstream error:
I have confirmed that this is an upstream DataFusion issue (filing it is something I may or may not have time to do tonight 😅) and not related to broadcast, which merely unveiled it. I proved this by reproducing the error on single-node DataFusion on the same data:
Prerequisite: have the tpcds data downloaded at some arbitrary location, then reproduce with datafusion-cli.

When initially debugging this, I didn't see it in my single-node setup (I was using 50.2.0). Then I realized we upgraded in distributed, so I upgraded to 52.1.0 and the error occurs.