diff --git a/build.gradle b/build.gradle index f37800fe6..d90778247 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ apply plugin: 'idea' defaultTasks 'build' group = 'com.lmax' -version = new Version(major: 3, minor: 5, revision: 0) +version = new Version(major: 3, minor: 4, revision: 3) ext { fullName = 'Disruptor Framework' @@ -41,10 +41,8 @@ ext { if (!project.hasProperty('sonatypePassword')) sonatypePassword = '' } - sourceSets { perf.java.srcDir file('src/perftest/java') - java9.java.srcDir file('src/main/java9') } eclipse.classpath.plusConfigurations += [ sourceSets.perf.compileClasspath ] @@ -57,7 +55,6 @@ dependencies { // checkstyle 'com.puppycrawl.tools:checkstyle:8.12' testCompile 'junit:junit:4.12' perfCompile 'org.hdrhistogram:HdrHistogram:2.1.10' - java9Compile sourceSets.main.output } idea.module { @@ -65,6 +62,10 @@ idea.module { scopes.TEST.plus += [ configurations.perfCompile ] } +sourceCompatibility = 1.7 +targetCompatibility = 1.7 + + compileJava { // Suppress warnings about using Unsafe and sun.misc options.compilerArgs << '-XDignore.symbol.file' @@ -72,25 +73,6 @@ compileJava { options.debug = true options.forkOptions.executable = javaCompilerExecutable options.warnings = false - sourceCompatibility = 1.7 - targetCompatibility = 1.7 -} - -compileJava9Java { - sourceCompatibility = 1.9 - targetCompatibility = 1.9 -// options.compilerArgs.addAll(['--release', '9']) -} - -task testJava9(type: Test) { - dependsOn jar - def jdkHome = System.getenv("JAVA_9") - classpath = files(jar.archivePath, classpath) - sourceSets.main.output - executable = file("$jdkHome/bin/java") - doFirst { - println classpath.asPath - println "$name runs test using JDK 9" - } } tasks.withType(Test) { @@ -114,19 +96,12 @@ javadoc { } jar { - manifest.attributes( - 'Built-By': System.properties['user.name'], - 'Bundle-Name': fullName, - 'Bundle-Vendor': teamName, - 'Bundle-Description': fullDescription, - 'Bundle-DocURL': siteUrl, - 'Multi-Release': true, - 'Automatic-Module-Name': moduleName - ) - - into('META-INF/versions/9') { - from sourceSets.java9.output - } + manifest.attributes('Built-By': System.properties['user.name'], + 'Bundle-Name': fullName, + 'Bundle-Vendor': teamName, + 'Bundle-Description': fullDescription, + 'Bundle-DocURL': siteUrl, + 'Automatic-Module-Name': moduleName) } task sourcesJar(type: Jar) { diff --git a/src/main/java/com/lmax/disruptor/Main.java b/src/main/java/com/lmax/disruptor/Main.java deleted file mode 100644 index daf13c5ca..000000000 --- a/src/main/java/com/lmax/disruptor/Main.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.lmax.disruptor; - -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.util.DaemonThreadFactory; - -public class Main -{ - private static class MainEvent - { - public T t; - } - - public static void main(String[] args) - { - Disruptor> d = new Disruptor<>( - new EventFactory>() - { - @Override - public MainEvent newInstance() - { - return new MainEvent<>(); - } - }, - 1024, - DaemonThreadFactory.INSTANCE - ); - - d.publishEvent(new EventTranslator>() - { - @Override - public void translateTo(MainEvent event, long sequence) - { - event.t = "" + sequence; - } - }); - } -} diff --git a/src/main/java/com/lmax/disruptor/RingBuffer.java b/src/main/java/com/lmax/disruptor/RingBuffer.java index 75b8369fd..3799f3e8a 100644 --- a/src/main/java/com/lmax/disruptor/RingBuffer.java +++ b/src/main/java/com/lmax/disruptor/RingBuffer.java @@ -16,7 +16,10 @@ package com.lmax.disruptor; +import sun.misc.Unsafe; + import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.Util; abstract class RingBufferPad { @@ -25,7 +28,34 @@ abstract class RingBufferPad abstract class RingBufferFields extends RingBufferPad { + private static final int BUFFER_PAD; + private static final long REF_ARRAY_BASE; + private static final int REF_ELEMENT_SHIFT; + private static final Unsafe UNSAFE = Util.getUnsafe(); + + static + { + final int scale = UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) + { + REF_ELEMENT_SHIFT = 2; + } + else if (8 == scale) + { + REF_ELEMENT_SHIFT = 3; + } + else + { + throw new IllegalStateException("Unknown pointer size"); + } + BUFFER_PAD = 128 / scale; + // Including the buffer pad in the array base offset + REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128; + } + + private final long indexMask; private final Object[] entries; + protected final int bufferSize; protected final Sequencer sequencer; RingBufferFields( @@ -33,32 +63,34 @@ abstract class RingBufferFields extends RingBufferPad Sequencer sequencer) { this.sequencer = sequencer; + this.bufferSize = sequencer.getBufferSize(); - if (sequencer.getBufferSize() < 1) + if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } - if (Integer.bitCount(sequencer.getBufferSize()) != 1) + if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } - this.entries = new Object[sequencer.getBufferSize()]; + this.indexMask = bufferSize - 1; + this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } private void fill(EventFactory eventFactory) { - for (int i = 0; i < entries.length; i++) + for (int i = 0; i < bufferSize; i++) { - entries[i] = eventFactory.newInstance(); + entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } @SuppressWarnings("unchecked") protected final E elementAt(long sequence) { - return (E) entries[(int) (sequence & (entries.length - 1))]; + return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } } @@ -405,7 +437,7 @@ public long getCursor() */ public int getBufferSize() { - return sequencer.getBufferSize(); + return bufferSize; } /** @@ -878,9 +910,9 @@ private void checkBatchSizing(int batchStartsAt, int batchSize) { throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + batchSize); } - else if (batchSize > sequencer.getBufferSize()) + else if (batchSize > bufferSize) { - throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + sequencer.getBufferSize() + " entities."); + throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + bufferSize + " entities."); } } @@ -1092,7 +1124,8 @@ private void translateAndPublishBatch( public String toString() { return "RingBuffer{" + - "sequencer=" + sequencer + + "bufferSize=" + bufferSize + + ", sequencer=" + sequencer + "}"; } } diff --git a/src/main/java9/com/lmax/disruptor/MultiProducerSequencer.java b/src/main/java9/com/lmax/disruptor/MultiProducerSequencer.java deleted file mode 100644 index c2070d9ac..000000000 --- a/src/main/java9/com/lmax/disruptor/MultiProducerSequencer.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import com.lmax.disruptor.util.Util; - -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; -import java.util.concurrent.locks.LockSupport; - - -/** - *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s. - * Suitable for use for sequencing across multiple publisher threads.

- * - *

* Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call - * to {@link Sequencer#next()}, to determine the highest available sequence that can be read, then - * {@link Sequencer#getHighestPublishedSequence(long, long)} should be used.

- */ -public final class MultiProducerSequencer extends AbstractSequencer -{ - private static final VarHandle AVAILABLE_ARRAY; - - static - { - try - { - AVAILABLE_ARRAY = MethodHandles.arrayElementVarHandle(int[].class); - } - catch (final Exception e) - { - throw new RuntimeException(e); - } - } - - private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - - // availableBuffer tracks the state of each ringbuffer slot - // see below for more details on the approach - private final int[] availableBuffer; - private final int indexMask; - private final int indexShift; - - /** - * Construct a Sequencer with the selected wait strategy and buffer size. - * - * @param bufferSize the size of the buffer that this will sequence over. - * @param waitStrategy for those waiting on sequences. - */ - public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) - { - super(bufferSize, waitStrategy); - availableBuffer = new int[bufferSize]; - indexMask = bufferSize - 1; - indexShift = Util.log2(bufferSize); - initialiseAvailableBuffer(); - } - - /** - * @see Sequencer#hasAvailableCapacity(int) - */ - @Override - public boolean hasAvailableCapacity(final int requiredCapacity) - { - return hasAvailableCapacity(gatingSequences, requiredCapacity, cursor.get()); - } - - private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) - { - long wrapPoint = (cursorValue + requiredCapacity) - bufferSize; - long cachedGatingSequence = gatingSequenceCache.get(); - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) - { - long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue); - gatingSequenceCache.set(minSequence); - - if (wrapPoint > minSequence) - { - return false; - } - } - - return true; - } - - /** - * @see Sequencer#claim(long) - */ - @Override - public void claim(long sequence) - { - cursor.set(sequence); - } - - /** - * @see Sequencer#next() - */ - @Override - public long next() - { - return next(1); - } - - /** - * @see Sequencer#next(int) - */ - @Override - public long next(int n) - { - if (n < 1 || n > bufferSize) - { - throw new IllegalArgumentException("n must be > 0 and < bufferSize"); - } - - long current; - long next; - - do - { - current = cursor.get(); - next = current + n; - - long wrapPoint = next - bufferSize; - long cachedGatingSequence = gatingSequenceCache.get(); - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) - { - long gatingSequence = Util.getMinimumSequence(gatingSequences, current); - - if (wrapPoint > gatingSequence) - { - LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? - continue; - } - - gatingSequenceCache.set(gatingSequence); - } - else if (cursor.compareAndSet(current, next)) - { - break; - } - } - while (true); - - return next; - } - - /** - * @see Sequencer#tryNext() - */ - @Override - public long tryNext() throws InsufficientCapacityException - { - return tryNext(1); - } - - /** - * @see Sequencer#tryNext(int) - */ - @Override - public long tryNext(int n) throws InsufficientCapacityException - { - if (n < 1) - { - throw new IllegalArgumentException("n must be > 0"); - } - - long current; - long next; - - do - { - current = cursor.get(); - next = current + n; - - if (!hasAvailableCapacity(gatingSequences, n, current)) - { - throw InsufficientCapacityException.INSTANCE; - } - } - while (!cursor.compareAndSet(current, next)); - - return next; - } - - /** - * @see Sequencer#remainingCapacity() - */ - @Override - public long remainingCapacity() - { - long consumed = Util.getMinimumSequence(gatingSequences, cursor.get()); - long produced = cursor.get(); - return getBufferSize() - (produced - consumed); - } - - private void initialiseAvailableBuffer() - { - for (int i = availableBuffer.length - 1; i != 0; i--) - { - setAvailableBufferValue(i, -1); - } - - setAvailableBufferValue(0, -1); - } - - /** - * @see Sequencer#publish(long) - */ - @Override - public void publish(final long sequence) - { - setAvailable(sequence); - waitStrategy.signalAllWhenBlocking(); - } - - /** - * @see Sequencer#publish(long, long) - */ - @Override - public void publish(long lo, long hi) - { - for (long l = lo; l <= hi; l++) - { - setAvailable(l); - } - waitStrategy.signalAllWhenBlocking(); - } - - /** - * The below methods work on the availableBuffer flag. - *

- * The prime reason is to avoid a shared sequence object between publisher threads. - * (Keeping single pointers tracking start and end would require coordination - * between the threads). - *

- * -- Firstly we have the constraint that the delta between the cursor and minimum - * gating sequence will never be larger than the buffer size (the code in - * next/tryNext in the Sequence takes care of that). - * -- Given that; take the sequence value and mask off the lower portion of the - * sequence as the index into the buffer (indexMask). (aka modulo operator) - * -- The upper portion of the sequence becomes the value to check for availability. - * ie: it tells us how many times around the ring buffer we've been (aka division) - * -- Because we can't wrap without the gating sequences moving forward (i.e. the - * minimum gating sequence is effectively our last available position in the - * buffer), when we have new data and successfully claimed a slot we can simply - * write over the top. - */ - private void setAvailable(final long sequence) - { - setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); - } - - private void setAvailableBufferValue(int index, int flag) - { - AVAILABLE_ARRAY.setRelease(availableBuffer, index, flag); - } - - /** - * @see Sequencer#isAvailable(long) - */ - @Override - public boolean isAvailable(long sequence) - { - int index = calculateIndex(sequence); - int flag = calculateAvailabilityFlag(sequence); - return flag == (int) AVAILABLE_ARRAY.getVolatile(availableBuffer, index); - } - - @Override - public long getHighestPublishedSequence(long lowerBound, long availableSequence) - { - for (long sequence = lowerBound; sequence <= availableSequence; sequence++) - { - if (!isAvailable(sequence)) - { - return sequence - 1; - } - } - - return availableSequence; - } - - private int calculateAvailabilityFlag(final long sequence) - { - return (int) (sequence >>> indexShift); - } - - private int calculateIndex(final long sequence) - { - return ((int) sequence) & indexMask; - } -} diff --git a/src/main/java9/com/lmax/disruptor/Sequence.java b/src/main/java9/com/lmax/disruptor/Sequence.java deleted file mode 100644 index c61a1fea6..000000000 --- a/src/main/java9/com/lmax/disruptor/Sequence.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2012 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor; - -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; - -class LhsPadding -{ - protected long p1, p2, p3, p4, p5, p6, p7; -} - -class Value extends LhsPadding -{ - protected volatile long value; -} - -class RhsPadding extends Value -{ - protected long p9, p10, p11, p12, p13, p14, p15; -} - -/** - *

Concurrent sequence class used for tracking the progress of - * the ring buffer and event processors. Support a number - * of concurrent operations including CAS and order writes. - * - *

Also attempts to be more efficient with regards to false - * sharing by adding padding around the volatile field. - */ -public class Sequence extends RhsPadding -{ - private static final VarHandle VALUE_FIELD; - static final long INITIAL_VALUE = -1L; - - static - { - try - { - VALUE_FIELD = MethodHandles.lookup().in(Sequence.class).findVarHandle(Sequence.class, "value", long.class); - } - catch (final Exception e) - { - throw new RuntimeException(e); - } - - } - - /** - * Create a sequence initialised to -1. - */ - public Sequence() - { - this(INITIAL_VALUE); - } - - /** - * Create a sequence with a specified initial value. - * - * @param initialValue The initial value for this sequence. - */ - public Sequence(final long initialValue) - { - VALUE_FIELD.setRelease(this, initialValue); - } - - /** - * Perform a volatile read of this sequence's value. - * - * @return The current value of the sequence. - */ - public long get() - { - return (long) (Long) VALUE_FIELD.getAcquire(this); - } - - /** - * Perform an ordered write of this sequence. The intent is - * a Store/Store barrier between this write and any previous - * store. - * - * @param value The new value for the sequence. - */ - public void set(final long value) - { - VALUE_FIELD.setRelease(this, value); - } - - /** - * Performs a volatile write of this sequence. The intent is - * a Store/Store barrier between this write and any previous - * write and a Store/Load barrier between this write and any - * subsequent volatile read. - * - * @param value The new value for the sequence. - */ - public void setVolatile(final long value) - { - VALUE_FIELD.setVolatile(this, value); - } - - /** - * Perform a compare and set operation on the sequence. - * - * @param expectedValue The expected current value. - * @param newValue The value to update to. - * @return true if the operation succeeds, false otherwise. - */ - public boolean compareAndSet(final long expectedValue, final long newValue) - { - return (boolean) VALUE_FIELD.compareAndSet(this, expectedValue, newValue); - } - - /** - * Atomically increment the sequence by one. - * - * @return The value after the increment - */ - public long incrementAndGet() - { - return addAndGet(1L); - } - - /** - * Atomically add the supplied value. - * - * @param increment The value to add to the sequence. - * @return The value after the increment. - */ - public long addAndGet(final long increment) - { - final long oldValue = (Long) VALUE_FIELD.getAndAdd(this, increment); - return oldValue + increment; - } - - @Override - public String toString() - { - return Long.toString(get()); - } -} diff --git a/src/main/java9/com/lmax/disruptor/util/Util.java b/src/main/java9/com/lmax/disruptor/util/Util.java deleted file mode 100644 index b782c1a9b..000000000 --- a/src/main/java9/com/lmax/disruptor/util/Util.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2011 LMAX Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lmax.disruptor.util; - -import com.lmax.disruptor.EventProcessor; -import com.lmax.disruptor.Sequence; -import sun.misc.Unsafe; - -import java.lang.reflect.Field; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; - -/** - * Set of common functions used by the Disruptor - */ -public final class Util -{ - /** - * Calculate the next power of 2, greater than or equal to x.

- * From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - * - * @param x Value to round up - * @return The next power of 2 from x inclusive - */ - public static int ceilingNextPowerOfTwo(final int x) - { - return 1 << (32 - Integer.numberOfLeadingZeros(x - 1)); - } - - /** - * Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s. - * - * @param sequences to compare. - * @return the minimum sequence found or Long.MAX_VALUE if the array is empty. - */ - public static long getMinimumSequence(final Sequence[] sequences) - { - return getMinimumSequence(sequences, Long.MAX_VALUE); - } - - /** - * Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s. - * - * @param sequences to compare. - * @param minimum an initial default minimum. If the array is empty this value will be - * returned. - * @return the smaller of minimum sequence value found in {@code sequences} and {@code minimum}; - * {@code minimum} if {@code sequences} is empty - */ - public static long getMinimumSequence(final Sequence[] sequences, long minimum) - { - for (int i = 0, n = sequences.length; i < n; i++) - { - long value = sequences[i].get(); - minimum = Math.min(minimum, value); - } - - return minimum; - } - - /** - * Get an array of {@link Sequence}s for the passed {@link EventProcessor}s - * - * @param processors for which to get the sequences - * @return the array of {@link Sequence}s - */ - public static Sequence[] getSequencesFor(final EventProcessor... processors) - { - Sequence[] sequences = new Sequence[processors.length]; - for (int i = 0; i < sequences.length; i++) - { - sequences[i] = processors[i].getSequence(); - } - - return sequences; - } - - /** - * Get a handle on the Unsafe instance, used for accessing low-level concurrency - * and memory constructs. - * - * @return The Unsafe - */ - public static Unsafe getUnsafe() - { - throw new UnsupportedOperationException(); - } - - /** - * Calculate the log base 2 of the supplied integer, essentially reports the location - * of the highest bit. - * - * @param i Value to calculate log2 for. - * @return The log2 value - */ - public static int log2(int i) - { - int r = 0; - while ((i >>= 1) != 0) - { - ++r; - } - return r; - } - - public static long awaitNanos(Object mutex, long timeoutNanos) throws InterruptedException - { - long millis = timeoutNanos / 1_000_000; - long nanos = timeoutNanos % 1_000_000; - - long t0 = System.nanoTime(); - mutex.wait(millis, (int) nanos); - long t1 = System.nanoTime(); - - return timeoutNanos - (t1 - t0); - } -}