Skip to content

Commit

Permalink
Thanks Anton!
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed May 23, 2023
1 parent ca63c4b commit 14b1b65
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 25 deletions.
99 changes: 99 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/PartitionProjection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

/**
 * A {@link StructLike} view of a partition tuple, projected from a common (unified) partition type
 * down to the partition type of a specific {@link PartitionSpec}.
 *
 * <p>Obtain instances via {@link #of(PartitionSpec, Types.StructType)}. For unpartitioned specs a
 * shared no-op singleton is returned whose {@link #wrap(StructLike)} yields {@code null}, so
 * callers can pass the result directly to writers that expect a null partition for unpartitioned
 * data. For partitioned specs, the projection delegates to a {@link StructProjection}.
 */
public interface PartitionProjection extends StructLike {

  /**
   * Initializes this projection for the given spec.
   *
   * @param spec the partition spec to project onto
   * @param commonPartitionType the unified partition type that source structs conform to
   * @return this projection, configured for {@code spec}
   */
  PartitionProjection create(PartitionSpec spec, Types.StructType commonPartitionType);

  /**
   * Wraps a new underlying struct to be read through this projection.
   *
   * @param newStruct the struct holding values of the common partition type
   * @return this projection wrapping {@code newStruct}, or {@code null} for unpartitioned specs
   */
  PartitionProjection wrap(StructLike newStruct);

  /**
   * Returns a projection for the given spec: a no-op singleton when the spec is unpartitioned,
   * otherwise a {@link StructProjection}-backed instance.
   */
  static PartitionProjection of(PartitionSpec spec, Types.StructType commonPartitionType) {
    if (spec.isUnpartitioned()) {
      return UnpartitionedSpecProjection.INSTANCE;
    } else {
      return new PartitionedSpecProjection().create(spec, commonPartitionType);
    }
  }

  /**
   * Projection for unpartitioned specs. Stateless, so a single shared instance is used. Its
   * {@link #wrap(StructLike)} intentionally returns {@code null} so callers pass a null partition
   * to downstream writers, and element access always fails because the projection has no fields.
   */
  class UnpartitionedSpecProjection implements PartitionProjection {
    static final PartitionProjection INSTANCE = new UnpartitionedSpecProjection();

    @Override
    public PartitionProjection create(PartitionSpec spec, Types.StructType commonPartitionType) {
      return this;
    }

    @Override
    public PartitionProjection wrap(StructLike newStruct) {
      // A null partition signals "unpartitioned" to downstream writers.
      return null;
    }

    @Override
    public int size() {
      return 0;
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
      // Fixed swapped message: this is get, not set.
      throw new IllegalArgumentException("UnpartitionedSpecProjection does not allow get");
    }

    @Override
    public <T> void set(int pos, T value) {
      // Fixed swapped message: this is set, not get.
      throw new IllegalArgumentException("UnpartitionedSpecProjection does not allow set");
    }
  }

  /**
   * Projection for partitioned specs, delegating all {@link StructLike} access to a
   * {@link StructProjection} from the common partition type to the spec's partition type.
   */
  class PartitionedSpecProjection implements PartitionProjection {
    private StructProjection structProjection;

    @Override
    public PartitionProjection create(PartitionSpec spec, Types.StructType commonPartitionType) {
      this.structProjection = StructProjection.create(commonPartitionType, spec.partitionType());
      return this;
    }

    @Override
    public PartitionProjection wrap(StructLike newStruct) {
      this.structProjection = structProjection.wrap(newStruct);
      return this;
    }

    @Override
    public int size() {
      return this.structProjection.size();
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
      return this.structProjection.get(pos, javaClass);
    }

    @Override
    public <T> void set(int pos, T value) {
      this.structProjection.set(pos, value);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.io.FanoutDataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionProjection;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.PositionDeltaWriter;
import org.apache.iceberg.io.WriteResult;
Expand All @@ -64,7 +65,6 @@
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
Expand Down Expand Up @@ -378,20 +378,13 @@ protected InternalRowWrapper initPartitionRowWrapper(Types.StructType partitionT
return new InternalRowWrapper(sparkPartitionType);
}

protected Map<Integer, StructProjection> buildPartitionProjections(
protected Map<Integer, PartitionProjection> buildPartitionProjections(
Types.StructType partitionType, Map<Integer, PartitionSpec> specs) {
Map<Integer, StructProjection> partitionProjections = Maps.newHashMap();

for (int specId : specs.keySet()) {
PartitionSpec spec = specs.get(specId);
if (spec.isUnpartitioned()) {
partitionProjections.put(specId, null);
} else {
StructProjection projection =
StructProjection.create(partitionType, spec.partitionType());
partitionProjections.put(specId, projection);
}
}
Map<Integer, PartitionProjection> partitionProjections = Maps.newHashMap();

specs.forEach(
(specID, spec) ->
partitionProjections.put(specID, PartitionProjection.of(spec, partitionType)));

return partitionProjections;
}
Expand All @@ -403,7 +396,7 @@ private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
private final FileIO io;
private final Map<Integer, PartitionSpec> specs;
private final InternalRowWrapper partitionRowWrapper;
private final Map<Integer, StructProjection> partitionProjections;
private final Map<Integer, PartitionProjection> partitionProjections;
private final int specIdOrdinal;
private final int partitionOrdinal;
private final int fileOrdinal;
Expand Down Expand Up @@ -442,16 +435,13 @@ public void delete(InternalRow metadata, InternalRow id) throws IOException {
PartitionSpec spec = specs.get(specId);

InternalRow partition = metadata.getStruct(partitionOrdinal, partitionRowWrapper.size());
StructProjection partitionProjection = partitionProjections.get(specId);

if (partitionProjection != null) {
partitionProjection.wrap(partitionRowWrapper.wrap(partition));
}
PartitionProjection partitionProjection = partitionProjections.get(specId);

String file = id.getString(fileOrdinal);
long position = id.getLong(positionOrdinal);
positionDelete.set(file, position, null);
delegate.write(positionDelete, spec, partitionProjection);
delegate.write(
positionDelete, spec, partitionProjection.wrap(partitionRowWrapper.wrap(partition)));
}

@Override
Expand Down Expand Up @@ -497,7 +487,7 @@ private abstract static class DeleteAndDataDeltaWriter extends BaseDeltaWriter {
private final FileIO io;
private final Map<Integer, PartitionSpec> specs;
private final InternalRowWrapper deletePartitionRowWrapper;
private final Map<Integer, StructProjection> deletePartitionProjections;
private final Map<Integer, PartitionProjection> deletePartitionProjections;
private final int specIdOrdinal;
private final int partitionOrdinal;
private final int fileOrdinal;
Expand Down Expand Up @@ -537,12 +527,15 @@ public void delete(InternalRow meta, InternalRow id) throws IOException {
PartitionSpec spec = specs.get(specId);

InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
StructProjection partitionProjection = deletePartitionProjections.get(specId);
partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
PartitionProjection partitionProjection = deletePartitionProjections.get(specId);

String file = id.getString(fileOrdinal);
long position = id.getLong(positionOrdinal);
delegate.delete(file, position, spec, partitionProjection);
delegate.delete(
file,
position,
spec,
partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition)));
}

@Override
Expand Down

0 comments on commit 14b1b65

Please sign in to comment.