Stream stuck when used with mongodb and node10 #670
Nothing immediately stands out to me. I won't have time to look deeply into
this today, but a few questions:
1. Why do you batch and then flatten? That seems redundant to me.
2. Can you try the 3.0 beta (see the README for instructions)? Is it still
broken?
3. What kind of data needs to be in the database to reproduce this? Does
the number or size of the records matter?
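The redundancy behind question 1 is easy to see on plain arrays: grouping items into batches of five and flattening one level gives back the original sequence, so the pair only adds buffering without changing the values. A minimal sketch, assuming plain arrays rather than Highland streams (`batch` and `flatten` below are hypothetical helpers, not Highland's methods):

```javascript
// Plain-array illustration only: these helpers are hypothetical and are
// not Highland's stream methods of the same name.
function batch(items, size) {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size)); // the last group may be shorter
  }
  return groups;
}

function flatten(groups) {
  // One level of flattening is enough to undo batch().
  return [].concat(...groups);
}

const ids = [1, 2, 3, 4, 5, 6, 7];
const roundTrip = flatten(batch(ids, 5));
console.log(JSON.stringify(batch(ids, 5))); // [[1,2,3,4,5],[6,7]]
console.log(JSON.stringify(roundTrip));     // [1,2,3,4,5,6,7]
```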
On Mon, Feb 4, 2019, 10:02 AM jeanbaptiste-brasselet <***@***.***> wrote:
The following example works just fine with node 8/9 but stays stuck when
using node 10+
const MongoClient = require('mongodb').MongoClient;
const hl = require('highland');

const MONGO_URL = 'mongodb://localhost:27017/';
const DB_NAME = 'someDb';
const COLLECTION = 'someCollection';

const run = async () => {
  const mongo = new MongoClient(MONGO_URL);
  await mongo.connect();
  // Node Readable stream over every document in the collection
  const cursor = mongo.db(DB_NAME)
    .collection(COLLECTION)
    .find()
    .stream();

  return new Promise((resolve, reject) => {
    return hl(cursor)
      .batch(5)
      .flatten()
      .map(x => console.log(x && x._id))
      .stopOnError(reject)
      .done(resolve); // per this report, never reached on Node 10+
  });
};

run()
  .catch(console.log)
  .then(() => process.exit());
If you replace the MongoDB stream with a simple stream like this one, it
works just fine:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 1000000;
    this._index = 1;
  }
  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null); // end of stream
    else {
      this.push({ count: i });
    }
  }
}
Thing is, I had a look at the stream implementation inside the MongoDB lib
and it uses the Node stream implementation; I have seen nothing weird about
it. Something else is a little strange: if you remove the flatten from the
hl chain, the stream no longer gets stuck (even with Node 10).
Anyway, I have no clue what the problem is, so if anyone has an idea, please
share it.
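One way to confirm that a source like Counter ends cleanly on its own, with Highland taken out of the picture, is to pipe it into a plain Node Writable using `stream.pipeline` (available since Node 10). This is a sketch for isolating the problem, not a fix; the constructor's `max` parameter and the `countAll` helper are illustrative additions, not part of the report:

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Same shape as the Counter above, with a configurable (hypothetical) `max`
// so the check finishes quickly.
class Counter extends Readable {
  constructor(max = 10, opt = {}) {
    super({ ...opt, objectMode: true });
    this._max = max;
    this._index = 1;
  }
  _read() {
    const i = this._index++;
    this.push(i > this._max ? null : { count: i }); // null ends the stream
  }
}

// Counts objects through the normal pipe/drain protocol that pipeline()
// sets up; resolves with the number of objects the sink received.
function countAll(source) {
  return new Promise((resolve, reject) => {
    let seen = 0;
    const sink = new Writable({
      objectMode: true,
      highWaterMark: 2, // small buffer so write() returns false and 'drain' fires
      write(chunk, _enc, cb) {
        seen += 1;
        setImmediate(cb); // simulate an asynchronous consumer
      },
    });
    pipeline(source, sink, (err) => (err ? reject(err) : resolve(seen)));
  });
}

countAll(new Counter(1000))
  .then((n) => console.log(`received ${n} items`))
  .catch(console.error);
```

If this completes but the same source stalls behind a Highland chain, the stall is in how the chain handles backpressure rather than in the source.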
About Mongo: I tried the latest Mongo driver with several Mongo server versions (3.0, 3.2, 4.0). I will try another Mongo driver version.
OK, I know what's wrong. There's an … Turns out, it's possible for Highland to emit … Here's a simplified test case without Mongo:

const { Readable } = require('stream');
const _ = require('highland');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 10;
    this._index = 1;
    this._firstTime = true; // only the first item is delayed (see _read)
  }

  _emitNext() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      this.push({ count: i });
    }
  }

  _read() {
    if (this._firstTime) {
      // Delay the first item so that when it's emitted, Highland isn't in a
      // generator loop. This allows 'drain' to be emitted within write().
      this._firstTime = false;
      setTimeout(() => this._emitNext(), 0);
    } else {
      this._emitNext();
    }
  }
}

async function fn() {
  await _(new Counter())
    .batch(2)
    .consume((err, x, push, next) => {
      // Delay the batch so that the write() -> emit('drain') -> write() stack
      // can unwind, causing multiple increments of awaitDrain.
      setTimeout(() => {
        if (x !== _.nil) { // keep pulling until the end-of-stream marker
          next();
        }
        push(err, x);
      }, 0);
    })
    .filter((x) => {
      console.log(x);
      return false;
    })
    .toPromise(Promise);
  console.log('done');
}

fn();

In Node 9, this wasn't a problem, since there was a guard that prevented this kind of double increment. Node 10+ contains nodejs/node#18516, which removed the guard in favor of some other logic. It's not clear to me that the change in Node 10 is entirely correct, since there can still be a deadlock if write() happens to be called within another write(). However, it's also clear that Highland's usage of the …

The fix is simple enough: just don't emit 'drain' unless we've previously paused the source. I'll submit it later this week.
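The protocol at stake is Node's write()/'drain' contract: a writable signals backpressure by returning false from write(), and a producer that sees false waits for one 'drain' before writing more. The fix described above amounts to never emitting 'drain' unless a write() has actually returned false. A minimal sketch of the producer side of that contract with a plain Node Writable (no Highland involved; `makeSlowSink` and `produce` are illustrative names, and `events.once` needs Node 10.16 or later):

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// A deliberately slow sink with a tiny buffer, so write() starts returning
// false after a couple of items.
function makeSlowSink(log) {
  return new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(chunk, _enc, cb) {
      log.push(chunk);
      setTimeout(cb, 1);
    },
  });
}

// Producer side of the contract: keep writing while write() returns true;
// after a false, wait for exactly one 'drain' before writing again.
async function produce(sink, items) {
  let waits = 0;
  for (const item of items) {
    if (!sink.write(item)) {
      waits += 1;
      await once(sink, 'drain');
    }
  }
  sink.end();
  await once(sink, 'finish'); // every buffered item has been handled
  return waits;
}

const log = [];
produce(makeSlowSink(log), [1, 2, 3, 4, 5])
  .then((waits) => console.log(log, `waited for drain ${waits} time(s)`))
  .catch(console.error);
```

Each 'drain' here answers one false return; a 'drain' with no false before it would be an event the producer never asked for.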
Wow, thanks for the detailed explanation and for the time spent on this.
I am applying this fix to the 2.x branch even though caolan#670 only seems to happen in 3.x, since the 2.x code isn't really doing the right thing. It's likely that the issue doesn't manifest due to some other coincidence. As a bonus, correctly using the pipe/drain protocol allows us to remove the target.pause() call from the implementation of through().
I think I have a fix. As a sanity check, can you apply this diff to your …

diff --git a/lib/index.js b/lib/index.js
index 927ae19..3faf0de 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -834,6 +834,10 @@ function Stream(generator) {
this._defer_run_generator = false;
this._run_generator_deferred = false;
+ // Signals whether or not a call to write() returned false, and thus we can
+ // drain. This is only relevant for streams constructed with _().
+ this._can_drain = false;
+
var self = this;
// These are defined here instead of on the prototype
@@ -1258,8 +1262,9 @@ addMethod('resume', function () {
if (this._generator) {
this._runGenerator();
}
- else {
+ else if (this._can_drain) {
// perhaps a node stream is being piped in
+ this._can_drain = false;
this.emit('drain');
}
});
@@ -1641,6 +1646,11 @@ addMethod('pull', function (f) {
addMethod('write', function (x) {
// console.log(this.id, 'write', x, this.paused);
this._writeOutgoing(x);
+
+ if (this.paused && !this._generator) {
+ this._can_drain = true;
+ }
+
return !this.paused;
});
@@ -3412,7 +3422,6 @@ addMethod('through', function (target) {
return target(this);
}
else {
- target.pause();
output = this.createChild();
this.on('error', writeErr);
target.on('error', writeErr);

You should be able to run … I want to verify that there's not a deeper issue that I've missed.
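The bookkeeping the diff adds can be read as a small rule: remember when write() has answered false, and only emit 'drain' from resume() if such a false is still outstanding. The sketch below isolates that rule on a plain EventEmitter; `GatedSink`, `received`, and `_canDrain` are hypothetical names chosen for illustration, and this is not Highland's actual Stream class:

```javascript
const { EventEmitter } = require('events');

// Hypothetical, heavily simplified sink modeled on the diff: only the
// paused / can-drain bookkeeping, none of Highland's buffering or generators.
class GatedSink extends EventEmitter {
  constructor() {
    super();
    this.paused = true;     // starts paused, like a fresh sink
    this._canDrain = false; // true only after write() has returned false
    this.received = [];
  }

  write(x) {
    this.received.push(x);
    if (this.paused) {
      this._canDrain = true; // we now owe the writer exactly one 'drain'
    }
    return !this.paused;
  }

  pause() {
    this.paused = true;
  }

  resume() {
    this.paused = false;
    if (this._canDrain) {
      this._canDrain = false;
      this.emit('drain'); // only answers a write() that returned false
    }
  }
}

const sink = new GatedSink();
let drains = 0;
sink.on('drain', () => { drains += 1; });

sink.resume();              // no write() has returned false yet: no 'drain'
sink.pause();
const ok = sink.write('a'); // returns false and arms _canDrain
sink.resume();              // emits exactly one 'drain'
sink.resume();              // nothing outstanding: no second 'drain'
console.log(ok, drains);    // false 1
```

Without the flag, the first and last resume() calls would each emit an unsolicited 'drain', which is the kind of extra event a piping Readable cannot match against its own pause.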
Hello, I can confirm the changes also fix my use case :)
This fix has been released as 3.0.0-beta.8 and 2.13.1.
Hello, I really like Highland and would like to keep using it, but it is starting to cause problems with newer versions of Node.