[Help Needed] CombineLatest in the style of RxJs #228
Comments
I should add that I also looked at withLatestFrom, which doesn't seem to match on requirements 1 & 4. WithLatestFrom seems to work best when you have a source stream with which you want to combine another stream. CombineLatest is subtly different in that all source streams are considered equal. |
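The distinction drawn above can be sketched in plain JavaScript. This is only an illustration: `createStream`, `push`, and `subscribe` are hypothetical stand-ins written for this example, not flyd's API, and the two operators merely mimic the behaviour described in the comment.

```javascript
// Minimal push-based stream; a stand-in for illustration, not flyd's implementation.
function createStream() {
  const listeners = [];
  const s = {
    hasVal: false,
    val: undefined,
    push(v) {
      s.hasVal = true;
      s.val = v;
      listeners.forEach((fn) => fn(v));
    },
    subscribe(fn) {
      listeners.push(fn);
    },
  };
  return s;
}

// Emits only when `source` emits, sampling the latest value of `other`.
function withLatestFrom(other, source) {
  const out = createStream();
  source.subscribe((v) => {
    if (other.hasVal) out.push([v, other.val]);
  });
  return out;
}

// Emits whenever any input emits, once every input has a value.
function combineLatest(...streams) {
  const out = createStream();
  streams.forEach((s) =>
    s.subscribe(() => {
      if (streams.every((d) => d.hasVal)) out.push(streams.map((d) => d.val));
    })
  );
  return out;
}

const a = createStream();
const b = createStream();
const sampled = [];
const combined = [];
withLatestFrom(b, a).subscribe((v) => sampled.push(v));
combineLatest(a, b).subscribe((v) => combined.push(v));

a.push(1);   // b has no value yet: neither result emits
b.push('x'); // only combineLatest emits
a.push(2);   // both emit
b.push('y'); // only combineLatest emits

console.log(sampled);  // [ [ 2, 'x' ] ]
console.log(combined); // [ [ 1, 'x' ], [ 2, 'x' ], [ 2, 'y' ] ]
```

In this sketch `withLatestFrom` treats one input as the driver, while `combineLatest` treats all inputs equally.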
I have some further information. The problem happens under the following conditions:
Under these circumstances, the nested stream's
E.g.
myouterStream.map(() => {
  flyd.withLatestFrom(
    getAccessStream(),
    getRolesStream(),
  )
    .pipe(flyd.once()) // Commenting this out makes this work as expected
    .map(([roles, access]) => {
      console.log(roles, access); // never gets called
    });
})
Moving the nested stream out of the parent also gets things working as normal. Implementation of
flyd.once = flyd.curryN(1, function (stream$) {
  return flyd.combine(
    function (s$, self) {
      self(s$());
      self.end(true);
    },
    [stream$],
  );
});
Does the implementation of |
Would be great if you could elaborate on the problem, maybe with a code snippet on flems? |
Even without the code snippet it's worth calling out that flyd's "end" logic is far from perfect and relies on every operator doing the "right" thing as discussed here #216 (comment) |
flyd-zip is very similar, I think the code can be adapted to your needs. It also sounds very similar to what I'd done with I think there are two basic strategies, join and merge. This is one of them. The term combine is ambiguous in my opinion. |
Hi, @nordfjord
Thanks for your quick response. I have a minimal(ish) repro on flems which I have also posted below.
I think the problem is to do with the number of operators rather than with the implementation of one particular operator. I have added comments in a couple of places on how removing operators fixes the problem. I also noticed that adding a
It's possible that I'm doing something stupid here with my implementation of the
Any help here would be much appreciated! (Thanks, StreetStrider, for pointing me to zip)
/*******************************
Required flyd modules
*****************************/
// Once
flyd.once = flyd.curryN(1, function (stream$) {
  return flyd.combine(
    function (s$, self) {
      self(s$());
      self.end(true);
    },
    [stream$],
  );
});
// With latest from
flyd.withLatestFrom = flyd.curryN(2, function (streams, source$) {
  if (streams.constructor !== Array) streams = [streams];
  return flyd.combine(
    function (source, self) {
      var res = [source()];
      for (var i = 0; i < streams.length; ++i) {
        if (!streams[i].hasVal) return;
        res.push(streams[i]());
      }
      self(res);
    },
    [source$],
  );
});
// Filter
flyd.filter = flyd.curryN(2, function (fn, s) {
  return flyd.combine(
    function (s, self) {
      if (fn(s())) self(s.val);
    },
    [s],
  );
});
/*******************************
App Code
*****************************/
class State {
  stateStream = flyd.stream({ foo: 'initial-state'});
  count = 0;
  constructor() {}
  getNext() {
    const stream = this.stateStream.map(state => {
      return {
        ...state,
        count: ++this.count
      };
    })
    .pipe(flyd.filter(value => !!value)); // Removing this also makes it work
    return stream;
  }
}
const state = new State();
state.getNext()
  .map((value) => {
    console.log('Top level', value);
    // Line A
    flyd.withLatestFrom(state.getNext(), state.getNext())
      .pipe(flyd.once()) // Commenting this out makes it work
      .map(value => {
        console.log('Success', value); // The goal is to get to here
      });
  });
|
Thanks for the repro! I looked into it a bit and I think I can tell you what's going on. The I believe this behaviour was originally introduced so we could preserve the atomicity of updates, see #179 and #180, but looks like you've found an edge case where the behaviour isn't desired. To clarify, I think the case you're hitting has to do with the re-use of the same stream here. See this flems I'm curious, what is your actual use case for this data flow? (Not saying this isn't a bug, but I've never seen this data flow before so I'm curious) In any case, I think I can get a PR out to fix this soon but am dependent on @paldepind to release to NPM |
Hi, @nordfjord It's good news that the problem has been identified (and quickly too), as I was fearing something was fundamentally amiss. Regarding my use case, this is a simple Redux-style state manager. (I would say this is more like NgRx but without much of the drama.) Basically, a copy of the state object is pushed onto the stream whenever there is an update to the application state (e.g. the user clicks a button). Listeners, which are typically other UI components, then react to the state update. I saw your flems and TBH at one point I was also thinking that I need to be creating a new stream whenever the state got an update. But I assumed that Many thanks for the quick response. |
Hi, @nordfjord, I've applied your fix locally and things are working as expected. Many thanks for this! |
Going back to the original problem, my implementation of |
@NoelAbrahams How did you conclude that Before a stream is updated there is a check that all of its dependencies are "met" meaning that they have a value. This should satisfy 3. In the implementation of Again, I might misunderstand something so please excuse me if that is the case. |
Hi, @paldepind, I've gone back and had a look at the example for lift and it doesn't look like anything that I remember seeing! Haven't checked it out, but I agree that on paper lift seems to be doing the same thing as |
Hi, @paldepind the problem with One could define the function separately of course (as in the example on the This definitely falls within the realm of nitpicking, I admit that. But the point of going functional is to make the code more readable too. So, these things do tend to matter. |
Depending on your definition of "functional" one might consider
Note, you can always derive your
const combineLatest = (...streams) => lift((...args) => args, ...streams) |
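The one-liner above can be tried out without flyd using the rough sketch below. The assumptions are loud ones: `createStream` and this `lift` are hypothetical stand-ins written for illustration, and they only approximate the "all dependencies have a value" check described earlier in the thread. They are not flyd's lift module.

```javascript
// Minimal push-based stream; a stand-in for illustration, not flyd's implementation.
function createStream() {
  const listeners = [];
  const s = {
    hasVal: false,
    val: undefined,
    push(v) {
      s.hasVal = true;
      s.val = v;
      listeners.forEach((fn) => fn(v));
    },
    subscribe(fn) {
      listeners.push(fn);
    },
  };
  return s;
}

// Stand-in lift: applies fn to the latest value of every input,
// but only once each input has produced a value.
function lift(fn, ...streams) {
  const out = createStream();
  const update = () => {
    if (streams.every((s) => s.hasVal)) {
      out.push(fn(...streams.map((s) => s.val)));
    }
  };
  streams.forEach((s) => s.subscribe(update));
  return out;
}

// Same shape as the derivation in the comment: collect the arguments into an array.
const combineLatest = (...streams) => lift((...args) => args, ...streams);

const x = createStream();
const y = createStream();
const seen = [];
combineLatest(x, y).subscribe((v) => seen.push(v));

x.push(1); // y has no value yet, so nothing is emitted
y.push(2); // emits [1, 2]
x.push(3); // emits [3, 2]

console.log(seen); // [ [ 1, 2 ], [ 3, 2 ] ]
```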
Doesn't seem to have the same behaviour as my version of Need to look into this further to see what I'm doing wrong. But thanks — that would certainly be a nice solution! |
What is the behaviour you're expecting? here's an example flems The main difference is that it ends when ALL of the deps end instead of when ANY of the deps end. I would consider that desirable personally. |
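The difference between ending on ALL versus ANY of the dependencies can be sketched as follows. This is an illustration only: `createStream`, `onValue`, `onEnd`, and the `endWhen` parameter are hypothetical names invented for this example, not flyd's API. The sketch also assumes that an ended input keeps its last value.

```javascript
// Stand-in stream with an explicit end signal; not flyd's implementation.
function createStream() {
  const valueListeners = [];
  const endListeners = [];
  const s = {
    hasVal: false,
    val: undefined,
    ended: false,
    push(v) {
      if (s.ended) return; // an ended stream ignores further values
      s.hasVal = true;
      s.val = v;
      valueListeners.forEach((fn) => fn(v));
    },
    end() {
      if (s.ended) return;
      s.ended = true;
      endListeners.forEach((fn) => fn());
    },
    onValue(fn) {
      valueListeners.push(fn);
    },
    onEnd(fn) {
      endListeners.push(fn);
    },
  };
  return s;
}

// endWhen: 'all' ends the result once every input has ended,
// 'any' ends it as soon as the first input ends.
function combineLatest(streams, endWhen) {
  const out = createStream();
  streams.forEach((s) => {
    s.onValue(() => {
      if (streams.every((d) => d.hasVal)) out.push(streams.map((d) => d.val));
    });
    s.onEnd(() => {
      const finished =
        endWhen === 'any'
          ? streams.some((d) => d.ended)
          : streams.every((d) => d.ended);
      if (finished) out.end();
    });
  });
  return out;
}

const a = createStream();
const b = createStream();
const endsOnAll = combineLatest([a, b], 'all');
const endsOnAny = combineLatest([a, b], 'any');
const seenAll = [];
const seenAny = [];
endsOnAll.onValue((v) => seenAll.push(v));
endsOnAny.onValue((v) => seenAny.push(v));

a.push(1);
b.push(2); // both results emit [1, 2]
a.end();   // ends only the 'any' variant
b.push(3); // only the 'all' variant still emits

console.log(seenAll); // [ [ 1, 2 ], [ 1, 3 ] ]
console.log(seenAny); // [ [ 1, 2 ] ]
```

With the 'all' policy the combined stream keeps reacting to the inputs that are still live, which matches the preference stated in the comment above.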
@nordfjord I think I'm fine with that too (just rolling out flyd on something and will get back if I have further thoughts). Tracing my steps back to why I mistakenly opened this issue, I remember looking into the implementation of So, in summary, Thanks |
Hi, guys
I wonder if someone could help me with this problem? I'm trying to create a static method on flyd that has exactly the same behaviour as RxJs's CombineLatest. Needs to have the following:
flyd.combineLatest(stream1, stream2)
flyd.combineLatest(stream1, stream2).map(([v1, v2]) => {...});
I've looked at the existing lift module, but this does not seem to handle 3 & 4.
Here's my attempt so far:
This seems to work, except for a very odd problem when combineLatest is used inside another stream.
Happy to elaborate on this, but I would really appreciate it if anyone could cast an eye over my implementation and spot anything that is obviously wrong.
I should add that my attempt is a bit of guesswork at the moment, copied blatantly from the merge implementation, in case you couldn't tell that already!
Thanks
Noel