1 change: 1 addition & 0 deletions build.gradle
@@ -225,6 +225,7 @@ project(':iceberg-core') {
compile "com.fasterxml.jackson.core:jackson-databind"
compile "com.fasterxml.jackson.core:jackson-core"
compile "com.github.ben-manes.caffeine:caffeine"
compile "org.rocksdb:rocksdbjni"
Contributor
I know this is a WIP, but is it possible to avoid bringing this into the class path once this is done?

I believe Flink brings in their own RocksDB fork (frocksdb maybe?) and I imagine others might too.

Member Author
The StructLikeMap is in the core code path, which doesn't depend on any specific compute engine (Spark/Flink/Hive/Presto, etc.), so in theory we cannot assume that the engine runtime will include this dependency jar. It's better to include it in Iceberg's jar if possible.

Contributor
That makes sense.

I'd advocate for possibly shading it though, so that multiple versions can exist together.

With Spark 3.2 bringing support for a RocksDB state store as well, it might be wise to shade it in the final outcome. But that is admittedly a long way off.

compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
@@ -238,6 +238,8 @@ public void write(Object s, Encoder encoder) throws IOException {
encoder.writeString((Utf8) s);
} else if (s instanceof String) {
encoder.writeString(new Utf8((String) s));
} else if (s instanceof CharSequence) {
encoder.writeString((CharSequence) s);
Contributor
Why is this needed? We generally want to avoid writing CharSequence directly because it will require conversion.

} else if (s == null) {
throw new IllegalArgumentException("Cannot write null to required string column");
} else {
76 changes: 63 additions & 13 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -35,8 +35,9 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructLikeMapUtil;
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;

@@ -51,15 +52,18 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSize;
private final Map<String, String> properties;

protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
OutputFileFactory fileFactory, FileIO io, long targetFileSize,
Map<String, String> properties) {
this.spec = spec;
this.format = format;
this.appenderFactory = appenderFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSize = targetFileSize;
this.properties = properties;
}

protected PartitionSpec spec() {
@@ -96,7 +100,7 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable {
private RollingFileWriter dataWriter;
private RollingEqDeleteWriter eqDeleteWriter;
private SortedPosDeleteWriter<T> posDeleteWriter;
private Map<StructLike, PathOffset> insertedRowMap;
private Map<StructLike, StructLike> insertedRowMap;

protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
@@ -106,7 +110,8 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
this.dataWriter = new RollingFileWriter(partition);
this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());

this.insertedRowMap = StructLikeMapUtil.load(deleteSchema.asStruct(), PATH_OFFSET_SCHEMA.asStruct(), properties);
}
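
Note: StructLikeMapUtil is added by this PR but its implementation isn't shown in this hunk. A minimal sketch of the idea (dispatching between the on-heap StructLikeMap and a RocksDB-backed map based on a table property) could look like the following; the property name and the RocksDBStructLikeMap class are assumptions, not confirmed by this diff.

package org.apache.iceberg.util;

import java.util.Map;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;

// Sketch only: the real StructLikeMapUtil in this PR may differ.
public class StructLikeMapUtil {

  // Hypothetical table property selecting the map implementation.
  private static final String MAP_IMPL = "write.equality-delta.map-impl";

  private StructLikeMapUtil() {
  }

  public static Map<StructLike, StructLike> load(Types.StructType keyType,
                                                 Types.StructType valueType,
                                                 Map<String, String> properties) {
    String impl = properties.getOrDefault(MAP_IMPL, "memory");
    if ("rocksdb".equalsIgnoreCase(impl)) {
      // Spill key/value pairs to a local RocksDB instance so the inserted-row
      // index is not bounded by the JVM heap; keys and values are serialized
      // from their struct schemas (which is where a Serializer comes in).
      return RocksDBStructLikeMap.create(keyType, valueType, properties);
    }
    // Default: keep the index on-heap with the existing StructLikeMap.
    return StructLikeMap.create(keyType);
  }
}
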

/**
@@ -121,26 +126,31 @@ public void write(T row) throws IOException {
StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row)));

// Adding a pos-delete to replace the old path-offset.
PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
StructLike previous = insertedRowMap.put(copiedKey, pathOffset);
if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
writePosDelete(previous, null);
}

dataWriter.write(row);
}

private void writePosDelete(StructLike pathOffset, T row) {
Preconditions.checkNotNull(pathOffset, "StructLike pathOffset cannot be null.");
CharSequence path = pathOffset.get(0, CharSequence.class);
long offset = pathOffset.get(1, Long.class);
posDeleteWriter.delete(path, offset, row);
}

/**
* Write the pos-delete if there's an existing row matching the given key.
*
* @param key has the same columns with the equality fields.
*/
private void internalPosDelete(StructLike key) {
PathOffset previous = insertedRowMap.remove(key);
StructLike previous = insertedRowMap.remove(key);

if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
writePosDelete(previous, null);
}
}

@@ -196,9 +206,14 @@ public void close() throws IOException {
}
}

private static class PathOffset {
private final CharSequence path;
private final long rowOffset;
private static final Schema PATH_OFFSET_SCHEMA = new Schema(
Types.NestedField.required(0, "path", Types.StringType.get()),
Types.NestedField.required(1, "row_offset", Types.LongType.get())
);

private static class PathOffset implements StructLike {
private CharSequence path;
private long rowOffset;

private PathOffset(CharSequence path, long rowOffset) {
this.path = path;
@@ -216,6 +231,41 @@ public String toString() {
.add("row_offset", rowOffset)
.toString();
}

@Override
public int size() {
return 2;
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
switch (pos) {
case 0:
return javaClass.cast(path);
case 1:
return javaClass.cast(rowOffset);
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
}

private void put(int pos, Object value) {
switch (pos) {
case 0:
this.path = (CharSequence) value;
break;
case 1:
this.rowOffset = (long) value;
break;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
}

@Override
public <T> void set(int pos, T value) {
put(pos, value);
Contributor
Why use a private put method instead of moving the switch here in set? Won't this need a SuppressWarnings either way?

}
}
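
For comparison, the alternative raised in the question above (moving the switch straight into set and dropping the private put helper) is a small change; a sketch, assuming the casts stay plain reference casts and therefore need no @SuppressWarnings:

  // Sketch only: set with the switch inlined, as asked about in the comment above.
  @Override
  public <T> void set(int pos, T value) {
    switch (pos) {
      case 0:
        this.path = (CharSequence) value;
        break;
      case 1:
        this.rowOffset = (long) value;
        break;
      default:
        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
    }
  }
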

private abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
@@ -30,8 +30,9 @@ public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();

protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
OutputFileFactory fileFactory, FileIO io, long targetFileSize,
Map<String, String> properties) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
}

/**
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
@@ -38,8 +39,9 @@ public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
private RollingFileWriter currentWriter = null;

protected PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
OutputFileFactory fileFactory, FileIO io, long targetFileSize,
Map<String, String> properties) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
}

/**
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

@@ -28,8 +29,9 @@ public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
private final RollingFileWriter currentWriter;

public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
OutputFileFactory fileFactory, FileIO io, long targetFileSize,
Map<String, String> properties) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
currentWriter = new RollingFileWriter(null);
}

26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/types/Serializer.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.types;

public interface Serializer<T> {
byte[] serialize(T object);
Contributor
Is it necessary to use byte[]? Normally we use ByteBuffer to avoid the copies necessarily introduced by byte[].


T deserialize(byte[] data);
}
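
For reference, the ByteBuffer-based shape suggested in the comment above might look like the sketch below; the ByteBufferSerializer name is made up for illustration and is not part of this PR.

package org.apache.iceberg.types;

import java.nio.ByteBuffer;

// Sketch of the ByteBuffer variant suggested above: callers can hand out
// (possibly read-only) slices of a larger buffer instead of copying into a byte[].
public interface ByteBufferSerializer<T> {
  ByteBuffer serialize(T object);

  T deserialize(ByteBuffer data);
}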