Suggestion: WebAssembly support #479

Open
oli-w opened this issue Apr 29, 2024 · 22 comments

Comments

@oli-w

oli-w commented Apr 29, 2024

Hi,
I was interested in seeing if I could run differential-dataflow on the web by compiling to WebAssembly using wasm-pack, and it worked out really well! However, it required some changes to differential-dataflow and timely-dataflow, specifically:

  • For both packages, replacing std::time::Instant with web_time::Instant (crate: web-time). This is because there is no "current time" implemented for the wasm32-unknown-unknown compilation target, so we have to hook it up to the browser's Performance.now() function. In all other contexts this uses the regular std::time::Instant under the hood, so it shouldn't make any difference.
  • Differential dataflow has a 63 bit shift (1 << 63) which needs to be (1 << 31) in 32-bit WebAssembly.

The changes required are:

I was wondering if you would be interested in incorporating this or would prefer to keep this kind of change as a fork?
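
For the shift mentioned in the second bullet, a minimal sketch of one portable alternative, assuming the intent is "the top bit of a usize". The names `TOP_BIT` and `top_bit_set` are hypothetical and are not taken from differential-dataflow:

```rust
/// The highest bit of a `usize`, derived from the target's pointer width
/// instead of a hard-coded 63 (64-bit targets) or 31 (32-bit wasm32).
const TOP_BIT: usize = 1 << (usize::BITS - 1);

/// Returns true if the top bit of `word` is set.
fn top_bit_set(word: usize) -> bool {
    (word & TOP_BIT) != 0
}
```

Because the shift amount comes from `usize::BITS`, the same source should compile for both `wasm32-unknown-unknown` and 64-bit native targets without a `cfg` switch.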

@frankmcsherry
Member

It sounds interesting! I'll need a bit to get my head around what it entails, but it seems quite reasonable to 1. limit the use of assumptions about time, and 2. be as serious as possible about usize sizes.

If it's alright, I'll take a few to read up on things. Mostly, I'd love to avoid having code in TD/DD that I don't understand, because it's largely on me if something is broken elsewhere. But I have to imagine if nothing else a feature flag for time would be really easy, as should be fixing DD's 64 bit assumption (which is probably a bug anyhow).

@antiguru
Member

I think all changes look fine from a high level. The time changes don't seem to change any behavior on non-wasm targets. The RHH code isn't used anywhere at this point. Before committing to supporting wasm, I'd like to know if there's a regression test we could have. It seems testing isn't as simple as it is for other targets, but if we can get it into CI then I don't think there's anything blocking this.

As @frankmcsherry points out, there are some parts of the code that might assume usize to have the same length as u64. Removing this assumption would be a win in itself.

@frankmcsherry
Member

I've started to take a look (sorry for the delay) and have some quick thoughts:

  1. The DD changes .. could probably just instead be deletion of the associated code. At least, the time changes are a. an unused _timer, and b. a YieldingIter that isn't used other than in some commented-out Kafka code (it could become commented out also, and place a similar obligation on uncommenting the code as the rdkafka connections). If we clean this all up, then no one needs timers at all in here.
  2. The DD changes miss an important use in the dogsdogsdogs project, which is the only load-bearing subproject (though I can understand that it isn't obvious). Specifically, half_join.rs uses Instant as an argument to the yield_function, allowing the user to call .elapsed() if they are interested and not otherwise. We could also just not have dogs3 compile on web assembly / delay that work until someone needs it.
  3. The TD changes are harder to avoid, but do seem some amount of harmless. There's some weirdly dead-ish code (viz Worker::timer()), but things like scheduling, sequencing, and activation seem to want to do things based on external notions of time, and logging just seems somewhat wedged without access to time.

I'm going to look into a DD PR that essentially tracks your changes (fixing RHH) but subtracts out the time-based code rather than modifies it. I'll report back here about that.

@frankmcsherry
Member

The RHH fix seems more subtle than at first glance. The code also has elsewhere this

        /// Indicates both the desired location and the hash signature of the key.
        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
            let hash: usize = key.hashed().into().try_into().unwrap();
            hash / self.divisor
        }

where the try_into() is from u64 to usize.

The code is not currently active, though .. it could become so in the future when I get some time. Ideally it wouldn't silently break the WASM builds, though this is an example of where it could/would.

My understanding of WASM is that 64-bit types are fine, just that usize is 32 bits. So, probably the right thing here (and perhaps elsewhere) is to be more serious about either using u64 throughout, or pivoting to usize earlier (and using it throughout).
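
A minimal sketch of the "use u64 throughout, narrow late" option, assuming the hash is already available as a `u64` and that the divisor is also kept as a `u64`. Both functions are hypothetical illustrations and not the RHH code itself:

```rust
/// Hypothetical variant of `desired_location` that does its arithmetic in
/// `u64` and narrows to `usize` only once the value is known to be small.
fn desired_location(hash: u64, divisor: u64) -> usize {
    // The quotient is at most `u64::MAX / divisor`, so a large enough
    // divisor guarantees that it fits in a 32-bit `usize`.
    usize::try_from(hash / divisor).expect("location exceeds usize")
}

/// Stand-in for what a `u64 -> usize` conversion does on wasm32, where
/// `usize` is 32 bits wide: large hashes fail to convert.
fn narrows_to_32_bits(hash: u64) -> bool {
    u32::try_from(hash).is_ok()
}
```

With the original order of operations (convert first, then divide), a full-width hash would hit the `unwrap()` on a 32-bit target; dividing first avoids that as long as the divisor is chosen so that the quotient fits.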

@oli-w
Author

oli-w commented May 18, 2024

Apologies for my lack of reply sooner - I've been away. Thanks for making progress on these changes 😃.

> The DD changes miss an important use in the dogsdogsdogs project

Ah yes, I haven't used that before.

> We could also just not have dogs3 compile on web assembly / delay that work until someone needs it.

I'd be very happy with that - I have had a lot of fun composing DD operators without having to reach for what dogsdogsdogs provides.

> My understanding of WASM is that 64-bit types are fine, just that usize is 32 bits

Yep that's right, similar to using u128 on a 64-bit machine. There may be other issues that aren't exposed until runtime of some code paths - full disclosure: I tried out making the surgical changes required to get DD working in WASM, found issues, fixed, rinse-and-repeat, rather than reading through all the code to find where usize being 32 might be problematic. The left shift was picked up by the compiler - you can reproduce with cargo build --target wasm32-unknown-unknown.

Let me know if there's something I can help with that doesn't involve changing production "load-bearing" code. E.g. setting up some tests as suggested - details for WASM: https://rustwasm.github.io/wasm-bindgen/wasm-bindgen-test/index.html. Note this does run the compiled WASM using Node JS. Or I can provide a basic WASM example for motivation - play around with an interactive dataflow by visiting a web page and clicking on some buttons.

@frankmcsherry
Member

I'm looking at this again, and have some thoughts and questions:

  1. The DD stuff seems to be tidied up. I have not looked at the dogs^3 stuff yet, but happy to punt on that.
  2. The TD stuff .. well the main thing it wants time for is ultimately std::thread::park_timeout(duration). I'm a bit surprised that WASM supports this, but does not support time otherwise. Does it maybe just return without parking? The other uses seem to be a. the sequence.rs construct (which I think one can supply with a generic "ticker"), and b. the logging which similarly could have a generic ticker, though it's a bit more pervasive.

@antiguru : I'm looking at the code and Activations::activate_after is the reason all of this exists, and internally it is called only by replay_core, which wants to supply a period (essentially, it has no other mechanism to avoid a hot loop). Are there other big users you can think of? Currently mulling ripping it out and saying that if you need a time-based deferred activation, you should schedule one yourself.

Alternately, making the scheduler pluggable, which has been a long-standing ask, I think!

@frankmcsherry
Member

@oli-w you said

> Let me know if there's something I can help with that doesn't involve changing production "load-bearing" code.

and that would be great! I'm not very familiar with the ecosystem, and it would be great to have something to aim at / with. In particular, I'd love to remove / factor out "time" generally, rather than replace it with something that works in a few more cases. And while I can test much of that myself, understanding how to try it out for WASM could make that more productive.

@frankmcsherry
Member

I put up TimelyDataflow/timely-dataflow#577 which may allow you to use TD without it invoking Instant::now() or Instant::elapsed(). To try this out, follow the examples/threadless.rs model, where you just create a worker directly and use it. This allows you to pass in a None for the "initial instant", and .. many things may just work? All core uses of ::now() and .elapsed() should derive from this Option<Instant>, and most of them shouldn't panic if you try and use them (e.g. access to logging is now an Option<_> that depends on time; calling activate_after() schedules a thing immediately). There are still uses in examples/, in sequence.rs (an optional synchronization primitive) and in the default worker construction (e.g. everything except threadless.rs).

@oli-w
Author

oli-w commented Aug 27, 2024

> The TD stuff .. well the main thing it wants time for is ultimately std::thread::park_timeout(duration). I'm a bit surprised that WASM supports this, but does not support time otherwise.

So far I have been running DD inside WASM using a Web Worker - which is similar to running on a separate thread. There are quite a few hoops to jump through to get threads working in WASM and several caveats (details), which I haven't tried yet. For the moment I'm happy to run everything in one dedicated Web Worker. I had sidestepped the code that parks the thread by manually calling what timely::execute_directly does, but without worker.step_or_park() called in a loop:

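use timely::communication::allocator::Generic;
use timely::worker::Worker;

let alloc = Generic::Thread(timely::communication::allocator::thread::Thread::new());
let config = timely::WorkerConfig::default();
let mut worker = Worker::new(config, alloc); // `mut` because worker.step() takes &mut self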

Then I'm able to control how often I push progress through and when by calling worker.step() manually. It also allows the code to return execution so that JavaScript can call in later to send more inputs. Of course this won't work once delay is used (which I want to add for time-based expression).

> Currently mulling ripping it out and saying that if you need a time-based deferred activation, you should schedule one yourself.

I like this idea of being able to provide your own scheduler. The default implementation could be parking a thread, whereas for other cases I imagine steps:

  1. TD says "call me back in at least X duration", which IIUC is the time of the earliest next event to process, enqueued via a delay.
  2. In WASM it's quite simple to hook it up to JavaScript's setTimeout to enqueue a callback. In other systems this could be using some kind of ad-hoc scheduled task API / external service.
  3. Call back into TD.
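
The three steps above can be sketched without any real clock, assuming times are plain offsets from an arbitrary start. Every name here (`DeferredWorker`, `Host`, `RecordingHost`, `turn`) is hypothetical and none of it is timely-dataflow API:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

/// Hypothetical stand-in for a worker that holds deferred events.
/// Times are offsets from an arbitrary start, so no `Instant::now()` is needed.
#[derive(Default)]
struct DeferredWorker {
    pending: BinaryHeap<Reverse<Duration>>,
    fired: Vec<Duration>,
}

impl DeferredWorker {
    /// Enqueue an event that becomes due at `at`.
    fn defer_until(&mut self, at: Duration) {
        self.pending.push(Reverse(at));
    }

    /// Process everything due at `now`, then report how long the host may
    /// wait before calling back. `None` means nothing is outstanding.
    fn step(&mut self, now: Duration) -> Option<Duration> {
        while let Some(&Reverse(at)) = self.pending.peek() {
            if at > now {
                return Some(at - now);
            }
            self.pending.pop();
            self.fired.push(at);
        }
        None
    }
}

/// The host side of the contract: on native this could park a thread,
/// in a browser it could forward the delay to `setTimeout`.
trait Host {
    fn call_back_after(&mut self, delay: Duration);
}

/// Test host that only records the requested delays.
#[derive(Default)]
struct RecordingHost {
    requests: Vec<Duration>,
}

impl Host for RecordingHost {
    fn call_back_after(&mut self, delay: Duration) {
        self.requests.push(delay);
    }
}

/// One turn of the loop: step the worker, then hand any delay to the host.
fn turn(worker: &mut DeferredWorker, host: &mut impl Host, now: Duration) {
    if let Some(delay) = worker.step(now) {
        host.call_back_after(delay);
    }
}
```

The worker never sleeps or reads a clock itself; it only reports a delay, which leaves the choice of parking, `setTimeout`, or an external task service to the host.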

> it would be great to have something to aim at / with

Awesome, I'll work on setting something up.


I'm having a look at TimelyDataflow/timely-dataflow#577 and will report back, thanks!

@antiguru
Member

> @antiguru : I'm looking at the code and Activations::activate_after is the reason all of this exists, and internally it is called only by replay_core, which wants to supply a period (essentially, it has no other mechanism to avoid a hot loop). Are there other big users you can think of? Currently mulling ripping it out and saying that if you need a time-based deferred activation, you should schedule one yourself.

We're only using it in one place in Materialize, which is in mz_replay. It needs to periodically wake up to drain inputs and advance frontiers. We might be able to replace it with a separate task I think that schedules activations across thread boundaries. So, no objections for removing this code!

@frankmcsherry
Member

Thinking out loud: we have the ability to activate operators across threads, and arguably a polling approach to waking up operators is a bit of a smell. Not terrible, but 100% one should strive to activate an operator as soon as there is work to do, and anywhere we are not doing that we could consider how we might do it (e.g. hand an activator to the thing that is enqueueing data to replay).

I'm sure there are moments where one can't do that (some queue that you don't control the implementation of), so maybe removing it isn't the best. But, good to chew on and understand. TD is pretty close to being free of "real time" concepts.

@frankmcsherry
Member

Ah, and activate_after is also the method that led to that DOS at one point, because it cannot dedup requests. Definitely worth re-thinking! I'll ponder "pluggable schedulers" a bit!

@antiguru
Member

Yeah, spamming activations needs to be avoided. Materialize uses a wrapper that needs to be ack'ed before it can be scheduled again: https://github.com/MaterializeInc/materialize/blob/e194a6f3fdd59c8efd5bd7521da7fba9cb79ac3d/src/timely-util/src/activator.rs#L25
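
A minimal sketch of the "must be ack'ed before it can be scheduled again" idea, using a shared flag. `AckActivator` is a hypothetical name and this is only an illustration of the pattern, not the Materialize implementation linked above:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical activator that forwards at most one activation until the
/// operator acknowledges it.
#[derive(Clone, Default)]
struct AckActivator {
    pending: Arc<AtomicBool>,
}

impl AckActivator {
    /// Returns true if this call should actually schedule the operator,
    /// false if an earlier activation is still unacknowledged.
    fn activate(&self) -> bool {
        !self.pending.swap(true, Ordering::SeqCst)
    }

    /// Called by the operator once it has run, re-arming the activator.
    fn ack(&self) {
        self.pending.store(false, Ordering::SeqCst);
    }
}
```

Clones share the same flag, so repeated requests from several producers collapse into a single pending activation until `ack()` is called.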

@frankmcsherry
Member

I pushed another commit onto TimelyDataflow/timely-dataflow#577, this time moving the action of parking into the scheduler (and away from the communication layer). This allows the scheduler to "conceal" its internal take on whether and for how long to park, rather than needing to communicate it through the worker to the communication layer.

I'm not 100% certain why parking was in the communication layer, tbh. It's possible that I've forgotten something important, but the logic for all channel allocators was just to check if their events queue was empty and park if so. That's easy enough to externalize (we have just drained events before this, and only need to make sure that we are in a state where any future events will unpark the thread).

This is a bit off topic from the PR's subject (make Instant optional), but is noodling in the direction of making the scheduler pluggable (by concealing more of its opinions). @oli-w if things go sideways as I push at the branch, 100% it may make sense just to use the first commit.
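
The "drain events, park if the queue was empty" logic described above can be sketched with plain std threads. This is a native-only illustration under the assumption of a single consumer; the function names are hypothetical and it does not reproduce timely's allocators:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Consumer loop: drain the queue, and park only if nothing was drained.
fn run_consumer(queue: Arc<Mutex<VecDeque<u32>>>, expected: usize) -> Vec<u32> {
    let mut seen = Vec::new();
    while seen.len() < expected {
        let drained: Vec<u32> = queue.lock().unwrap().drain(..).collect();
        if drained.is_empty() {
            // An `unpark` that raced ahead of this call leaves a token,
            // so `park` returns immediately and no wake-up is lost.
            thread::park();
        } else {
            seen.extend(drained);
        }
    }
    seen
}

/// Producer side: enqueue first, then unpark the consumer.
fn produce_and_consume() -> Vec<u32> {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let consumer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || run_consumer(queue, 3))
    };
    for i in 0..3 {
        queue.lock().unwrap().push_back(i);
        consumer.thread().unpark();
    }
    consumer.join().unwrap()
}
```

The ordering matters: the producer pushes before it unparks, and the consumer re-checks the queue after every return from `park`, which is the "any future events will unpark the thread" condition mentioned above.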

@oli-w
Author

oli-w commented Aug 30, 2024

Exciting progress! TimelyDataflow/timely-dataflow#577 (comment)

I still need to create a WASM test with deferred events to aim towards, which could potentially be solved with a "pluggable scheduler". Can you remind me how I can schedule data in TD / DD to be delayed by some amount of wall-clock time (i.e. a Duration), or am I dreaming and that doesn't actually exist? I know there is TD .delay, but that delays the Scope::Timestamp, not "real time". I seem to recall seeing some method you could call, and then the Activator would provide a Duration that parks the thread until the earliest next wall-clock time event.

@frankmcsherry
Member

frankmcsherry commented Aug 30, 2024

Right, so: two things you could do:

  1. Try and use the scheduler to do this. There is a scheduler method activate_after, which .. is the sort of thing that the PR gets in front of. It relies on access to some sort of timer, and is the sort of thing that (I think) would cause WASM to choke at the moment (i.e. without a substitute for "real time"). Happy to discuss, but I don't personally recommend this. At least, let's discuss option 2.
  2. Use the time component of updates, in the way that delay would: change e.g. (data, time, diff) into (data, max(time, new_time), diff). This moves the update forward in the notion of time that TD/DD "understands". It results in a computation with clearer deterministic behavior, that does not depend on the vagaries of scheduling.

A third option, in between these two, is to have a deferred event that introduces the data through an input at a time of your choosing. It is still presented to TD/DD as an explicitly timestamped event, but you get to control when this happens. So, do the "delaying" at the boundary of your app and TD/DD, rather than "inside TD/DD" using either of the first two mechanisms.

If you can say a bit more about your goal with the delaying I could opine a bit more. Most uses that I know of prefer the second option (in Materialize these are called "temporal filters", and we don't really use the first form because of the non-determinism it would introduce).

@frankmcsherry
Member

I have a light background goal of putting together a TD/DD demo that uses JSON objects as the "data", and yields a playground that allows you to stitch dataflows together, interactively in a web app. Still TBD what the "Rust closure" replacement will be (some lightweight IR?) but .. if I make progress on that it might result in a clearer forum to work through idioms.

@oli-w
Author

oli-w commented Aug 30, 2024

I basically want to do the same thing as mz_now(). As an example use case, let's say I have a query against event database rows that have a time column, with a temporal filter on that:

SELECT ... FROM event
WHERE mz_now() > event.time

I want to SUBSCRIBE to changes in this query to trigger future scheduled events.
Looking at the predicate, I can determine if the predicate is true/false for the current system time flowing through TD/DD. If event.time is in the future, I can also determine the future time when the predicate will become true, so calculate that new future time and put it into a (data, time, diff), then I get a bit lost trying to figure out how to re-incorporate it back into the computation to update the predicate. Maybe if data is a boolean for the predicate value, I would emit (false, t=0, +1) and then at that future event time, emit (false, t=future, -1) and (true, t=future, +1) 🤔.

What I also haven't quite understood is how to handle "quiet" periods, where the system isn't otherwise doing anything (so nothing is calling worker.step()). Imagine this is used to trigger sending out some emails in the middle of the night, where no other activity is happening.

To make sure this change is "pushed through" TD/DD, I need to either:

  • Repeatedly advance time and call worker.step() every X seconds (this feels like polling / wasteful?) in case something was delayed to that time.
  • Determine the next minimum value of time from the system, which will cause a change in a predicate. So if the next "interesting" time is 6 hours away, externally schedule something to call me back after that time to then continue calling worker.step(). Is this describing option 3?

@frankmcsherry
Member

I see! I think I understand. Let me unpack how mz_now() works, and I think there may be a thing to do, but it may require help outside of TD/DD, or it may not work and we'll discover that.

So what using WHERE mz_now() <= event.time does is transform (event, time, diff) into two updates,

  1. the update itself: (event, time, diff),
  2. a future retraction: (event, max(time, event.time), -diff).
    This works even with a negative diff, and even if event.time is less than time.

You can implement this with a flat_map; nothing fancy other than some careful logic about which updates to produce at which times. The updates flow downstream, and the retraction likely "lingers", in that until TD is certain that no more updates for max(time, event.time) could happen it won't commit to that being "final".
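
A minimal sketch of the closure one might hand to such a `flat_map`, written against plain tuples so that it runs without TD/DD. The `Event` type, the integer timestamps, and the helper `count_at` are assumptions made for illustration:

```rust
/// Hypothetical event record; only the `time` column matters for the filter.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    name: &'static str,
    time: u64,
}

/// An update triple in the `(data, time, diff)` form used in the discussion.
type Update = (Event, u64, i64);

/// Emit the update itself plus a retraction at `max(time, event.time)`.
fn temporal_filter((event, time, diff): Update) -> [Update; 2] {
    let retract_at = time.max(event.time);
    [(event.clone(), time, diff), (event, retract_at, -diff)]
}

/// Net multiplicity visible at `now`: the sum of all diffs at times <= now.
fn count_at(updates: &[Update], now: u64) -> i64 {
    updates
        .iter()
        .filter(|(_, time, _)| *time <= now)
        .map(|(_, _, diff)| *diff)
        .sum()
}
```

When `event.time` is already in the past, both updates land at the same time and cancel, which is the "works even if event.time is less than time" case.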

What MZ does is continually "tick" the inputs, second-by-second, or as frequently as you would like to be certain that nothing much has changed. In that context, things could change at any moment, because the updates are coming in from external sources. This is the first thing that you've indicated, and .. it's not too wasteful; TD/DD don't need to do lots of work to stay up to date, but .. they will actually do a reasonable amount of work when temporal filters are in play.

But you make a good point that one doesn't fundamentally need to tick the system to see if anything has changed.

One thing that seems possible is that when you have a temporal filter construct, you could just downstream put event.time into the data of a stream of updates, forked from the ones you were working with. For example, (event, time, diff) would result in

  1. the update (event.time, time, diff)
  2. a future retraction (event.time, max(time, event.time), -diff).
    For as long as the retraction is unresolved, i.e. from time until max(time, event.time), the collection reflects event.time. But it tells you this immediately, at time, rather than asking you to discover it by probing the system at various times. The "min" of this collection would I think tell you the next moment at which, all other things being equal, the output could possibly change.
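
A sketch of reading the "min" of that forked collection, again over plain tuples rather than a DD collection. `WakeUpdate` and `next_wakeup` are hypothetical names, and in DD the minimum would more likely come from a reduce-style operator than from a scan like this:

```rust
use std::collections::BTreeMap;

/// Updates of the form `(event_time, time, diff)`: the forked stream that
/// carries `event.time` as its data.
type WakeUpdate = (u64, u64, i64);

/// Smallest `event_time` still present in the collection as of `now`,
/// i.e. the next moment at which the output could change.
fn next_wakeup(updates: &[WakeUpdate], now: u64) -> Option<u64> {
    let mut counts: BTreeMap<u64, i64> = BTreeMap::new();
    for &(event_time, time, diff) in updates {
        if time <= now {
            *counts.entry(event_time).or_insert(0) += diff;
        }
    }
    // BTreeMap iterates in key order, so the first nonzero count is the min.
    counts
        .into_iter()
        .find(|&(_, count)| count != 0)
        .map(|(event_time, _)| event_time)
}
```

A host could pass the returned value to whatever external timer it has and only advance the input once that time arrives.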

This is a bit ad-hoc, in that I'm making it up and the logic might be wrong. It certainly is embedding an understanding of what the temporal filter is going to do into DD logic, rather than having DD directly tell you the truth about what work is outstanding. It seems like a good ask to want to reflect that information, though.

Perhaps another similar way to accomplish the same is rather than using a temporal filter as above, having it present as another output of the computation named please_retrect_me_at, and it is a collection of (event, future_time, diff) data, with an understanding that someone, you perhaps, should introduce (event, future_time, -diff) once you are ready to advance the input to future_time. This is a bit more manual, in that you have to do work that DD would have done, but also by being more manual it is more explicit and less magical, and there is less magic to have to work around.

@oli-w
Author

oli-w commented Aug 31, 2024

Thanks, that makes sense to me at a high level. I'll have to do a bit of experimenting to make sure I can put it into practice. If MZ achieves this by regularly ticking time forward, I'm happy to do the same and that will work fine in WASM - I just need to hook it up to be called using setInterval.

> ... having it present as another output of the computation named please_retrect_me_at

I think this is closest to what I originally had in mind. The trick with a pair of updates cancelling each other out at some time is pretty neat though, gut feeling is this would be the most reliable and I'm keen to try that approach.


> I have a light background goal of putting together a TD/DD demo that uses JSON objects as the "data", and yields a playground that allows you to stitch dataflows together, interactively in a web app.

I worked on something today that hopefully can help - check out https://oli-w.github.io/dd-wasm-playground, where you can:

  • Enter some key,value,diff triples (i.e. a single Collection of (String, String))
  • Define a "view" that is simply some unique key, plus a key to look up in the Collection.

This is using the timeless branch you created, running DD in WASM and saving the inputs + "view" to local storage in between page refreshes. There are a few edge cases not handled, e.g. you have to refresh the page if you remove the last matching result from a view. However, my focus wasn't really on creating interesting demo-able queries yet, but more on getting all the plumbing done and then sharing to hear what kinds of views you're interested in seeing. The performance overhead of calling back into JavaScript repeatedly is quite high, so I think Rust closures would indeed need to be replaced with a data structure you can send to WASM as JSON to represent "views" (like MZ's various IRs).

Code is here: https://github.com/oli-w/dd-wasm-playground feel free to play around and I'm happy to help with any questions.

@frankmcsherry
Member

Oh fascinating! Very neat, and I'll check this out. I have been poking at a minimal IR for DD, and potentially blending that in could be neat. I'll try it out and see if I can propose anything! :D

@frankmcsherry
Member

frankmcsherry commented Sep 3, 2024

Sharing time! I'm not certain how to show things off, on account of I don't know how the modern web works, but .. I have a rig that lets you do things like

    <script type="module">
        import init, { write_to_console } from './out/timely_wasm.js';

        async function run() {
            await init();

            let plan = [];
            
            plan[0] = { 
                name: "filter",
                inputs: ["input"],
                operator: "filter",
                logic: "return parseInt(key) > 5",
            };

            plan[1] = { 
                name: "map",
                inputs: ["filter"],
                operator: "map",
                logic: "return { key: key, val: (parseInt(key) + 1).toString() }",
            };

            plan[2] = { 
                name: "trans",
                inputs: ["map"],
                operator: "map",
                logic: "return { key: val, val: key }",
            };

            plan[3] = { 
                name: "join",
                inputs: ["trans", "map"],
                operator: "join",
                logic: "return { key: val0, val: val1 }",
            };

            write_to_console(plan);
        }

        run();
    </script>

and then when you load the page the console reports things like (there is an inspect after each operator):

[Screenshot: browser console output, 2024-09-03]

It's about 100 lines of Rust, mostly bouncing in and out of strings and JsValue, but .. seems to "work". :D

It starts pre-loaded with the 0 .. 10 collection, and all data are (String, String). Supports map, filter, join, and reduce. Would be pretty easy to support one level of iteration also (variables for names, rather than collections).
