-
Notifications
You must be signed in to change notification settings - Fork 40
[TFMonhHx] Forces new version of netty #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a592576
2f0a268
68b2e38
1383134
5bbe3ea
4139cd7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,13 +49,22 @@ dependencies { | |
| implementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1' | ||
| implementation group: 'org.roaringbitmap', name: 'RoaringBitmap', version: '0.7.17' | ||
| implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' | ||
| implementation group: 'org.apache.arrow', name: 'arrow-vector', version: '10.0.1', { | ||
|
|
||
| def arrowExclusions = { | ||
| exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' | ||
| exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' | ||
| exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' | ||
| exclude group: 'io.netty', module: 'netty-common' | ||
| } | ||
|
|
||
| implementation group: 'org.apache.arrow', name: 'arrow-vector', version: '12.0.1', arrowExclusions | ||
| implementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '12.0.1', arrowExclusions | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't forget to update LICENSES :)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This puzzled me cause I thought the CI was checking for those because of the work you did. I think you were right we need to add an extra step: #451
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm this is weird, because I tested it out by pushing with missing parts and it had worked then 🤔 |
||
| implementation group: 'io.netty', name: 'netty-buffer', { | ||
| version { | ||
| strictly '4.1.94.Final' | ||
| } | ||
| } | ||
|
|
||
| // These will be dependencies not packaged with the .jar | ||
| // They need to be provided either through the database or in an extra .jar | ||
| compileOnly group: 'org.neo4j', name: 'neo4j', version: neo4jVersionEffective | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,278 @@ | ||
| /* | ||
| * Copyright 2012 The Netty Project | ||
| * | ||
| * The Netty Project licenses this file to you under the Apache License, | ||
| * version 2.0 (the "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at: | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package io.netty.buffer; | ||
|
|
||
| import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; | ||
|
|
||
| import java.lang.reflect.Field; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import org.apache.arrow.memory.OutOfMemoryException; | ||
| import org.apache.arrow.memory.util.LargeMemoryUtil; | ||
|
|
||
| import io.netty.util.internal.OutOfDirectMemoryError; | ||
| import io.netty.util.internal.StringUtil; | ||
|
|
||
| /** | ||
| * The base allocator that we use for all of Arrow's memory management. Returns | ||
| * UnsafeDirectLittleEndian buffers. | ||
| */ | ||
| public class PooledByteBufAllocatorL { | ||
|
|
||
| private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); | ||
|
|
||
| private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; | ||
| public final UnsafeDirectLittleEndian empty; | ||
| private final AtomicLong hugeBufferSize = new AtomicLong(0); | ||
| private final AtomicLong hugeBufferCount = new AtomicLong(0); | ||
| private final AtomicLong normalBufferSize = new AtomicLong(0); | ||
| private final AtomicLong normalBufferCount = new AtomicLong(0); | ||
| private final InnerAllocator allocator; | ||
|
|
||
| public PooledByteBufAllocatorL() { | ||
| allocator = new InnerAllocator(); | ||
| empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. | ||
| */ | ||
| public UnsafeDirectLittleEndian allocate(long size) { | ||
| try { | ||
| return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); | ||
| } catch (OutOfMemoryError e) { | ||
| /* | ||
| * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by | ||
| * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by | ||
| * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice | ||
| * as Netty is expected to throw an OutOfDirectMemoryError first. | ||
| */ | ||
| if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { | ||
| throw new OutOfMemoryException("Failure allocating buffer.", e); | ||
| } | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| public int getChunkSize() { | ||
| return allocator.chunkSize; | ||
| } | ||
|
|
||
| public long getHugeBufferSize() { | ||
| return hugeBufferSize.get(); | ||
| } | ||
|
|
||
| public long getHugeBufferCount() { | ||
| return hugeBufferCount.get(); | ||
| } | ||
|
|
||
| public long getNormalBufferSize() { | ||
| return normalBufferSize.get(); | ||
| } | ||
|
|
||
| public long getNormalBufferCount() { | ||
| return normalBufferSize.get(); | ||
| } | ||
|
|
||
| private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { | ||
|
|
||
| private final long initialCapacity; | ||
| private final AtomicLong count; | ||
| private final AtomicLong size; | ||
|
|
||
| private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { | ||
| super(buf); | ||
| this.initialCapacity = buf.capacity(); | ||
| this.count = count; | ||
| this.size = size; | ||
| } | ||
|
|
||
| private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, | ||
| AtomicLong size) { | ||
| super(buf); | ||
| this.initialCapacity = buf.capacity(); | ||
| this.count = count; | ||
| this.size = size; | ||
| } | ||
|
|
||
| @Override | ||
| public ByteBuf copy() { | ||
| throw new UnsupportedOperationException("copy method is not supported"); | ||
| } | ||
|
|
||
| @Override | ||
| public ByteBuf copy(int index, int length) { | ||
| throw new UnsupportedOperationException("copy method is not supported"); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean release(int decrement) { | ||
| boolean released = super.release(decrement); | ||
| if (released) { | ||
| count.decrementAndGet(); | ||
| size.addAndGet(-initialCapacity); | ||
| } | ||
| return released; | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private class InnerAllocator extends PooledByteBufAllocator { | ||
|
|
||
| private final PoolArena<ByteBuffer>[] directArenas; | ||
| private final MemoryStatusThread statusThread; | ||
| private final int chunkSize; | ||
|
|
||
| public InnerAllocator() { | ||
| super(true); | ||
|
|
||
| try { | ||
| Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); | ||
| f.setAccessible(true); | ||
| this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); | ||
| } | ||
|
|
||
| this.chunkSize = directArenas[0].chunkSize; | ||
|
|
||
| if (memoryLogger.isTraceEnabled()) { | ||
| statusThread = new MemoryStatusThread(); | ||
| statusThread.start(); | ||
| } else { | ||
| statusThread = null; | ||
| } | ||
| } | ||
|
|
||
| private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { | ||
| PoolArenasCache cache = threadCache(); | ||
| PoolArena<ByteBuffer> directArena = cache.directArena; | ||
|
|
||
| if (directArena != null) { | ||
|
|
||
| if (initialCapacity > directArena.chunkSize) { | ||
| // This is beyond chunk size so we'll allocate separately. | ||
| ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); | ||
|
|
||
| hugeBufferSize.addAndGet(buf.capacity()); | ||
| hugeBufferCount.incrementAndGet(); | ||
|
|
||
| // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); | ||
| return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, | ||
| hugeBufferSize); | ||
| } else { | ||
| // within chunk, use arena. | ||
| ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); | ||
| if (!(buf instanceof PooledUnsafeDirectByteBuf)) { | ||
| fail(); | ||
| } | ||
|
|
||
| if (!ASSERT_ENABLED) { | ||
| return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); | ||
| } | ||
|
|
||
| normalBufferSize.addAndGet(buf.capacity()); | ||
| normalBufferCount.incrementAndGet(); | ||
|
|
||
| return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, | ||
| normalBufferCount, normalBufferSize); | ||
| } | ||
|
|
||
| } else { | ||
| throw fail(); | ||
| } | ||
| } | ||
|
|
||
| private UnsupportedOperationException fail() { | ||
| return new UnsupportedOperationException( | ||
| "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + | ||
| "didn't provide that functionality."); | ||
| } | ||
|
|
||
| @Override | ||
| public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { | ||
| if (initialCapacity == 0 && maxCapacity == 0) { | ||
| newDirectBuffer(initialCapacity, maxCapacity); | ||
| } | ||
| validate(initialCapacity, maxCapacity); | ||
| return newDirectBufferL(initialCapacity, maxCapacity); | ||
| } | ||
|
|
||
| @Override | ||
| public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { | ||
| throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); | ||
| } | ||
|
|
||
|
|
||
| private void validate(int initialCapacity, int maxCapacity) { | ||
| if (initialCapacity < 0) { | ||
| throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); | ||
| } | ||
| if (initialCapacity > maxCapacity) { | ||
| throw new IllegalArgumentException(String.format( | ||
| "initialCapacity: %d (expected: not greater than maxCapacity(%d)", | ||
| initialCapacity, maxCapacity)); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| StringBuilder buf = new StringBuilder(); | ||
| buf.append(directArenas.length); | ||
| buf.append(" direct arena(s):"); | ||
| buf.append(StringUtil.NEWLINE); | ||
| for (PoolArena<ByteBuffer> a : directArenas) { | ||
| buf.append(a); | ||
| } | ||
|
|
||
| buf.append("Large buffers outstanding: "); | ||
| buf.append(hugeBufferCount.get()); | ||
| buf.append(" totaling "); | ||
| buf.append(hugeBufferSize.get()); | ||
| buf.append(" bytes."); | ||
| buf.append('\n'); | ||
| buf.append("Normal buffers outstanding: "); | ||
| buf.append(normalBufferCount.get()); | ||
| buf.append(" totaling "); | ||
| buf.append(normalBufferSize.get()); | ||
| buf.append(" bytes."); | ||
| return buf.toString(); | ||
| } | ||
|
|
||
| private class MemoryStatusThread extends Thread { | ||
|
|
||
| public MemoryStatusThread() { | ||
| super("allocation.logger"); | ||
| this.setDaemon(true); | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| while (true) { | ||
| memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); | ||
| try { | ||
| Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); | ||
| } catch (InterruptedException e) { | ||
| return; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any hint on why do we get two versions in the LICENSES @gem-neo4j even if we are overwritting with the most recent one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The LICENSE file is a mix of core and common, so I believe common doesn't have 4.1.94 yet in its dependency tree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be from test-utils actually, I see arrow is on 10.0.1 there, should that be updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not that, I've tried overriding there as well. This should heal automatically anyway when the database updates to 4.1.94
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, should do :)