Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transform;

/**
* A struct of partition values.
* <p>
* Instances of this class can produce partition values from a data row passed to {@link #partition(StructLike)}.
*/
public class PartitionKey implements StructLike, Serializable {

  private final PartitionSpec spec;
  private final int size;
  private final Object[] partitionTuple;
  // Raw Transform[]: arrays and generics don't mix; elements are used as Transform<Object, Object>.
  private final Transform[] transforms;
  private final Accessor<StructLike>[] accessors;

  /**
   * Creates a key that can materialize partition values for rows of the given input schema.
   *
   * @param spec the partition spec defining the partition fields and their transforms
   * @param inputSchema the schema of rows that will be passed to {@link #partition(StructLike)};
   *                    must contain every source field referenced by the spec
   * @throws IllegalArgumentException if an accessor cannot be built for a spec source field
   */
  @SuppressWarnings("unchecked")
  public PartitionKey(PartitionSpec spec, Schema inputSchema) {
    this.spec = spec;

    List<PartitionField> fields = spec.fields();
    this.size = fields.size();
    this.partitionTuple = new Object[size];
    this.transforms = new Transform[size];
    this.accessors = (Accessor<StructLike>[]) Array.newInstance(Accessor.class, size);

    Schema schema = spec.schema();
    for (int i = 0; i < size; i += 1) {
      PartitionField field = fields.get(i);
      Accessor<StructLike> accessor = inputSchema.accessorForField(field.sourceId());
      // Lazy %s formatting: the message is only built when the check fails, instead of
      // concatenating (and calling schema.findField) eagerly for every field.
      Preconditions.checkArgument(accessor != null,
          "Cannot build accessor for field: %s", schema.findField(field.sourceId()));
      this.accessors[i] = accessor;
      this.transforms[i] = field.transform();
    }
  }

  // Copy constructor: shares the immutable spec/transforms/accessors, snapshots the tuple.
  private PartitionKey(PartitionKey toCopy) {
    this.spec = toCopy.spec;
    this.size = toCopy.size;
    this.partitionTuple = Arrays.copyOf(toCopy.partitionTuple, toCopy.partitionTuple.length);
    this.transforms = toCopy.transforms;
    this.accessors = toCopy.accessors;
  }

  @Override
  public String toString() {
    // Arrays.toString produces the same "[a, b, c]" rendering as the previous manual loop.
    return Arrays.toString(partitionTuple);
  }

  /**
   * Returns an independent copy of this key; the copy's values do not change when
   * {@link #partition(StructLike)} is called on this instance again.
   */
  public PartitionKey copy() {
    return new PartitionKey(this);
  }

  /** Returns this key's values formatted as a partition path by the spec. */
  public String toPath() {
    return spec.partitionToPath(this);
  }

  /**
   * Replace this key's partition values with the partition values for the row.
   *
   * @param row a StructLike row
   */
  @SuppressWarnings("unchecked")
  public void partition(StructLike row) {
    for (int i = 0; i < partitionTuple.length; i += 1) {
      Transform<Object, Object> transform = transforms[i];
      partitionTuple[i] = transform.apply(accessors[i].get(row));
    }
  }

  @Override
  public int size() {
    return size;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    return javaClass.cast(partitionTuple[pos]);
  }

  @Override
  public <T> void set(int pos, T value) {
    partitionTuple[pos] = value;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    } else if (!(o instanceof PartitionKey)) {
      return false;
    }

    // NOTE(review): equality compares only the partition values, not the spec —
    // two keys from different specs with equal tuples compare equal; confirm intended.
    PartitionKey that = (PartitionKey) o;
    return Arrays.equals(partitionTuple, that.partitionTuple);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(partitionTuple);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import java.nio.ByteBuffer;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.iceberg.StructLike;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* Class to adapt a Spark {@code InternalRow} to Iceberg {@link StructLike} for uses like
* {@link org.apache.iceberg.PartitionKey#partition(StructLike)}
*/
class InternalRowWrapper implements StructLike {
  private final DataType[] types;
  private final BiFunction<InternalRow, Integer, ?>[] getters;
  private InternalRow row = null;

  @SuppressWarnings("unchecked")
  InternalRowWrapper(StructType rowType) {
    StructField[] structFields = rowType.fields();
    this.types = new DataType[structFields.length];
    this.getters = (BiFunction<InternalRow, Integer, ?>[]) new BiFunction[structFields.length];
    for (int pos = 0; pos < structFields.length; pos += 1) {
      this.types[pos] = structFields[pos].dataType();
      this.getters[pos] = getter(this.types[pos]);
    }
  }

  /** Points this wrapper at a new row; returns {@code this} for call chaining. */
  InternalRowWrapper wrap(InternalRow internalRow) {
    this.row = internalRow;
    return this;
  }

  @Override
  public int size() {
    return types.length;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    if (row.isNullAt(pos)) {
      return null;
    }

    BiFunction<InternalRow, Integer, ?> fieldGetter = getters[pos];
    Object value = fieldGetter != null ? fieldGetter.apply(row, pos) : row.get(pos, types[pos]);
    return javaClass.cast(value);
  }

  @Override
  public <T> void set(int pos, T value) {
    row.update(pos, value);
  }

  // Returns a converter for types whose Spark representation differs from the Java value
  // Iceberg expects, or null when row.get(pos, type) already yields the right object.
  private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
    if (type instanceof StringType) {
      // Spark stores strings as UTF8String
      return (internalRow, pos) -> internalRow.getUTF8String(pos).toString();
    }

    if (type instanceof DecimalType) {
      DecimalType decimalType = (DecimalType) type;
      int precision = decimalType.precision();
      int scale = decimalType.scale();
      return (internalRow, pos) -> internalRow.getDecimal(pos, precision, scale).toJavaBigDecimal();
    }

    if (type instanceof BinaryType) {
      return (internalRow, pos) -> ByteBuffer.wrap(internalRow.getBinary(pos));
    }

    if (type instanceof StructType) {
      StructType structType = (StructType) type;
      // One wrapper per struct field, reused across rows; wrap() rebinds it on each access.
      InternalRowWrapper nested = new InternalRowWrapper(structType);
      return (internalRow, pos) -> nested.wrap(internalRow.getStruct(pos, structType.size()));
    }

    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
Expand Down
Loading