
Conversation

@kbendick (Contributor) commented Aug 12, 2022

Adds a bucket function to the Spark FunctionCatalog that performs the Iceberg bucket partition transform, for use from SQL and with storage partitioned joins.

This closes #5349
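
For context, a minimal, hypothetical usage sketch of the SQL side (the catalog setup is an assumption, not part of this PR; the `system.bucket(...)` call form matches the tests later in this thread):

import org.apache.spark.sql.SparkSession;

// Assumes a SparkSession whose current catalog is an Iceberg Spark catalog, so the
// bucket function resolves under the `system` namespace exposed by the FunctionCatalog.
public class BucketFunctionUsageSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Scalar use from SQL: bucket the literal 42 into 16 buckets using Iceberg's transform.
    spark.sql("SELECT system.bucket(16, 42) AS bucket").show();
  }
}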

@kbendick kbendick force-pushed the kb-add-spark-bucket-function-squash branch 2 times, most recently from 24a8c1d to 18e5904 Compare August 12, 2022 18:18
@kbendick kbendick changed the title Spark - Add Buket Function for FunctionCatalog Spark - Add Bucket Function for FunctionCatalog Aug 12, 2022
@kbendick kbendick marked this pull request as ready for review August 12, 2022 18:20
@kbendick kbendick changed the title Spark - Add Bucket Function for FunctionCatalog Spark 3.3 - Support bucket in FunctionCatalog Aug 12, 2022
Comment on lines 234 to 238
  public Integer produceResult(InternalRow input) {
    // return null for null input to match what Spark does in the code-generated versions.
    return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
        ? null
        : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));
  }
}
Contributor Author

For some reason, when produceResult was defined in the superclass, Spark complained that it was not defined and errored out.

So I've moved all of that logic into each subclass.

@kbendick kbendick force-pushed the kb-add-spark-bucket-function-squash branch from 06207b3 to 4634d0e Compare August 12, 2022 18:36
@kbendick kbendick force-pushed the kb-add-spark-bucket-function-squash branch from 4eebf93 to bb29cc3 Compare August 12, 2022 22:05
.hashBytes(
    value.array(),
    value.arrayOffset() + value.position(),
    value.arrayOffset() + value.remaining())
@rdblue (Contributor) Aug 13, 2022

I just realized that this isn't correct. It has been wrong for years, evidently.

HashFunction.hashBytes accepts a length. value.remaining() is that length.

Looks like this was working because arrayOffset was never non-zero. That's because the only way to get a ByteBuffer with a non-zero arrayOffset (as far as I can tell) is to use ByteBuffer.slice(), which creates a new buffer over the remaining content and sets the arrayOffset. Since slice doesn't allow setting the position and limit, everywhere I've been able to find uses duplicate() and then sets position and limit, because there's no need to limit the start or capacity of the ByteBuffer when the backing array is not limited. Allocating and wrapping byte arrays always produce arrayOffset=0.

Here's a test that catches this. @kbendick, can you add this to TestBucketing along with the fix here (remove value.arrayOffset() from the length argument)?

  @Test
  public void testByteBufferOnHeapArrayOffset() {
    byte[] bytes = randomBytes(128);
    ByteBuffer raw = ByteBuffer.wrap(bytes, 5, 100);
    ByteBuffer buffer = raw.slice();
    Assert.assertEquals("Buffer arrayOffset should be 5", 5, buffer.arrayOffset());

    Bucket<ByteBuffer> bucketFunc = Bucket.get(Types.BinaryType.get(), 100);

    Assert.assertEquals(
        "HeapByteBuffer hash should match hash for correct slice",
        hashBytes(bytes, 5, 100),
        bucketFunc.hash(buffer));

    // verify that the buffer was not modified
    Assert.assertEquals("Buffer position should be 0", 0, buffer.position());
    Assert.assertEquals("Buffer limit should not change", 100, buffer.limit());
  }
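
For reference, a hedged sketch of the corrected call: hashBytes takes (bytes, offset, length), so arrayOffset() belongs only in the offset, and the length is just remaining(). The hasher and class name below are illustrative assumptions, not the exact code in this PR.

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.ByteBuffer;

class ByteBufferHashSketch {
  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();

  // Hash the readable window of an array-backed ByteBuffer without consuming it.
  static int hash(ByteBuffer value) {
    return MURMUR3
        .hashBytes(
            value.array(),
            // start of the window: account for a sliced buffer's arrayOffset()
            value.arrayOffset() + value.position(),
            // length of the window: remaining() alone; adding arrayOffset() here was the bug
            value.remaining())
        .asInt();
  }
}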

Contributor Author

Yeah, absolutely, I'll add this test this evening.

I was potentially going to use a different hash function for the byte[] that Spark passes (or at least investigate it), but I'll make this change this evening!

Contributor Author

Added the fix and the test.

I can put it in a separate PR if we think that would be better for people who cherry-pick.

    return (BucketUtil.hashDecimal(value.toJavaBigDecimal()) & Integer.MAX_VALUE) % numBuckets;
  }

  public BucketDecimal(int precision, int scale) {
Contributor

The hash function is actually independent of precision and scale. It looks like the only reason to pass precision and scale (other than to preserve the input type) is for the canonical name. I think instead this can pass in the Spark type and pass that back through inputTypes. The canonical name should be iceberg.bucket(decimal).

Contributor Author

The hash function is indeed independent of precision and scale. I've updated the canonical name.

But we do also need the precision and scale to get the Decimal value in produceResult, and it seems we need them for creating the correct inputType as mentioned. Let me see about not passing those in and passing in the Spark type instead.
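
A hedged sketch of what this exchange describes, assuming Spark's ScalarFunction API and the BucketUtil helper quoted above (the class name, package locations, and field names here are illustrative, not the PR's exact code): bind against the Spark DecimalType, echo it back through inputTypes(), use the precision/scale-free canonical name, and still use precision and scale to read the Decimal in produceResult.

import org.apache.iceberg.util.BucketUtil; // hashing helper quoted earlier in this thread; package assumed
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;

class BucketDecimalSketch implements ScalarFunction<Integer> {
  private static final int NUM_BUCKETS_ORDINAL = 0;
  private static final int VALUE_ORDINAL = 1;

  private final DecimalType sqlType;

  BucketDecimalSketch(DecimalType sqlType) {
    this.sqlType = sqlType;
  }

  public static int invoke(int numBuckets, Decimal value) {
    return (BucketUtil.hashDecimal(value.toJavaBigDecimal()) & Integer.MAX_VALUE) % numBuckets;
  }

  @Override
  public DataType[] inputTypes() {
    // pass the bound Spark type back through so Spark does not re-cast the input
    return new DataType[] {DataTypes.IntegerType, sqlType};
  }

  @Override
  public DataType resultType() {
    return DataTypes.IntegerType;
  }

  @Override
  public String name() {
    return "bucket";
  }

  @Override
  public String canonicalName() {
    // independent of precision and scale, as discussed above
    return "iceberg.bucket(decimal)";
  }

  @Override
  public Integer produceResult(InternalRow input) {
    if (input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)) {
      return null;
    }

    // precision and scale are still needed here to read the Decimal out of the row
    Decimal value = input.getDecimal(VALUE_ORDINAL, sqlType.precision(), sqlType.scale());
    return invoke(input.getInt(NUM_BUCKETS_ORDINAL), value);
  }
}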

  public void testBucketIntegers() {
    Assert.assertEquals(
        "Byte type should bucket similarly to integer",
        3,
Contributor

I would probably validate the result against the transform result. That seems safer to me.

Contributor Author

I added a test suite, testValuesFromSpec, that tests against the hashed values.

I could add more tests against just the transform result now if we want, but we do at least have tests for the hashed transform!
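
For illustration, a hedged sketch of such a transform-based check, validating the function against the Iceberg transform rather than a hardcoded bucket (the BucketFunction.BucketInt name and its static invoke are assumptions based on the snippets in this thread):

import org.apache.iceberg.spark.functions.BucketFunction; // assumed package and class name
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

public class TestBucketMatchesTransformSketch {
  @Test
  public void integerBucketMatchesTransform() {
    int numBuckets = 16;
    int value = 34;

    // bucket index computed by the Iceberg partition transform
    int fromTransform =
        Transforms.bucket(Types.IntegerType.get(), numBuckets).apply(value);

    // bucket index computed by the Spark function (assumed class/method name)
    int fromFunction = BucketFunction.BucketInt.invoke(numBuckets, value);

    Assert.assertEquals(
        "Spark bucket function should match the Iceberg bucket transform",
        fromTransform,
        fromFunction);
  }
}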

Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));
Contributor

Are these all of the tests from the bucket function tests?

Contributor Author

There aren't that many test cases for the Bucket transform, iirc.

I'll be sure to grab all of the test cases that are in the bucket transform to ensure consistency, in addition to checking the transform result instead of just the final bucketed result.

Contributor Author

Correction. The bucket function test suite uses a seeded random to generate data.

I did, however, test the output of all of the values from the spec against their hashed output. Let me know if that's sufficient or if I should update these to test against the hash output (or both).

@kbendick kbendick force-pushed the kb-add-spark-bucket-function-squash branch from bb29cc3 to f14c2e7 Compare August 15, 2022 18:09
@kbendick kbendick force-pushed the kb-add-spark-bucket-function-squash branch from 009a27e to 1fb393d Compare August 15, 2022 18:52
…etInteger, where we explicitly want Spark to cast short and byte to integer type
@kbendick (Contributor, Author)

@rdblue is there anything else you want me to update before this is ready to merge, or are we just giving other people time to review?

Also cc @huaxingao @aokolnychyi: this is the Spark bucket function that can be used for storage partitioned joins on bucketed columns. PTAL. 🙂

}

// TODO - We can probably hash the bytes directly given they're already UTF-8 input.
return apply(numBuckets, hash(value.toString()));
Contributor Author

I think since this is already a UTF-8 string, we can hash the bytes directly instead of converting to a Java String (UTF-16, for CharSequence), then converting back to UTF-8 bytes, and then hashing.

Contributor Author

But this might make the testing of the hash function harder. Let me check.

Contributor Author

Yeah, if we change hash here to be public static int hash(ByteBuffer value), we could then return apply(numBuckets, hash(value.getByteBuffer())), which returns the same results.

I think this change is worth it, though it makes the tests slightly wonkier for the test suite that checks the hash function output directly, as we have to call ByteBuffer.wrap("iceberg".getBytes("UTF-8")).

But it would arguably make the bucket string function faster on UTF8String input, which is what will get passed in at runtime.
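
A hedged sketch of the optimization described above (the helper names are placeholders for the bucket helpers in this PR, and the hashing detail mirrors the hashBytes discussion earlier in the thread):

import com.google.common.hash.Hashing;
import java.nio.ByteBuffer;
import org.apache.spark.unsafe.types.UTF8String;

class BucketStringSketch {
  // Bucket a UTF8String by hashing its UTF-8 bytes directly, skipping the
  // UTF8String -> java.lang.String (UTF-16) -> UTF-8 byte[] round trip.
  static int bucket(int numBuckets, UTF8String value) {
    return apply(numBuckets, hash(value.getByteBuffer()));
  }

  static int apply(int numBuckets, int hash) {
    // same (hash & Integer.MAX_VALUE) % numBuckets mapping used elsewhere in this PR
    return (hash & Integer.MAX_VALUE) % numBuckets;
  }

  static int hash(ByteBuffer buffer) {
    // UTF8String.getByteBuffer() returns an array-backed buffer, so the heap path suffices here
    return Hashing.murmur3_32_fixed()
        .hashBytes(
            buffer.array(),
            buffer.arrayOffset() + buffer.position(),
            buffer.remaining())
        .asInt();
  }
}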

@rdblue (Contributor) commented Aug 18, 2022

@kbendick, I think I was waiting for tests to pass. Looks good now.

@rdblue rdblue merged commit 69bcf05 into apache:master Aug 18, 2022
@kbendick kbendick deleted the kb-add-spark-bucket-function-squash branch August 18, 2022 16:54
zhongyujiang pushed a commit to zhongyujiang/iceberg that referenced this pull request Apr 16, 2025

Successfully merging this pull request may close these issues:

Implement Spark’s FunctionCatalog for Existing Transformations