Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove handlers dynamically will lost message #286

Closed
Rookiexu opened this issue Nov 18, 2019 · 0 comments
Closed

Remove handlers dynamically will lost message #286

Rookiexu opened this issue Nov 18, 2019 · 0 comments

Comments

@Rookiexu
Copy link
Contributor

Rookiexu commented Nov 18, 2019

This code is copy from my project and WorkProcessor;

@Override
    public void decrConsumer() {
        int nextUnUsed = getNextUsed();
        if (nextUnUsed != -1) {
            RingBuffer<HandlerEvent> ringBuffer = disruptor.getRingBuffer();
            WorkProcessor processor = processors[nextUnUsed];
            AbstractSentinelHandler handler = handlers[nextUnUsed];
            if (processor == null || handler == null) {
                System.out.println("remove disruptor thread ,handler == " + handler + " ,processor == " + processor);
            }
            if (processor != null && handler != null) {
               //  ******************** problem in here *********************
                processor.halt();
                try {
                    handler.awaitShutdown();
                } catch (InterruptedException e) {
                    System.out.println(e);
                }
                ringBuffer.removeGatingSequence(processor.getSequence());
            }

            processors[nextUnUsed] = null;
            handlers[nextUnUsed] = null;
            updateUseState(nextUnUsed, NU_USED);
        }
    }
  @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    //  ******************** problem in here *********************
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

After i use processor.halt() to stop workProcess, cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence) throws an AlertException exception, and the message of index nextSequence is lost

How can I solve this problem?

Thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant