From 611dcd2cf8d08a5764e931d0b7bc0c48f2d144a7 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Tue, 20 Aug 2019 20:37:34 +0800 Subject: [PATCH 1/3] [ARROW-6113][Java] Support vector deduplicate function --- .../deduplicate/DeduplicationUtils.java | 95 ++++++++++++ .../deduplicate/VectorRunDeduplicator.java | 109 ++++++++++++++ .../deduplicate/TestDeduplicationUtils.java | 136 ++++++++++++++++++ .../TestVectorRunDeduplicator.java | 132 +++++++++++++++++ 4 files changed, 472 insertions(+) create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java diff --git a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java new file mode 100644 index 00000000000..22558480a79 --- /dev/null +++ b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.algorithm.deduplicate; + +import org.apache.arrow.util.DataSizeRoundingUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.compare.RangeEqualsVisitor; + +import io.netty.buffer.ArrowBuf; + +/** + * Utilities for vector deduplication. + */ +class DeduplicationUtils { + + /** + * Gets the start positions of the first distinct values in a vector. + * @param vector the target vector. + * @param runStarts the bit set to hold the start positions. + * @param vector type. + */ + public static void populateRunStartIndicators(V vector, ArrowBuf runStarts) { + int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount()); + Preconditions.checkArgument(runStarts.capacity() >= bufSize); + runStarts.setZero(0, bufSize); + + BitVectorHelper.setValidityBitToOne(runStarts, 0); + + for (int i = 1; i < vector.getValueCount(); i++) { + RangeEqualsVisitor visitor = new RangeEqualsVisitor(vector, i - 1, i, 1, false); + if (!visitor.equals(vector)) { + BitVectorHelper.setValidityBitToOne(runStarts, i); + } + } + } + + /** + * Gets the run lengths, given the start positions. + * @param runStarts the bit set for start positions. + * @param runLengths the run length vector to populate. + * @param valueCount the number of values in the bit set. + */ + public static void populateRunLengths(ArrowBuf runStarts, IntVector runLengths, int valueCount) { + int curStart = 0; + int lengthIndex = 0; + for (int i = 1; i < valueCount; i++) { + if (BitVectorHelper.get(runStarts, i) != 0) { + // we get a new distinct value + runLengths.setSafe(lengthIndex++, i - curStart); + curStart = i; + } + } + + // process the last value + runLengths.setSafe(lengthIndex++, valueCount - curStart); + runLengths.setValueCount(lengthIndex); + } + + /** + * Gets distinct values from the input vector by removing adjacent + * duplicated values. + * @param indicators the bit set containing the start positions of disctinct values. + * @param inputVector the input vector. + * @param outputVector the output vector. + * @param vector type. + */ + public static void populateDeduplicatedValues( + ArrowBuf indicators, V inputVector, V outputVector) { + int dstIdx = 0; + for (int srcIdx = 0; srcIdx < inputVector.getValueCount(); srcIdx++) { + if (BitVectorHelper.get(indicators, srcIdx) != 0) { + outputVector.copyFromSafe(srcIdx, dstIdx++, inputVector); + } + } + outputVector.setValueCount(dstIdx); + } +} diff --git a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java new file mode 100644 index 00000000000..cad1db9d1ae --- /dev/null +++ b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.algorithm.deduplicate; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.DataSizeRoundingUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.ValueVector; + +import io.netty.buffer.ArrowBuf; + +/** + * Remove adjacent equal elements from a vector. + * If the vector is sorted, it removes all duplicated values in the vector. + * @param vector type. + */ +public class VectorRunDeduplicator implements AutoCloseable { + + /** + * Bit set for distinct values. + * If the value at some index is not equal to the previous value, + * its bit is set to 1, otherwise its bit is set to 0. + */ + private ArrowBuf distinctValueBuffer; + + /** + * The vector to deduplicate. + */ + private final V vector; + + private final BufferAllocator allocator; + + /** + * Constructs a vector run deduplicator for a given vector. + * @param vector the given vector. + * @param allocator the allocator used for allocating buffers for start indices. + */ + public VectorRunDeduplicator(V vector, BufferAllocator allocator) { + this.vector = vector; + this.allocator = allocator; + } + + private void createDistinctValueBuffer() { + Preconditions.checkArgument(distinctValueBuffer == null); + int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount()); + distinctValueBuffer = allocator.buffer(bufSize); + DeduplicationUtils.populateRunStartIndicators(vector, distinctValueBuffer); + } + + /** + * Gets the number of values which are different from their predecessor. + * @return the run count. + */ + public int getRunCount() { + if (distinctValueBuffer == null) { + createDistinctValueBuffer(); + } + return vector.getValueCount() - BitVectorHelper.getNullCount(distinctValueBuffer, vector.getValueCount()); + } + + /** + * Gets the vector with deduplicated adjacent values removed. + * @param outVector the output vector. + */ + public void populateDeduplicatedValues(V outVector) { + if (distinctValueBuffer == null) { + createDistinctValueBuffer(); + } + + DeduplicationUtils.populateDeduplicatedValues(distinctValueBuffer, vector, outVector); + } + + /** + * Gets the length of each distinct value. + * @param lengthVector the vector for holding length values. + */ + public void populateRunLengths(IntVector lengthVector) { + if (distinctValueBuffer == null) { + createDistinctValueBuffer(); + } + + DeduplicationUtils.populateRunLengths(distinctValueBuffer, lengthVector, vector.getValueCount()); + } + + @Override + public void close() { + if (distinctValueBuffer != null) { + distinctValueBuffer.close(); + distinctValueBuffer = null; + } + } +} diff --git a/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java b/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java new file mode 100644 index 00000000000..52340f52624 --- /dev/null +++ b/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.algorithm.deduplicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.DataSizeRoundingUtil; +import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.netty.buffer.ArrowBuf; + +/** + * Test cases for {@link DeduplicationUtils}. + */ +public class TestDeduplicationUtils { + + private static final int VECTOR_LENGTH = 100; + + private static final int REPETITION_COUNT = 3; + + private BufferAllocator allocator; + + @Before + public void prepare() { + allocator = new RootAllocator(1024 * 1024); + } + + @After + public void shutdown() { + allocator.close(); + } + + @Test + public void testDeduplicateFixedWidth() { + try (IntVector origVec = new IntVector("original vec", allocator); + IntVector dedupVec = new IntVector("deduplicated vec", allocator); + IntVector lengthVec = new IntVector("length vec", allocator); + ArrowBuf distinctBuf = allocator.buffer( + DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) { + origVec.allocateNew(VECTOR_LENGTH * REPETITION_COUNT); + origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT); + lengthVec.allocateNew(); + + // prepare data + for (int i = 0; i < VECTOR_LENGTH; i++) { + for (int j = 0; j < REPETITION_COUNT; j++) { + origVec.set(i * REPETITION_COUNT + j, i); + } + } + + DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf); + assertEquals( VECTOR_LENGTH, + VECTOR_LENGTH * REPETITION_COUNT - + BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT)); + + DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec); + assertEquals(VECTOR_LENGTH, dedupVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(i, dedupVec.get(i)); + } + + DeduplicationUtils.populateRunLengths(distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT); + assertEquals(VECTOR_LENGTH, lengthVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(REPETITION_COUNT, lengthVec.get(i)); + } + } + } + + @Test + public void testDeduplicateVariableWidth() { + try (VarCharVector origVec = new VarCharVector("original vec", allocator); + VarCharVector dedupVec = new VarCharVector("deduplicated vec", allocator); + IntVector lengthVec = new IntVector("length vec", allocator); + ArrowBuf distinctBuf = allocator.buffer( + DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) { + origVec.allocateNew( + VECTOR_LENGTH * REPETITION_COUNT * 10, VECTOR_LENGTH * REPETITION_COUNT); + origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT); + lengthVec.allocateNew(); + + // prepare data + for (int i = 0; i < VECTOR_LENGTH; i++) { + String str = String.valueOf(i * i); + for (int j = 0; j < REPETITION_COUNT; j++) { + origVec.set(i * REPETITION_COUNT + j, str.getBytes()); + } + } + + DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf); + assertEquals(VECTOR_LENGTH, + VECTOR_LENGTH * REPETITION_COUNT - + BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT)); + + DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec); + assertEquals(VECTOR_LENGTH, dedupVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertArrayEquals(String.valueOf(i * i).getBytes(), dedupVec.get(i)); + } + + DeduplicationUtils.populateRunLengths( + distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT); + assertEquals(VECTOR_LENGTH, lengthVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(REPETITION_COUNT, lengthVec.get(i)); + } + } + } +} diff --git a/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java b/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java new file mode 100644 index 00000000000..4008c1e61f0 --- /dev/null +++ b/java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.algorithm.deduplicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; + +import org.apache.arrow.vector.VarCharVector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test cases for {@link VectorRunDeduplicator}. + */ +public class TestVectorRunDeduplicator { + + private static final int VECTOR_LENGTH = 100; + + private static final int REPETITION_COUNT = 3; + + private BufferAllocator allocator; + + @Before + public void prepare() { + allocator = new RootAllocator(1024 * 1024); + } + + @After + public void shutdown() { + allocator.close(); + } + + @Test + public void testDeduplicateFixedWidth() { + try (IntVector origVec = new IntVector("original vec", allocator); + IntVector dedupVec = new IntVector("deduplicated vec", allocator); + IntVector lengthVec = new IntVector("length vec", allocator); + VectorRunDeduplicator deduplicator = + new VectorRunDeduplicator<>(origVec, allocator)) { + origVec.allocateNew(VECTOR_LENGTH * REPETITION_COUNT); + origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT); + lengthVec.allocateNew(); + + // prepare data + for (int i = 0; i < VECTOR_LENGTH; i++) { + for (int j = 0; j < REPETITION_COUNT; j++) { + origVec.set(i * REPETITION_COUNT + j, i); + } + } + + int distinctCount = deduplicator.getRunCount(); + assertEquals(VECTOR_LENGTH, distinctCount); + + dedupVec.allocateNew(distinctCount); + + deduplicator.populateDeduplicatedValues(dedupVec); + assertEquals(VECTOR_LENGTH, dedupVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(i, dedupVec.get(i)); + } + + deduplicator.populateRunLengths(lengthVec); + assertEquals(VECTOR_LENGTH, lengthVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(REPETITION_COUNT, lengthVec.get(i)); + } + } + } + + @Test + public void testDeduplicateVariableWidth() { + try (VarCharVector origVec = new VarCharVector("original vec", allocator); + VarCharVector dedupVec = new VarCharVector("deduplicated vec", allocator); + IntVector lengthVec = new IntVector("length vec", allocator); + VectorRunDeduplicator deduplicator = + new VectorRunDeduplicator<>(origVec, allocator)) { + origVec.allocateNew( + VECTOR_LENGTH * REPETITION_COUNT * 10, VECTOR_LENGTH * REPETITION_COUNT); + origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT); + lengthVec.allocateNew(); + + // prepare data + for (int i = 0; i < VECTOR_LENGTH; i++) { + String str = String.valueOf(i * i); + for (int j = 0; j < REPETITION_COUNT; j++) { + origVec.set(i * REPETITION_COUNT + j, str.getBytes()); + } + } + + int distinctCount = deduplicator.getRunCount(); + assertEquals(VECTOR_LENGTH, distinctCount); + + dedupVec.allocateNew(distinctCount * 10, distinctCount); + + deduplicator.populateDeduplicatedValues(dedupVec); + assertEquals(VECTOR_LENGTH, dedupVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertArrayEquals(String.valueOf(i * i).getBytes(), dedupVec.get(i)); + } + + deduplicator.populateRunLengths(lengthVec); + assertEquals(VECTOR_LENGTH, lengthVec.getValueCount()); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + assertEquals(REPETITION_COUNT, lengthVec.get(i)); + } + } + } +} From bd1fabd4bd8f7167fe75490a49e7964f1fd24de5 Mon Sep 17 00:00:00 2001 From: emkornfield Date: Tue, 20 Aug 2019 23:03:56 -0700 Subject: [PATCH 2/3] Update VectorRunDeduplicator.java --- .../arrow/algorithm/deduplicate/VectorRunDeduplicator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java index cad1db9d1ae..491b1f41f05 100644 --- a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java +++ b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java @@ -49,7 +49,7 @@ public class VectorRunDeduplicator implements AutoCloseab /** * Constructs a vector run deduplicator for a given vector. - * @param vector the given vector. + * @param vector the vector to dedulicate. Ownership is NOT taken. * @param allocator the allocator used for allocating buffers for start indices. */ public VectorRunDeduplicator(V vector, BufferAllocator allocator) { From c53cbdb8085c88580252d6e3f965a7e9d98c5a42 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Fri, 23 Aug 2019 14:49:05 +0800 Subject: [PATCH 3/3] [ARROW-6113][Java] Add comment for parameters --- .../apache/arrow/algorithm/deduplicate/DeduplicationUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java index 22558480a79..a9fe328125f 100644 --- a/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java +++ b/java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java @@ -45,7 +45,8 @@ public static void populateRunStartIndicators(V vector, BitVectorHelper.setValidityBitToOne(runStarts, 0); for (int i = 1; i < vector.getValueCount(); i++) { - RangeEqualsVisitor visitor = new RangeEqualsVisitor(vector, i - 1, i, 1, false); + RangeEqualsVisitor visitor = new RangeEqualsVisitor( + vector, i - 1, i, /* length */1, /* need check type*/false); if (!visitor.equals(vector)) { BitVectorHelper.setValidityBitToOne(runStarts, i); }