Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
931adc6
Spark - Add Bucket Function for FunctionCatalog
kbendick Aug 12, 2022
a8439ed
Rename BucketHashUtil to BucketUtil and name methods hash instead
kbendick Aug 12, 2022
7365439
Make order of functions consistent for all subclasses
kbendick Aug 12, 2022
c2f0e9f
Give a concrete example with correct returned value for BucketFunctio…
kbendick Aug 12, 2022
6239d9c
Add note about missing UUID type
kbendick Aug 12, 2022
f14c2e7
Remove redundant comments
kbendick Aug 12, 2022
2f1680c
Update BucketUtil to name all functions hash and use primitive types …
kbendick Aug 15, 2022
15c5347
Remove Spark BucketFloat given that bucketing by float is disallowed …
kbendick Aug 15, 2022
f82f8e4
Fix hashBytes for slice with non-zero offset and add test
kbendick Aug 15, 2022
8907a6e
Remove comment regarding UUID type as that's not supported by spark
kbendick Aug 15, 2022
f4088e4
Update error message for invalid bucket column type
kbendick Aug 15, 2022
3009b86
Remove test for magic function being invoked when using tinyint or sh…
kbendick Aug 15, 2022
82df057
Update canonical name of decimal bucket function as precision and sca…
kbendick Aug 15, 2022
54bc337
Add static apply function in base class for handling post-hash logic
kbendick Aug 15, 2022
5ae07eb
Use static base class apply method for the post-hash bucketing logic …
kbendick Aug 15, 2022
df69f83
Pass in sqlType for BucketDecimal to be consistent with the others, a…
kbendick Aug 15, 2022
1fb393d
Pass in type directly to all Bucket function constructors except Buck…
kbendick Aug 15, 2022
e030d6b
Add all examples from the spec to the tests and check just the hashed…
kbendick Aug 15, 2022
117e840
Checkstyle
kbendick Aug 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 9 additions & 31 deletions api/src/main/java/org/apache/iceberg/transforms/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.expressions.BoundPredicate;
Expand All @@ -38,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;

abstract class Bucket<T> implements Transform<T, Integer> {
private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
Expand Down Expand Up @@ -166,7 +166,7 @@ private BucketInteger(int numBuckets) {

@Override
public int hash(Integer value) {
return MURMUR3.hashLong(value.longValue()).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -182,7 +182,7 @@ private BucketLong(int numBuckets) {

@Override
public int hash(Long value) {
return MURMUR3.hashLong(value).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -202,7 +202,7 @@ static class BucketFloat extends Bucket<Float> {

@Override
public int hash(Float value) {
return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -220,7 +220,7 @@ static class BucketDouble extends Bucket<Double> {

@Override
public int hash(Double value) {
return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -236,7 +236,7 @@ private BucketString(int numBuckets) {

@Override
public int hash(CharSequence value) {
return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -254,24 +254,7 @@ private BucketByteBuffer(int numBuckets) {

@Override
public int hash(ByteBuffer value) {
if (value.hasArray()) {
return MURMUR3
.hashBytes(
value.array(),
value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining())
.asInt();
} else {
int position = value.position();
byte[] copy = new byte[value.remaining()];
try {
value.get(copy);
} finally {
// make sure the buffer position is unchanged
value.position(position);
}
return MURMUR3.hashBytes(copy).asInt();
}
return BucketUtil.hash(value);
}

@Override
Expand All @@ -287,12 +270,7 @@ private BucketUUID(int numBuckets) {

@Override
public int hash(UUID value) {
return MURMUR3
.newHasher(16)
.putLong(Long.reverseBytes(value.getMostSignificantBits()))
.putLong(Long.reverseBytes(value.getLeastSignificantBits()))
.hash()
.asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -308,7 +286,7 @@ private BucketDecimal(int numBuckets) {

@Override
public int hash(BigDecimal value) {
return MURMUR3.hashBytes(value.unscaledValue().toByteArray()).asInt();
return BucketUtil.hash(value);
}

@Override
Expand Down
88 changes: 88 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BucketUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.util;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;

/**
 * Contains the logic for hashing various types for use with the {@code bucket} partition
 * transformations.
 *
 * <p>All overloads hash with 32-bit Murmur3; the resulting hash values are part of the table
 * format's bucketing contract, so implementations here must not change the bytes fed to the
 * hasher for any type.
 */
public final class BucketUtil {

  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();

  private BucketUtil() {}

  /**
   * Hashes an int value.
   *
   * <p>The value is widened to long first, so equal int and long values produce the same hash.
   */
  public static int hash(int value) {
    return MURMUR3.hashLong((long) value).asInt();
  }

  /** Hashes a long value. */
  public static int hash(long value) {
    return MURMUR3.hashLong(value).asInt();
  }

  /**
   * Hashes a float value.
   *
   * <p>The value is widened to double and hashed via its IEEE 754 long bit pattern, so equal
   * float and double values produce the same hash.
   */
  public static int hash(float value) {
    return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
  }

  /** Hashes a double value via its IEEE 754 long bit pattern. */
  public static int hash(double value) {
    return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
  }

  /** Hashes a character sequence by its UTF-8 encoded bytes. */
  public static int hash(CharSequence value) {
    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
  }

  /**
   * Hashes the remaining bytes of a buffer, from {@code position()} to {@code limit()}, without
   * changing the buffer's position.
   */
  public static int hash(ByteBuffer value) {
    if (value.hasArray()) {
      // array-backed buffers (including slices) can be hashed in place; arrayOffset() must be
      // added to position() to locate the slice's bytes within the backing array
      return MURMUR3
          .hashBytes(value.array(), value.arrayOffset() + value.position(), value.remaining())
          .asInt();
    } else {
      int position = value.position();
      byte[] copy = new byte[value.remaining()];
      try {
        value.get(copy);
      } finally {
        // make sure the buffer position is unchanged
        value.position(position);
      }
      return MURMUR3.hashBytes(copy).asInt();
    }
  }

  /**
   * Hashes a UUID as its 16 bytes in big-endian order.
   *
   * <p>{@code Long.reverseBytes} compensates for the hasher's little-endian {@code putLong} so
   * the bytes are consumed most-significant first.
   */
  public static int hash(UUID value) {
    return MURMUR3
        .newHasher(16)
        .putLong(Long.reverseBytes(value.getMostSignificantBits()))
        .putLong(Long.reverseBytes(value.getLeastSignificantBits()))
        .hash()
        .asInt();
  }

  /** Hashes a decimal by the minimal two's-complement bytes of its unscaled value. */
  public static int hash(BigDecimal value) {
    return MURMUR3.hashBytes(value.unscaledValue().toByteArray()).asInt();
  }
}
19 changes: 19 additions & 0 deletions api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ public void testByteBufferOnHeap() {
Assert.assertEquals("Buffer limit should not change", 105, buffer.limit());
}

@Test
public void testByteBufferOnHeapArrayOffset() {
  byte[] backing = randomBytes(128);
  // slicing a wrapped buffer produces a heap buffer with a non-zero arrayOffset
  ByteBuffer slice = ByteBuffer.wrap(backing, 5, 100).slice();
  Assert.assertEquals("Buffer arrayOffset should be 5", 5, slice.arrayOffset());

  Bucket<ByteBuffer> bucket = Bucket.get(Types.BinaryType.get(), 100);

  // the hash must cover exactly the sliced region of the backing array
  Assert.assertEquals(
      "HeapByteBuffer hash should match hash for correct slice",
      hashBytes(backing, 5, 100),
      bucket.hash(slice));

  // verify that the buffer was not modified
  Assert.assertEquals("Buffer position should be 0", 0, slice.position());
  Assert.assertEquals("Buffer limit should not change", 100, slice.limit());
}

@Test
public void testByteBufferOffHeap() {
byte[] bytes = randomBytes(128);
Expand Down
Loading