AsyncCallbacks lost on finished stage by error#23185
Conversation
|
Test FAILed. |
|
Test FAILed. |
|
Test FAILed. |
|
build failed due to binary incompatible - not sure how to fix this so far |
|
Test PASSed. |
|
Ref #23078 |
patriknw
left a comment
There was a problem hiding this comment.
hmm, I need to think some more about this. So far only detailed comments without understanding if there is a better solution.
| Source.queue[Unit](10, OverflowStrategy.fail).toMat(TestSink.probe)(Keep.both).run() | ||
| intercept[IllegalStateException] { | ||
| Await.result( | ||
| (1 to 15).map(_ ⇒ queue.offer(())).last, 3.seconds) |
There was a problem hiding this comment.
is this guaranteed to throw?
I think you have to use a TestSink that is not requesting elements to be sure?
There was a problem hiding this comment.
actually, I use TestSink.probe in line 297. So I believe this is what you are expecting.
| val out = Outlet[T]("queueSource.out") | ||
| val out = Outlet[T]("queueSource.offerout") | ||
| override val shape: SourceShape[T] = SourceShape.of(out) | ||
| println(QueueSource) |
| abstract class ConcurrentGraphStageLogic[T] private[stream] (override val inCount: Int, override val outCount: Int) extends GraphStageLogic(inCount, outCount) { | ||
| def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size) | ||
|
|
||
| val concurrentSet = ConcurrentHashMap.newKeySet[Promise[_]]() |
| */ | ||
| private[this] final val lock = new ReentrantLock | ||
|
|
||
| private[this] val callbackState = new AtomicReference[CallbackState](NotInitialized(Nil)) |
There was a problem hiding this comment.
This is only accessed inside the lock so then there shouldn't be any need for an AtomicReference.
|
|
||
| ac.ack.map(ack ⇒ { | ||
| ack.future.andThen { case _ ⇒ concurrentSet.remove(ack) }(ec) | ||
| concurrentSet.add(ack) |
There was a problem hiding this comment.
place this above previous line, so that things are not optimized and remove happens before add
| } | ||
|
|
||
| @throws(classOf[Exception]) | ||
| override def postStop(): Unit = { |
There was a problem hiding this comment.
easy to forget calling super.postStop from subclass
|
|
||
| override def postStop(): Unit = stopCallback { | ||
| override def postStop(): Unit = stopWithStoppedCallbackLogic { | ||
| case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")) |
| * To preserve message order when switching between not initialized / initialized states | ||
| * lock is used. Case is similar to RepointableActorRef | ||
| */ | ||
| private[this] final val lock = new ReentrantLock |
There was a problem hiding this comment.
I'd prefer synchronized if we don't use any specific features from ReentrantLock
|
|
||
| @throws(classOf[Exception]) | ||
| override def postStop(): Unit = { | ||
| concurrentSet.asScala.foreach(_.failure(new IllegalStateException("Stream is stopped. Callback is detached."))) |
|
Looking at how ordinary
Could we introduce a and in What am I missing? |
|
It's a little bit more complex. Source.queue returns futures for each element. Future succeeded when an element is pulling by a stream. The issue here is that some data elements already went through runAsyncInput and waiting to be processed by the actor. At this particular time actor finishes because of stream completion. In this situation, some Futures will not be completed at all. This is what I'm trying to solve here. |
|
Still I can try "HasCallbackPromise as trait" approach instead of "AckedCallback abstract class" it you think this will give some benefits. |
|
Isn't another issue that it's not safe to return a callback as materialized value, because it can be inoked "too early". That is what the calback wrapper is trying to solve. |
|
@patriknw , callback wrapper solves the issue of invoking materialized value too early - current PR is enhancing wrapper by adding tracking of what messages being processed by stream to complete all futures |
|
Right, Optimal solution would be to have callbacks that "just works", and not a special one for when you use them as materialized value. This is difficult and it's great that we start thinking about how to solve this. |
ca8285c to
1c5fe18
Compare
|
rebased and seems like added all feedback changes. |
|
Test PASSed. |
|
Test PASSed. |
patriknw
left a comment
There was a problem hiding this comment.
LGTM, with 2 small things
| ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Initialized$") | ||
| ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$NotInitialized$") | ||
| ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$CallbackState") | ||
| ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper") No newline at end of file |
There was a problem hiding this comment.
The file should now be for 2.5.6, and something was added there. Should this be removed?
| // not started yet | ||
| case list @ Pending(l) ⇒ if (!currentState.compareAndSet(list, Pending(Event(event, OptionVal.None) :: l))) internalInvoke(event) | ||
| // started - can just send message to stream | ||
| case Initialized ⇒ onAsyncInput(event) |
There was a problem hiding this comment.
move this case to the top
e672e45 to
9b43ce7
Compare
|
pushed changes per recent feedback |
|
Test PASSed. |
|
Test PASSed. |
johanandren
left a comment
There was a problem hiding this comment.
LGTM, excellent work @agolubev !
|
Yay, great work @agolubev |
jrudolph
left a comment
There was a problem hiding this comment.
Seems I missed the train on reviewing soon enough...
It seems that I seem to be missing something. Where are the promises ever completed? I think we also need tests for the new features. A few smaller comments below.
| /** | ||
| * Dispatch an asynchronous notification. | ||
| * This method is thread-safe and may be invoked from external execution contexts. | ||
| * Promise in `HasCallbackPromise` will fail if stream is already closed or closed before |
There was a problem hiding this comment.
Seems, the description is based on an outdated version that still had HasCallbackPromise.
| (currentState.getAndSet(Initializing): @unchecked) match { | ||
| case Pending(l) ⇒ l.reverse.foreach(ack ⇒ { | ||
| onAsyncInput(ack.e) | ||
| }) |
There was a problem hiding this comment.
Instead of adding @unchecked, we usually add a catch-all clause that throws an IllegalStageException to make it explicit that missing a state was not an oversight.
| interpreter.onAsyncInput(GraphStageLogic.this, event, handler.asInstanceOf[Any ⇒ Unit]) | ||
| val result = new ConcurrentAsyncCallback[T](handler) | ||
| asyncCallbacksInProgress.add(result) | ||
| if (_interpreter != null) result.onStart() |
There was a problem hiding this comment.
Is it prevented (or does it need to be prevented) that onStart is called twice, once here, and once below in beforePreStart? Is it the assumption, that if _interpreter != null we currently are executing preStart? But wouldn't then the onStart triggered in beforePreStart already be called?
| case list @ Pending(l) ⇒ if (!currentState.compareAndSet(list, Pending(Event(event, OptionVal.None) :: l))) internalInvoke(event) | ||
| // initializing is in progress in another thread (initializing thread is managed by akka) | ||
| case Initializing ⇒ if (!currentState.compareAndSet(Initializing, Pending(Event(event, OptionVal.None) :: Nil))) { | ||
| (currentState.get(): @unchecked) match { |
There was a problem hiding this comment.
What's the benefit of not just recursing directly? Isn't it now missing matching for Completed?
| // started - can just send message to stream | ||
| case Initialized ⇒ sendEvent(event, promise) | ||
| // initializing is in progress in another thread (initializing thread is managed by Akka) | ||
| case Initializing ⇒ if (!currentState.compareAndSet(Initializing, Pending(Event(event, OptionVal(promise)) :: Nil))) { |
There was a problem hiding this comment.
See below, why not recurse directly?
|
|
||
| // external call | ||
| override def invokeWithFeedback(event: T): Future[Done] = { | ||
| val promise: Promise[Done] = Promise[Done]() |
There was a problem hiding this comment.
Where is the promise ever completed? Isn't that missing?
* Add reproduction unit test * Port akka/akka-core#23970 * Update API Approval list * Update API Approval list * Add async callback unit tests * skip non-implemented feature * Fix TaskCompletionSource allocation problem * Code cleanup * Implement early callback from akka/akka-core#23185 * Make sure early callbacks also get cancelled on stop * Fix naming and copyright header * Update API Approval list * Fix missing feedback callback * rerun tests * Implement locks * cleanup lock code * extend locking to `_callbackWaitingForInterpreter` * Fix unit test * rerun tests * xml-doc and TBD cleanup * more TBD and XML-DOC cleanup * defined `ConcurrentAsyncCallback` * introduced `ConcurrentAsyncCallback` into `GraphStage` * fixed all compilation errors * cleanup hub code * added API approvals * cleaned up reverse --------- Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
Ref #23111
Please review approach of adding all incoming requests to queue with acknowledge future.
In case the stream is finished but some futures stay not completed - we are failing them automatically.
There is a plan to substitute CallBackWapper with ConcurrentGraphStage everywhere (perhaps need a new name to explain that stage allows making callbacks not taking into account stream lifecycle.)
Let me know what do you think
There still some sync points in new GraphStage this is because we need to synchronize ideal Akka world with the not ideal external world. Initially, it was considered to introduce special actor with extended lifecycle to avoid sync points - but this was rejected.
Any ideas as to avoid synchronization are welcome.