diff --git a/src/Consumer.cc b/src/Consumer.cc index e56f8ba..0b82436 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -77,32 +77,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe Napi::Object msg = Message::NewInstance({}, data->cMessage); Consumer *consumer = data->consumer; - // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We - // need to handle this rare case in future. - if (consumer) { - Napi::Value ret; - try { - ret = jsCallback.Call({msg, consumer->Value()}); - } catch (std::exception &exception) { - logMessageListenerError(consumer, exception.what()); - } + Napi::Value ret; + try { + ret = jsCallback.Call({msg, consumer->Value()}); + } catch (std::exception &exception) { + logMessageListenerError(consumer, exception.what()); + } - if (ret.IsPromise()) { - Napi::Promise promise = ret.As(); - Napi::Function catchFunc = promise.Get("catch").As(); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As(); + Napi::Function catchFunc = promise.Get("catch").As(); - ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { - Napi::Error error = info[0].As(); - logMessageListenerError(consumer, error.what()); - })}); + ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { + Napi::Error error = info[0].As(); + logMessageListenerError(consumer, error.what()); + })}); - promise = ret.As(); - Napi::Function finallyFunc = promise.Get("finally").As(); + promise = ret.As(); + Napi::Function finallyFunc = promise.Get("finally").As(); - finallyFunc.Call( - promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); - return; - } + finallyFunc.Call( + promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); + return; } data->callback(); } @@ -111,7 +107,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag std::shared_ptr cMessage(rawMessage, pulsar_message_free); MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx; - Consumer *consumer = (Consumer *)listenerCallback->consumer; + Consumer *consumer = static_cast(listenerCallback->consumerFuture.get()); if (listenerCallback->callback.Acquire() != napi_ok) { return; @@ -135,7 +131,7 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) { } if (listener != nullptr) { - listener->consumer = this; + listener->consumerPromise.set_value(this); // If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS // code. this->Ref(); @@ -168,11 +164,6 @@ struct ConsumerNewInstanceContext { auto cConsumer = std::shared_ptr(rawConsumer, pulsar_consumer_free); auto listener = consumerConfig->GetListenerCallback(); - if (listener) { - // pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer - pulsar_consumer_pause_message_listener(cConsumer.get()); - } - deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) { Napi::Object obj = Consumer::constructor.New({}); Consumer *consumer = Consumer::Unwrap(obj); @@ -180,11 +171,6 @@ struct ConsumerNewInstanceContext { consumer->SetCConsumer(cConsumer); consumer->SetListenerCallback(listener); - if (listener) { - // resume to enable MessageListener function callback - resume_message_listener(cConsumer.get()); - } - return obj; }); } diff --git a/src/MessageListener.h b/src/MessageListener.h index ff4efee..704c8ec 100644 --- a/src/MessageListener.h +++ b/src/MessageListener.h @@ -21,14 +21,16 @@ #define MESSAGELISTENER_H #include +#include struct MessageListenerCallback { Napi::ThreadSafeFunction callback; - // Using consumer as void* since the ListenerCallback is shared between Config and Consumer. - void *consumer; + // Use future store consumer point, because need ensure sync. + std::promise consumerPromise; + std::shared_future consumerFuture; - MessageListenerCallback() : consumer(nullptr) {} + MessageListenerCallback() : consumerPromise(), consumerFuture(consumerPromise.get_future()) {} }; #endif