Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions api/src/main/java/org/apache/iceberg/transforms/Truncate.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TruncateUtil;
import org.apache.iceberg.util.UnicodeUtil;

abstract class Truncate<T> implements Transform<T, T> {
Expand Down Expand Up @@ -87,7 +88,7 @@ public Integer apply(Integer value) {
return null;
}

return value - (((value % width) + width) % width);
return TruncateUtil.truncateInt(width, value);
}

@Override
Expand Down Expand Up @@ -171,7 +172,7 @@ public Long apply(Long value) {
return null;
}

return value - (((value % width) + width) % width);
return TruncateUtil.truncateLong(width, value);
}

@Override
Expand Down Expand Up @@ -391,9 +392,7 @@ public ByteBuffer apply(ByteBuffer value) {
return null;
}

ByteBuffer ret = value.duplicate();
ret.limit(Math.min(value.limit(), value.position() + length));
return ret;
return TruncateUtil.truncateByteBuffer(length, value);
Copy link
Contributor Author

@kbendick kbendick Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's BinaryUtil#truncateBinary(ByteBuffer, int), but it doesn't exactly have the same code as is used here and it does a Preconditions check on the width on every call to it, which isn't needed here.

I can try to merge the two implementations in a follow up if we want.

}

@Override
Expand Down Expand Up @@ -480,16 +479,7 @@ public BigDecimal apply(BigDecimal value) {
return null;
}

BigDecimal remainder =
new BigDecimal(
value
.unscaledValue()
.remainder(unscaledWidth)
.add(unscaledWidth)
.remainder(unscaledWidth),
value.scale());

return value.subtract(remainder);
return TruncateUtil.truncateDecimal(unscaledWidth, value);
}

@Override
Expand Down
77 changes: 77 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/TruncateUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/**
* Contains the logic for various {@code truncate} transformations for various types.
*
* <p>This utility class allows for the logic to be reused in different scenarios where input
* validation is done at different times either in org.apache.iceberg.transforms.Truncate and within
* defined SQL functions for different compute engines for usage in SQL.
*
* <p>In general, the inputs to the functions should have already been validated by the calling
* code, as different classes use truncate with different preprocessing. This generally means that
* the truncation width is positive and the value to truncate is non-null.
*
* <p>See also {@linkplain UnicodeUtil#truncateString(CharSequence, int)} and {@link
* BinaryUtil#truncateBinary(ByteBuffer, int)}
*/
/**
 * Contains the logic for various {@code truncate} transformations for various types.
 *
 * <p>This utility class allows for the logic to be reused in different scenarios where input
 * validation is done at different times either in org.apache.iceberg.transforms.Truncate and within
 * defined SQL functions for different compute engines for usage in SQL.
 *
 * <p>In general, the inputs to the functions should have already been validated by the calling
 * code, as different classes use truncate with different preprocessing. This generally means that
 * the truncation width is positive and the value to truncate is non-null.
 *
 * <p>See also {@linkplain UnicodeUtil#truncateString(CharSequence, int)} and {@link
 * BinaryUtil#truncateBinary(ByteBuffer, int)}
 */
public class TruncateUtil {

  private TruncateUtil() {}

  /**
   * Truncates a byte buffer to at most {@code width} bytes from its current position.
   *
   * @param width maximum number of bytes to keep (assumed positive)
   * @param value buffer to truncate; the original buffer is not modified
   * @return a duplicate of {@code value} whose limit is capped at {@code position + width}
   */
  public static ByteBuffer truncateByteBuffer(int width, ByteBuffer value) {
    ByteBuffer truncated = value.duplicate();
    // Never extend the buffer: keep the smaller of the original limit and position + width.
    truncated.limit(Math.min(value.limit(), value.position() + width));
    return truncated;
  }

  /** Truncates a byte down to the nearest multiple of {@code width} (toward negative infinity). */
  public static byte truncateByte(int width, byte value) {
    // Math.floorMod yields the same non-negative remainder as ((value % width) + width) % width
    // for positive widths, but cannot overflow on the intermediate addition.
    return (byte) (value - Math.floorMod(value, width));
  }

  /** Truncates a short down to the nearest multiple of {@code width} (toward negative infinity). */
  public static short truncateShort(int width, short value) {
    return (short) (value - Math.floorMod(value, width));
  }

  /**
   * Truncates an int down to the nearest multiple of {@code width} (toward negative infinity).
   *
   * <p>Uses {@link Math#floorMod(int, int)} instead of {@code ((value % width) + width) % width}
   * because the latter's intermediate addition can overflow when {@code width} is close to {@code
   * Integer.MAX_VALUE}.
   */
  public static int truncateInt(int width, int value) {
    return value - Math.floorMod(value, width);
  }

  /** Truncates a long down to the nearest multiple of {@code width} (toward negative infinity). */
  public static long truncateLong(int width, long value) {
    // Widen the width so the floor-mod is computed entirely in long arithmetic.
    return value - Math.floorMod(value, (long) width);
  }

  /**
   * Truncates a decimal down to the nearest multiple of the given unscaled width, preserving the
   * value's scale.
   *
   * @param unscaledWidth truncation width expressed as an unscaled {@link BigInteger} (assumed
   *     positive)
   * @param value decimal to truncate
   * @return the truncated decimal with the same scale as {@code value}
   */
  public static BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
    // ((x % w) + w) % w produces the non-negative remainder for positive widths; BigInteger
    // arithmetic cannot overflow, so the classic form is kept here.
    BigInteger positiveRemainder =
        value.unscaledValue().remainder(unscaledWidth).add(unscaledWidth).remainder(unscaledWidth);

    return value.subtract(new BigDecimal(positiveRemainder, value.scale()));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,27 @@
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.spark.functions.SparkFunctions;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;

abstract class BaseCatalog
implements StagingTableCatalog, ProcedureCatalog, SupportsNamespaces, HasIcebergCatalog {
implements StagingTableCatalog,
ProcedureCatalog,
SupportsNamespaces,
HasIcebergCatalog,
FunctionCatalog {

@Override
public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
Expand All @@ -47,4 +56,37 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException

throw new NoSuchProcedureException(ident);
}

@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
  // Built-in Iceberg functions are exposed in the top-level namespace and in `system`.
  boolean isSystemNamespace =
      namespace.length == 1 && namespace[0].equalsIgnoreCase("system");

  if (namespace.length == 0 || isSystemNamespace) {
    Identifier[] identifiers = new Identifier[SparkFunctions.list().size()];
    for (int i = 0; i < identifiers.length; i++) {
      identifiers[i] = Identifier.of(namespace, SparkFunctions.list().get(i));
    }
    return identifiers;
  }

  if (namespaceExists(namespace)) {
    // Existing namespaces that are not `system` expose no functions.
    return new Identifier[0];
  }

  throw new NoSuchNamespaceException(namespace);
}

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
  String[] namespace = ident.namespace();
  String name = ident.name();

  // Spark's storage partitioned joins look up the functions used to generate partition
  // transforms (such as `bucket`) with an empty namespace, so resolve built-in functions
  // for both the empty namespace and the `system` namespace.
  boolean resolvable =
      namespace.length == 0
          || (namespace.length == 1 && namespace[0].equalsIgnoreCase("system"));

  if (resolvable) {
    UnboundFunction function = SparkFunctions.load(name);
    if (function != null) {
      return function;
    }
  }

  throw new NoSuchFunctionException(ident);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
Expand All @@ -42,7 +41,6 @@
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
Expand Down Expand Up @@ -379,14 +377,4 @@ public Catalog icebergCatalog() {
"Cannot return underlying Iceberg Catalog, wrapped catalog does not contain an Iceberg Catalog");
return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
}

@Override
public Identifier[] listFunctions(String[] namespace) {
return new Identifier[0];
}

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
throw new NoSuchFunctionException(ident);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.functions;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

/** Registry of the built-in Iceberg functions exposed to Spark catalogs. */
public class SparkFunctions {

  private SparkFunctions() {}

  // All built-in functions, keyed by their lower-case names.
  private static final Map<String, UnboundFunction> FUNCTIONS =
      ImmutableMap.of("truncate", new TruncateFunction());

  private static final List<String> FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet());

  // Functions that are added to all Iceberg catalogs should be accessed with either the `system`
  // namespace or no namespace at all, so a list of names alone is returned.
  public static List<String> list() {
    return FUNCTION_NAMES;
  }

  /** Looks up a function by name, returning {@code null} when no such function exists. */
  public static UnboundFunction load(String name) {
    // Lower-case the lookup key so resolution is case insensitive, matching the existing
    // Spark behavior for functions.
    return FUNCTIONS.get(name.toLowerCase(Locale.ROOT));
  }
}
Loading