Cannot run in multiple worker threads #54

Open
Bahlinc-Dev opened this issue May 3, 2022 · 4 comments

Is this package context-aware?
It does run in Node worker threads, but only in a single thread.

I have an appsink in my pipeline; if I run 2 or more threads, it crashes during init:

Thread 1: pipeline.findChild('sink')
Result: OK

Thread 2: pipeline.findChild('sink')
Result: FATAL ERROR: v8::HandleScope::CreateHandle() Cannot create a handle without a HandleScope

Bahlinc-Dev (Author) commented May 3, 2022

I know why you didn't see this before: it's tricky to reproduce.

I was trying to put together a simple test case and found it didn't happen all the time, whereas on our production pipeline it fails immediately. Then I realized it depends on which elements you use, how many instances you create, what you interact with on the pipeline, and so on. It will eventually fail if you are using multiple instances across threads.

Here is the simplest test case I could produce that DOES fail, but you may have to run it a few times.
It creates 2 threads, and 2 instances per thread.

To run:

  1. Put index.js, worker.js, and the sample video from ./examples/loop/samplevideo.mp4 in one folder.
  2. Start with UV_THREADPOOL_SIZE=10 node ./index.js

// index.js

const { Worker } = require('worker_threads');
async function startWorker(script, id) {
    return new Promise((resolve, reject) => {
        const worker = new Worker(script, { workerData: { id: id } });
        worker.on('online', () => {
            console.log(`Worker ${worker.threadId} online`);
            resolve(worker);
        });
        worker.on('message', msg => {
            console.log('Worker msg: ' + msg);
        });
    });
}

async function run() {
    const makeWorkers = 2;
    for (let i = 0; i < makeWorkers; i += 1) {
        await startWorker('./worker.js', i);
    }
}
run();

// worker.js

const gstreamer = require('gstreamer-superficial');
const { parentPort, workerData } = require('worker_threads');
function createInstance(instanceId) {
    parentPort.postMessage(`CREATE INSTANCE Worker id:${workerData.id} instanceId:${instanceId}`);
    // simple broken pipeline
    let pipeline = new gstreamer.Pipeline(`
        filesrc location=./samplevideo.mp4 
        ! qtdemux 
        ! mp4mux
        ! appsink name=sink
    `);
    pipeline.play();
    const sink = pipeline.findChild('sink');
    let bufCount = 0;
    function onMp4Data(buf, caps) {
        if (buf) {
            bufCount += 1;
            if (!(bufCount % 50)) {
                parentPort.postMessage(`got 50 buffers in worker id:${workerData.id} cnt:${bufCount}`);
            }
            sink.pull(onMp4Data);
        }
    }
    sink.pull(onMp4Data);
    pipeline.pollBus( (msg) => {
        switch (msg.type) {
            case 'eos':
                console.log('End of Stream, making another instance');
                createInstance(instanceId);
                break;
        }
    });
}
createInstance(1);
createInstance(2);

The full error on my machine is:

Worker 1 online
Worker msg: CREATE INSTANCE Worker id:0 instanceId:1
Worker msg: CREATE INSTANCE Worker id:0 instanceId:2
Worker 2 online
Worker msg: CREATE INSTANCE Worker id:1 instanceId:1
Worker msg: CREATE INSTANCE Worker id:1 instanceId:2
Worker msg: got 50 buffers in worker id:0 cnt:100
Worker msg: got 50 buffers in worker id:0 cnt:100
Worker msg: got 50 buffers in worker id:1 cnt:100
Worker msg: got 50 buffers in worker id:1 cnt:100
Worker msg: got 50 buffers in worker id:0 cnt:200
Worker msg: got 50 buffers in worker id:0 cnt:200
Worker msg: got 50 buffers in worker id:1 cnt:200
Worker msg: got 50 buffers in worker id:1 cnt:200
Worker msg: got 50 buffers in worker id:0 cnt:300
Worker msg: got 50 buffers in worker id:0 cnt:300
Worker msg: got 50 buffers in worker id:1 cnt:300
Worker msg: got 50 buffers in worker id:1 cnt:300
FATAL ERROR: v8::HandleScope::CreateHandle() Cannot create a handle without a HandleScope
End of Stream, making another instance
 1: 0xb00e10 node::Abort() [node]
Worker msg: CREATE INSTANCE Worker id:0 instanceId:1
 2: 0xa1823b node::FatalError(char const*, char const*) [node]
 3: 0xceddda v8::Utils::ReportApiFailure(char const*, char const*) [node]
 4: 0xe590a2 v8::internal::HandleScope::Extend(v8::internal::Isolate*) [node]
 5: 0xcef491 v8::HandleScope::CreateHandle(v8::internal::Isolate*, unsigned long) [node]
 6: 0x7f1d84690314  [/home/rc/iftest/node_modules/gstreamer-superficial/build/Release/gstreamer-superficial.node]
 7: 0xd4b6de  [node]
 8: 0xd4beb3 v8::internal::Builtins::InvokeApiFunction(v8::internal::Isolate*, bool, v8::internal::Handle<v8::internal::HeapObject>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*, v8::internal::Handle<v8::internal::HeapObject>) [node]
 9: 0xe1a0ab  [node]
10: 0xe1a9ae v8::internal::Execution::New(v8::internal::Isolate*, v8::internal::Handle<v8::internal::Object>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*) [node]
11: 0xd0a27c v8::Function::NewInstanceWithSideEffectType(v8::Local<v8::Context>, int, v8::Local<v8::Value>*, v8::SideEffectType) const [node]
12: 0x7f1d8469140d GObjectWrap::NewInstance(Nan::FunctionCallbackInfo<v8::Value> const&, _GObject*) [/home/rc/iftest/node_modules/gstreamer-superficial/build/Release/gstreamer-superficial.node]
13: 0x7f1d84696b7b Pipeline::FindChild(Nan::FunctionCallbackInfo<v8::Value> const&) [/home/rc/iftest/node_modules/gstreamer-superficial/build/Release/gstreamer-superficial.node]
14: 0x7f1d84692a5c  [/home/rc/iftest/node_modules/gstreamer-superficial/build/Release/gstreamer-superficial.node]
15: 0xd4a82e  [node]
16: 0xd4bc4f v8::internal::Builtin_HandleApiCall(int, unsigned long*, v8::internal::Isolate*) [node]
17: 0x15e7dd9  [node]
Aborted (core dumped)
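
Because the crash is intermittent, it can help to rerun the test case several times and tally how each run ended. The following is a minimal sketch, not part of the original report: `classifyRun` and `tally` are hypothetical helper names, and the 30-second default timeout is an assumption, needed because the test case keeps recreating pipelines and never exits on its own. A run that survives until the timeout is reported as `timeout`. An abort like the one above would be expected to show up as `signal:SIGABRT` on Linux.

```javascript
// repro-loop.js (hypothetical helper, not part of gstreamer-superficial)
const { spawnSync } = require('child_process');

// Run one command to completion (or timeout) and describe how it ended.
function classifyRun(command, args, options = {}) {
    const { env = {}, timeoutMs = 30000 } = options;
    const result = spawnSync(command, args, {
        env: { ...process.env, ...env },
        stdio: 'ignore',      // discard child output; use 'inherit' to see it
        timeout: timeoutMs,   // the child is killed once this elapses
    });
    if (result.error && result.error.code === 'ETIMEDOUT') return 'timeout';
    if (result.error) throw result.error;             // e.g. command not found
    if (result.signal) return `signal:${result.signal}`;
    return result.status === 0 ? 'ok' : `exit:${result.status}`;
}

// Repeat the run and count how often each outcome occurred.
function tally(command, args, runs, options) {
    const counts = {};
    for (let i = 0; i < runs; i += 1) {
        const outcome = classifyRun(command, args, options);
        counts[outcome] = (counts[outcome] || 0) + 1;
    }
    return counts;
}

// Example call for the test case above (assumes index.js is in the current folder):
// console.log(tally('node', ['./index.js'], 10, { env: { UV_THREADPOOL_SIZE: '10' } }));
```

Each run is a separate Node process, so a crash in one run does not affect the next.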

Bahlinc-Dev (Author) commented May 3, 2022

Note that you can create as many instances as you want within a SINGLE thread, and it will work fine. This only happens when you have instances in different threads.
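
Since single-thread use appears stable, one possible workaround is to start each group of pipelines in its own Node process instead of a worker thread. Every process then has its own main thread and its own copy of the addon. This is only a sketch and has not been tested against gstreamer-superficial. `startProcess` and the `PIPELINE_WORKER_ID` environment variable are hypothetical names. In the child script, `process.send()` would take the place of `parentPort.postMessage()`, and `process.env.PIPELINE_WORKER_ID` the place of `workerData.id`.

```javascript
// index-fork.js (hypothetical variant of index.js using processes, not threads)
const { fork } = require('child_process');

// Start `script` in a separate Node process and resolve once it has spawned.
// `onMessage` receives every message the child sends with process.send().
function startProcess(script, id, onMessage) {
    return new Promise((resolve, reject) => {
        const child = fork(script, [], {
            // Pass the id through the environment, since workerData is
            // only available to worker threads.
            env: { ...process.env, PIPELINE_WORKER_ID: String(id) },
        });
        child.once('spawn', () => resolve(child));
        child.once('error', reject);
        child.on('message', onMessage);
    });
}

// Example call (assumes a worker.js adapted as described above):
// startProcess('./worker.js', 0, msg => console.log('Child msg: ' + msg));
```

The trade-off is higher memory use and startup cost per process, and buffers must be sent between processes rather than shared.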

dturing (Owner) commented Oct 14, 2023

@Bahlinc-Dev node-gstreamer-superficial should be context-aware since #50 was closed, but I can't really test this. Do you have a suggestion for what to do about it (or ideally a PR)? I have little time to dig into this these days.

onfire4g05 commented Nov 13, 2023

I am also having this issue when using workers with the latest version.
