diff --git a/build.gradle b/build.gradle index dd99804b3aef..01c0bb4f8309 100644 --- a/build.gradle +++ b/build.gradle @@ -225,6 +225,7 @@ project(':iceberg-core') { compile "com.fasterxml.jackson.core:jackson-databind" compile "com.fasterxml.jackson.core:jackson-core" compile "com.github.ben-manes.caffeine:caffeine" + compile "org.rocksdb:rocksdbjni" compileOnly("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.slf4j', module: 'slf4j-log4j12' diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index 4e4d375ddd30..8f879cbf4363 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -238,6 +238,8 @@ public void write(Object s, Encoder encoder) throws IOException { encoder.writeString((Utf8) s); } else if (s instanceof String) { encoder.writeString(new Utf8((String) s)); + } else if (s instanceof CharSequence) { + encoder.writeString((CharSequence) s); } else if (s == null) { throw new IllegalArgumentException("Cannot write null to required string column"); } else { diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index d787c7cd465a..8e8feb85dbbd 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -35,8 +35,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; -import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructLikeMapUtil; import org.apache.iceberg.util.StructProjection; import org.apache.iceberg.util.Tasks; @@ -51,15 +52,18 @@ public abstract class BaseTaskWriter implements TaskWriter { private final OutputFileFactory fileFactory; private final FileIO io; private final long targetFileSize; + private final Map properties; protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { this.spec = spec; this.format = format; this.appenderFactory = appenderFactory; this.fileFactory = fileFactory; this.io = io; this.targetFileSize = targetFileSize; + this.properties = properties; } protected PartitionSpec spec() { @@ -96,7 +100,7 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private RollingFileWriter dataWriter; private RollingEqDeleteWriter eqDeleteWriter; private SortedPosDeleteWriter posDeleteWriter; - private Map insertedRowMap; + private Map insertedRowMap; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null."); @@ -106,7 +110,8 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de this.dataWriter = new RollingFileWriter(partition); this.eqDeleteWriter = new RollingEqDeleteWriter(partition); this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition); - this.insertedRowMap = 
StructLikeMap.create(deleteSchema.asStruct()); + + this.insertedRowMap = StructLikeMapUtil.load(deleteSchema.asStruct(), PATH_OFFSET_SCHEMA.asStruct(), properties); } /** @@ -121,26 +126,31 @@ public void write(T row) throws IOException { StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row))); // Adding a pos-delete to replace the old path-offset. - PathOffset previous = insertedRowMap.put(copiedKey, pathOffset); + StructLike previous = insertedRowMap.put(copiedKey, pathOffset); if (previous != null) { - // TODO attach the previous row if has a positional-delete row schema in appender factory. - posDeleteWriter.delete(previous.path, previous.rowOffset, null); + writePosDelete(previous, null); } dataWriter.write(row); } + private void writePosDelete(StructLike pathOffset, T row) { + Preconditions.checkNotNull(pathOffset, "StructLike pathOffset cannot be null."); + CharSequence path = pathOffset.get(0, CharSequence.class); + long offset = pathOffset.get(1, Long.class); + posDeleteWriter.delete(path, offset, row); + } + /** * Write the pos-delete if there's an existing row matching the given key. * * @param key has the same columns with the equality fields. */ private void internalPosDelete(StructLike key) { - PathOffset previous = insertedRowMap.remove(key); + StructLike previous = insertedRowMap.remove(key); if (previous != null) { - // TODO attach the previous row if has a positional-delete row schema in appender factory. - posDeleteWriter.delete(previous.path, previous.rowOffset, null); + writePosDelete(previous, null); } } @@ -196,9 +206,14 @@ public void close() throws IOException { } } - private static class PathOffset { - private final CharSequence path; - private final long rowOffset; + private static final Schema PATH_OFFSET_SCHEMA = new Schema( + Types.NestedField.required(0, "path", Types.StringType.get()), + Types.NestedField.required(1, "row_offset", Types.LongType.get()) + ); + + private static class PathOffset implements StructLike { + private CharSequence path; + private long rowOffset; private PathOffset(CharSequence path, long rowOffset) { this.path = path; @@ -216,6 +231,41 @@ public String toString() { .add("row_offset", rowOffset) .toString(); } + + @Override + public int size() { + return 2; + } + + @Override + public T get(int pos, Class javaClass) { + switch (pos) { + case 0: + return javaClass.cast(path); + case 1: + return javaClass.cast(rowOffset); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + private void put(int pos, Object value) { + switch (pos) { + case 0: + this.path = (CharSequence) value; + break; + case 1: + this.rowOffset = (long) value; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public void set(int pos, T value) { + put(pos, value); + } } private abstract class BaseRollingWriter implements Closeable { diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java index a49fe199cfc9..ed2193727696 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java @@ -30,8 +30,9 @@ public abstract class PartitionedFanoutWriter extends BaseTaskWriter { private final Map writers = Maps.newHashMap(); protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - 
OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties); } /** diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java index a551b8e2686e..5ea925671d08 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java @@ -20,6 +20,7 @@ package org.apache.iceberg.io; import java.io.IOException; +import java.util.Map; import java.util.Set; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; @@ -38,8 +39,9 @@ public abstract class PartitionedWriter extends BaseTaskWriter { private RollingFileWriter currentWriter = null; protected PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties); } /** diff --git a/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java index 2e98706816c7..8fa141373f68 100644 --- a/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java @@ -20,6 +20,7 @@ package org.apache.iceberg.io; import java.io.IOException; +import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -28,8 +29,9 @@ public class UnpartitionedWriter extends BaseTaskWriter { private final RollingFileWriter currentWriter; public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties); currentWriter = new RollingFileWriter(null); } diff --git a/core/src/main/java/org/apache/iceberg/types/Serializer.java b/core/src/main/java/org/apache/iceberg/types/Serializer.java new file mode 100644 index 000000000000..0b74dd247439 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/types/Serializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.types; + +public interface Serializer { + byte[] serialize(T object); + + T deserialize(byte[] data); +} diff --git a/core/src/main/java/org/apache/iceberg/types/Serializers.java b/core/src/main/java/org/apache/iceberg/types/Serializers.java new file mode 100644 index 000000000000..f99b83f47ce5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/types/Serializers.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.function.IntFunction; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ByteBuffers; + +public class Serializers { + private static final int NULL_MARK = -1; + + private Serializers() { + } + + public static Serializer forType(Types.StructType struct) { + return new StructLikeSerializer(struct); + } + + public static Serializer> forType(Types.ListType list) { + return new ListSerializer<>(list); + } + + public static Serializer forType(Type.PrimitiveType type) { + return new PrimitiveSerializer<>(type); + } + + @SuppressWarnings("unchecked") + private static Serializer internal(Type type) { + if (type.isPrimitiveType()) { + return forType(type.asPrimitiveType()); + } else if (type.isStructType()) { + return (Serializer) forType(type.asStructType()); + } else if (type.isListType()) { + return (Serializer) forType(type.asListType()); + } + throw new UnsupportedOperationException("Cannot determine serializer for type: " + type); + } + + @SuppressWarnings("unchecked") + private static Class internalClass(Type type) { + if (type.isPrimitiveType()) { + return (Class) type.typeId().javaClass(); + } else if (type.isStructType()) { + return (Class) StructLike.class; + } else if (type.isListType()) { + return (Class) List.class; + } else if (type.isMapType()) { + return (Class) Map.class; + } + + throw new UnsupportedOperationException("Cannot determine expected class for type: " + type); + } + + private static class StructLikeSerializer implements Serializer { + private final Types.StructType struct; + private final Serializer[] serializers; + private final Class[] classes; + + private 
StructLikeSerializer(Types.StructType struct) { + this.struct = struct; + this.serializers = struct.fields().stream() + .map(field -> internal(field.type())) + .toArray((IntFunction[]>) Serializer[]::new); + this.classes = struct.fields().stream() + .map(field -> internalClass(field.type())) + .toArray(Class[]::new); + } + + @Override + public byte[] serialize(StructLike object) { + if (object == null) { + return null; + } + + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out)) { + + for (int i = 0; i < serializers.length; i += 1) { + Class valueClass = classes[i]; + + byte[] fieldData = serializers[i].serialize(object.get(i, valueClass)); + if (fieldData == null) { + dos.writeInt(NULL_MARK); + } else { + dos.writeInt(fieldData.length); + dos.write(fieldData); + } + } + return out.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public StructLike deserialize(byte[] data) { + if (data == null) { + return null; + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(in)) { + + GenericRecord record = GenericRecord.create(struct); + for (int i = 0; i < serializers.length; i += 1) { + int length = dis.readInt(); + + if (length == NULL_MARK) { + record.set(i, null); + } else { + byte[] fieldData = new byte[length]; + int fieldDataSize = dis.read(fieldData); + Preconditions.checkState(length == fieldDataSize, "%s != %s", length, fieldDataSize); + record.set(i, serializers[i].deserialize(fieldData)); + } + } + + return record; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private static class ListSerializer implements Serializer> { + private final Serializer elementSerializer; + + private ListSerializer(Types.ListType list) { + this.elementSerializer = internal(list.elementType()); + } + + @Override + public byte[] serialize(List object) { + if (object == null) { + return null; + } + + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out)) { + + dos.writeInt(object.size()); + for (T elem : object) { + byte[] data = elementSerializer.serialize(elem); + + if (data == null) { + dos.writeInt(NULL_MARK); + } else { + dos.writeInt(data.length); + dos.write(data); + } + } + + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List deserialize(byte[] data) { + if (data == null) { + return null; + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(in)) { + + int size = dis.readInt(); + List result = Lists.newArrayListWithExpectedSize(size); + for (int i = 0; i < size; i++) { + int length = dis.readInt(); + + if (length == NULL_MARK) { + result.add(null); + } else { + byte[] fieldData = new byte[length]; + int fieldDataSize = dis.read(fieldData); + Preconditions.checkState(length == fieldDataSize, "%s != %s", length, fieldDataSize); + result.add(elementSerializer.deserialize(fieldData)); + } + } + + return result; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static class PrimitiveSerializer implements Serializer { + + private final Type.PrimitiveType type; + + private PrimitiveSerializer(Type.PrimitiveType type) { + this.type = type; + } + + @Override + public byte[] serialize(Object object) { + return ByteBuffers.toByteArray(Conversions.toByteBuffer(type, object)); + } + + 
@Override + public T deserialize(byte[] data) { + return Conversions.fromByteBuffer(type, data == null ? null : ByteBuffer.wrap(data)); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java new file mode 100644 index 000000000000..9d565cfba892 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Serializer; +import org.apache.iceberg.types.Serializers; +import org.apache.iceberg.types.Types; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +public class RocksDBStructLikeMap extends AbstractMap implements Map { + + static { + RocksDB.loadLibrary(); + } + + public static RocksDBStructLikeMap create(String path, + Types.StructType keyType, + Types.StructType valType) { + // Create the RocksDB directory if not exists. + Path rocksDBDir = Paths.get(path); + if (!Files.exists(rocksDBDir)) { + try { + Files.createDirectory(rocksDBDir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (!Files.isDirectory(rocksDBDir)) { + throw new ValidationException("The existing path %s is not a directory", path); + } + + return new RocksDBStructLikeMap(path, keyType, valType); + } + + private final String path; + private final WriteOptions writeOptions; + private final RocksDB db; + private final Types.StructType keyType; + private final Types.StructType valType; + + private final Serializer keySerializer; + private final Serializer valSerializer; + + // It's expensive to get the RocksDB's data size, so we maintain the size when put/delete rows. 
+ private int size = 0; + + private RocksDBStructLikeMap(String path, Types.StructType keyType, Types.StructType valType) { + this.path = path; + this.writeOptions = new WriteOptions().setDisableWAL(true); + try { + Options options = new Options().setCreateIfMissing(true); + this.db = RocksDB.open(options, path); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + this.keyType = keyType; + this.valType = valType; + this.keySerializer = Serializers.forType(keyType); + this.valSerializer = Serializers.forType(valType); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size <= 0; + } + + @Override + public boolean containsKey(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + return db.get(keyData) != null; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return false; + } + + @Override + public boolean containsValue(Object value) { + if (value instanceof StructLike) { + byte[] valData = valSerializer.serialize((StructLike) value); + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + if (Arrays.equals(valData, iter.value())) { + return true; + } + } + } + } + return false; + } + + @Override + public StructLike get(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + byte[] valData = db.get(keyData); + if (valData == null) { + return null; + } + + return valSerializer.deserialize(valData); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return null; + } + + @Override + public StructLike put(StructLike key, StructLike value) { + byte[] keyData = keySerializer.serialize(key); + byte[] newValue = valSerializer.serialize(value); + try { + byte[] oldValue = db.get(keyData); + db.put(writeOptions, keyData, newValue); + + if (oldValue == null) { + // Add a new row into the map. + size += 1; + return null; + } else { + // Replace the old row with the new row. 
+ return valSerializer.deserialize(oldValue); + } + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + @Override + public StructLike remove(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + byte[] valData = db.get(keyData); + if (valData != null) { + db.delete(writeOptions, keyData); + + size -= 1; + return valSerializer.deserialize(valData); + } + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return null; + } + + @Override + public void clear() { + size = 0; + db.close(); + try { + FileUtils.cleanDirectory(new File(path)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public Set keySet() { + StructLikeSet keySet = StructLikeSet.create(keyType); + + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + keySet.add(keySerializer.deserialize(iter.key())); + } + } + + return keySet; + } + + @Override + public Collection values() { + Set valueSet = Sets.newHashSet(); + + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + valueSet.add(valSerializer.deserialize(iter.value())); + } + } + + return valueSet; + } + + @Override + public Set> entrySet() { + Set> entrySet = Sets.newHashSet(); + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + StructLikeEntry entry = new StructLikeEntry( + keySerializer.deserialize(iter.key()), + valSerializer.deserialize(iter.value())); + entrySet.add(entry); + } + return entrySet; + } + } + + private class StructLikeEntry implements Entry { + + private final StructLike key; + private final StructLike value; + + private StructLikeEntry(StructLike key, StructLike value) { + this.key = key; + this.value = value; + } + + @Override + public StructLike getKey() { + return key; + } + + @Override + public StructLike getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof StructLikeEntry)) { + return false; + } else { + StructLikeEntry that = (StructLikeEntry) o; + return Objects.equals( + StructLikeWrapper.forType(keyType).set(key), + StructLikeWrapper.forType(keyType).set(that.key)) && + Objects.equals( + StructLikeWrapper.forType(valType).set(value), + StructLikeWrapper.forType(valType).set(that.value) + ); + } + } + + @Override + public int hashCode() { + int hashCode = 0; + if (key != null) { + hashCode ^= StructLikeWrapper.forType(keyType).set(key).hashCode(); + } + if (value != null) { + hashCode ^= StructLikeWrapper.forType(valType).set(value).hashCode(); + } + return hashCode; + } + + @Override + public StructLike setValue(StructLike newValue) { + throw new UnsupportedOperationException("Does not support setValue."); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java new file mode 100644 index 000000000000..3778f5201933 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StructLikeMapUtil { + + private static final Logger LOG = LoggerFactory.getLogger(StructLikeMap.class); + private static final String ROCKSDB_DIR_PREFIX = "iceberg-rocksdb-"; + + public static final String IMPL = "spill-disk-impl"; + public static final String IN_MEMORY_MAP = "in-memory"; + public static final String ROCKSDB_MAP = "rocksdb"; + public static final String ROCKSDB_DIR = "rocksdb-dir"; + + private StructLikeMapUtil() { + } + + public static Map load(Types.StructType keyType, + Types.StructType valType, + Map properties) { + String impl = properties.getOrDefault(IMPL, IN_MEMORY_MAP); + LOG.info("Loading StructLikeMap implementation: {}", impl); + + switch (impl) { + case IN_MEMORY_MAP: + return StructLikeMap.create(keyType); + case ROCKSDB_MAP: + return RocksDBStructLikeMap.create(rocksDBDir(properties), keyType, valType); + default: + throw new UnsupportedOperationException("Unknown StructLikeMap implementation: " + impl); + } + } + + private static String rocksDBDir(Map properties) { + String dir = properties.getOrDefault(ROCKSDB_DIR, System.getProperty("java.io.tmpdir")); + try { + return Files.createTempDirectory(Paths.get(dir), ROCKSDB_DIR_PREFIX).toString(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create the temporary directory", e); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java b/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java index 7aba8ed7a754..fc483dbdbebb 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java +++ b/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java @@ -27,106 +27,183 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class TestStructLikeMap { - private static final Types.StructType STRUCT_TYPE = Types.StructType.of( + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private final String mapType; + private Map map; + + 
@Parameterized.Parameters(name = "StructLikeType = {0}") + public static Iterable parameters() { + return Lists.newArrayList( + new Object[] {"rocksdb"}, + new Object[] {"in-memory"} + ); + } + + public TestStructLikeMap(String mapType) { + this.mapType = mapType; + } + + @Before + public void before() { + Map props = Maps.newHashMap(); + props.put(StructLikeMapUtil.IMPL, mapType); + + if (StructLikeMapUtil.ROCKSDB_MAP.equals(mapType)) { + props.put(StructLikeMapUtil.ROCKSDB_DIR, temp.getRoot().getAbsolutePath()); + } + + this.map = StructLikeMapUtil.load(KEY_TYPE, VAL_TYPE, props); + } + + @After + public void after() { + map.clear(); + } + + private static final Types.StructType KEY_TYPE = Types.StructType.of( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "data", Types.LongType.get()) + Types.NestedField.optional(2, "data", Types.StringType.get()) + ); + + private static final Types.StructType VAL_TYPE = Types.StructType.of( + Types.NestedField.optional(1, "value", Types.StringType.get()) ); + private static GenericRecord key(int id, CharSequence data) { + return (GenericRecord) GenericRecord.create(KEY_TYPE) + .copy("id", id, "data", data); + } + + private static GenericRecord val(CharSequence value) { + return (GenericRecord) GenericRecord.create(VAL_TYPE) + .copy("value", value); + } + + private static void assertKey(StructLike key1, StructLike key2) { + StructLikeWrapper expected = StructLikeWrapper.forType(KEY_TYPE).set(key1); + StructLikeWrapper actual = StructLikeWrapper.forType(VAL_TYPE).set(key2); + Assert.assertEquals(expected, actual); + } + + private static void assertValue(StructLike val1, StructLike val2) { + StructLikeWrapper expected = StructLikeWrapper.forType(VAL_TYPE).set(val1); + StructLikeWrapper actual = StructLikeWrapper.forType(VAL_TYPE).set(val2); + Assert.assertEquals(expected, actual); + } + + private static void assertKeySet(Collection expected, Collection actual) { + StructLikeSet expectedSet = StructLikeSet.create(KEY_TYPE); + StructLikeSet actualSet = StructLikeSet.create(KEY_TYPE); + expectedSet.addAll(expected); + actualSet.addAll(actual); + Assert.assertEquals(expectedSet, actualSet); + } + + private static void assertValueSet(Collection expected, Collection actual) { + StructLikeSet expectedSet = StructLikeSet.create(VAL_TYPE); + StructLikeSet actualSet = StructLikeSet.create(VAL_TYPE); + expectedSet.addAll(expected); + actualSet.addAll(actual); + Assert.assertEquals(expectedSet, actualSet); + } + @Test public void testSingleRecord() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record1 = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); - - Map map = StructLikeMap.create(STRUCT_TYPE); Assert.assertEquals(0, map.size()); - map.put(record1, "1-aaa"); + map.put(key(1, "aaa"), val("1-aaa")); Assert.assertEquals(1, map.size()); Assert.assertFalse(map.isEmpty()); - Assert.assertTrue(map.containsKey(record1)); - Assert.assertTrue(map.containsValue("1-aaa")); - Assert.assertEquals("1-aaa", map.get(record1)); + Assert.assertTrue(map.containsKey(key(1, "aaa"))); + Assert.assertTrue(map.containsValue(val("1-aaa"))); + assertValue(val("1-aaa"), map.get(key(1, "aaa"))); Set keySet = map.keySet(); Assert.assertEquals(1, keySet.size()); - Assert.assertTrue(keySet.contains(record1)); + Assert.assertTrue(keySet.contains(key(1, "aaa"))); - Collection values = map.values(); + Collection values = map.values(); Assert.assertEquals(1, values.size()); - Assert.assertEquals("1-aaa", values.toArray(new 
String[0])[0]); + assertValue(val("1-aaa"), values.toArray(new StructLike[0])[0]); - Set> entrySet = map.entrySet(); + Set> entrySet = map.entrySet(); Assert.assertEquals(1, entrySet.size()); - for (Map.Entry entry : entrySet) { - Assert.assertEquals(record1, entry.getKey()); - Assert.assertEquals("1-aaa", entry.getValue()); + for (Map.Entry entry : entrySet) { + assertKey(key(1, "aaa"), entry.getKey()); + assertValue(val("1-aaa"), entry.getValue()); break; } } @Test public void testMultipleRecord() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record1 = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); - Record record2 = gRecord.copy(ImmutableMap.of("id", 2, "data", "bbb")); - Record record3 = gRecord.copy(); - record3.setField("id", 3); - record3.setField("data", null); - - Map map = StructLikeMap.create(STRUCT_TYPE); + Record record1 = key(1, "aaa"); + Record record2 = key(2, "bbb"); + Record record3 = key(3, null); + Assert.assertEquals(0, map.size()); - map.putAll(ImmutableMap.of(record1, "1-aaa", record2, "2-bbb", record3, "3-null")); + map.putAll(ImmutableMap.of(record1, val("1-aaa"), record2, val("2-bbb"), record3, val("3-null"))); Assert.assertEquals(3, map.size()); Assert.assertTrue(map.containsKey(record1)); Assert.assertTrue(map.containsKey(record2)); Assert.assertTrue(map.containsKey(record3)); - Assert.assertTrue(map.containsValue("1-aaa")); - Assert.assertTrue(map.containsValue("2-bbb")); - Assert.assertTrue(map.containsValue("3-null")); - Assert.assertEquals("1-aaa", map.get(record1)); - Assert.assertEquals("2-bbb", map.get(record2)); - Assert.assertEquals("3-null", map.get(record3)); + Assert.assertTrue(map.containsValue(val("1-aaa"))); + Assert.assertTrue(map.containsValue(val("2-bbb"))); + Assert.assertTrue(map.containsValue(val("3-null"))); + assertValue(val("1-aaa"), map.get(record1)); + assertValue(val("2-bbb"), map.get(record2)); + assertValue(val("3-null"), map.get(record3)); Set keySet = map.keySet(); Assert.assertEquals(3, keySet.size()); - Assert.assertEquals(ImmutableSet.of(record1, record2, record3), keySet); + assertKeySet(ImmutableSet.of(record1, record2, record3), keySet); - Collection values = map.values(); + Collection values = map.values(); Assert.assertEquals(3, values.size()); - Assert.assertEquals(ImmutableSet.of("1-aaa", "2-bbb", "3-null"), Sets.newHashSet(values)); + assertValueSet(ImmutableSet.of(val("1-aaa"), val("2-bbb"), val("3-null")), Sets.newHashSet(values)); - Set> entrySet = map.entrySet(); + Set> entrySet = map.entrySet(); Assert.assertEquals(3, entrySet.size()); Set structLikeSet = Sets.newHashSet(); - Set valueSet = Sets.newHashSet(); - for (Map.Entry entry : entrySet) { + Set valueSet = Sets.newHashSet(); + for (Map.Entry entry : entrySet) { structLikeSet.add(entry.getKey()); valueSet.add(entry.getValue()); } - Assert.assertEquals(ImmutableSet.of(record1, record2, record3), structLikeSet); - Assert.assertEquals(ImmutableSet.of("1-aaa", "2-bbb", "3-null"), valueSet); + assertKeySet(ImmutableSet.of(record1, record2, record3), structLikeSet); + assertValueSet(ImmutableSet.of(val("1-aaa"), val("2-bbb"), val("3-null")), valueSet); } @Test public void testRemove() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); + StructLike record = key(1, "aaa"); - Map map = StructLikeMap.create(STRUCT_TYPE); - map.put(record, "1-aaa"); + map.put(record, val("1-aaa")); Assert.assertEquals(1, map.size()); - Assert.assertEquals("1-aaa", map.get(record)); - 
Assert.assertEquals("1-aaa", map.remove(record)); + assertValue(val("1-aaa"), map.get(record)); + assertValue(val("1-aaa"), map.remove(record)); Assert.assertEquals(0, map.size()); - map.put(record, "1-aaa"); - Assert.assertEquals("1-aaa", map.get(record)); + map.put(record, val("1-aaa")); + assertValue(val("1-aaa"), map.get(record)); } } diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java index 89356b231006..cbf0f4cdbd4a 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java @@ -37,6 +37,7 @@ import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; import org.junit.Before; @@ -217,7 +218,7 @@ private TestTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, Maps.newHashMap()); this.dataWriter = new RollingFileWriter(null); this.deleteWriter = new RollingEqDeleteWriter(null); } diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java index 6231a569cd2d..2ca52c9b4f81 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.function.Function; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeMapUtil; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; import org.junit.Before; @@ -60,6 +62,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase { private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L; private final FileFormat format; + private final String structLikeMap; private final GenericRecord gRecord = GenericRecord.create(SCHEMA); private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema()); @@ -67,17 +70,20 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase { private int idFieldId; private int dataFieldId; - @Parameterized.Parameters(name = "FileFormat = {0}") + @Parameterized.Parameters(name = "FileFormat = {0}, StructLikeMap = {1}") public static Object[][] parameters() { return new Object[][] { - {"avro"}, - {"parquet"} + {"avro", StructLikeMapUtil.IN_MEMORY_MAP}, + {"avro", StructLikeMapUtil.ROCKSDB_MAP}, + {"parquet", StructLikeMapUtil.IN_MEMORY_MAP}, + {"parquet", StructLikeMapUtil.ROCKSDB_MAP} }; } - public TestTaskEqualityDeltaWriter(String fileFormat) { + public TestTaskEqualityDeltaWriter(String fileFormat, String structLikeMap) { super(FORMAT_V2); this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + 
this.structLikeMap = structLikeMap; } @Before @@ -95,6 +101,7 @@ public void setupTable() throws IOException { table.updateProperties() .defaultFormat(format) + .set(StructLikeMapUtil.IMPL, structLikeMap) .commit(); } @@ -441,7 +448,7 @@ private GenericTaskDeltaWriter createTaskWriter(List equalityFieldIds, Schema deleteSchema = table.schema().select(columns); return new GenericTaskDeltaWriter(table.schema(), deleteSchema, table.spec(), format, appenderFactory, - fileFactory, table.io(), TARGET_FILE_SIZE); + fileFactory, table.io(), TARGET_FILE_SIZE, table.properties()); } private static class GenericTaskDeltaWriter extends BaseTaskWriter { @@ -449,8 +456,9 @@ private static class GenericTaskDeltaWriter extends BaseTaskWriter { private GenericTaskDeltaWriter(Schema schema, Schema deleteSchema, PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties); this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema); } diff --git a/data/src/test/java/org/apache/iceberg/types/TestSerializers.java b/data/src/test/java/org/apache/iceberg/types/TestSerializers.java new file mode 100644 index 000000000000..f3e508596cc4 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/types/TestSerializers.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.types; + +import java.util.Comparator; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSerializers { + + private static final Types.StructType REQUIRED_PRIMITIVES = Types.StructType.of( + required(100, "id", Types.LongType.get()), + required(101, "data", Types.StringType.get()), + required(102, "b", Types.BooleanType.get()), + required(103, "i", Types.IntegerType.get()), + required(104, "l", Types.LongType.get()), + required(105, "f", Types.FloatType.get()), + required(106, "d", Types.DoubleType.get()), + required(107, "date", Types.DateType.get()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "ts", Types.TimestampType.withoutZone()), + required(110, "s", Types.StringType.get()), + required(112, "fixed", Types.FixedType.ofLength(7)), + required(113, "bytes", Types.BinaryType.get()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), + required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision + required(117, "time", Types.TimeType.get()) + ); + + private static final Types.StructType OPTIONAL_PRIMITIVES = Types.StructType.of( + optional(200, "_id", Types.LongType.get()), + optional(201, "_data", Types.StringType.get()), + optional(202, "_b", Types.BooleanType.get()), + optional(203, "_i", Types.IntegerType.get()), + optional(204, "_l", Types.LongType.get()), + optional(205, "_f", Types.FloatType.get()), + optional(206, "_d", Types.DoubleType.get()), + optional(207, "_date", Types.DateType.get()), + optional(208, "_ts_tz", Types.TimestampType.withZone()), + optional(209, "_ts", Types.TimestampType.withoutZone()), + optional(210, "_s", Types.StringType.get()), + optional(212, "_fixed", Types.FixedType.ofLength(7)), + optional(213, "_bytes", Types.BinaryType.get()), + optional(214, "_dec_9_0", Types.DecimalType.of(9, 0)), + optional(215, "_dec_11_2", Types.DecimalType.of(11, 2)), + optional(216, "_dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision + optional(217, "_time", Types.TimeType.get()) + ); + + @Test + public void testRequiredPrimitives() { + generateAndValidate(new Schema(REQUIRED_PRIMITIVES.fields())); + } + + @Test + public void testOptionalPrimitives() { + generateAndValidate(new Schema(OPTIONAL_PRIMITIVES.fields())); + } + + @Test + public void testListType() { + Types.StructType structType = Types.StructType.of( + required(0, "id", Types.LongType.get()), + optional(1, "optional_list", Types.ListType.ofOptional( + 2, Types.ListType.ofRequired( + 3, Types.LongType.get() + ) + )), + required(6, "required_list", Types.ListType.ofRequired( + 7, Types.ListType.ofOptional( + 8, Types.LongType.get() + ) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + @Test + @Ignore("The InternalRecordWrapper does not support nested ListType.") + public void testNestedList() { + Types.StructType structType = Types.StructType.of( + optional(1, "list_struct", Types.ListType.ofOptional( + 2, Types.ListType.ofRequired( + 3, Types.StructType.of(REQUIRED_PRIMITIVES.fields()) + ) + )), + optional(4, 
"struct_list", Types.StructType.of( + Types.NestedField.required(5, "inner_list", + Types.ListType.ofOptional( + 6, Types.StructType.of(OPTIONAL_PRIMITIVES.fields()) + )) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + @Test + public void testNestedSchema() { + Types.StructType structType = Types.StructType.of( + required(0, "id", Types.LongType.get()), + required(1, "level1", Types.StructType.of( + optional(2, "level2", Types.StructType.of( + required(3, "level3", Types.StructType.of( + optional(4, "level4", Types.StructType.of( + required(5, "level5", Types.StructType.of( + REQUIRED_PRIMITIVES.fields() + )) + )) + )) + )) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + private void generateAndValidate(Schema schema) { + List records = RandomGenericData.generate(schema, 100, 1_000_000_1); + + InternalRecordWrapper recordWrapper = new InternalRecordWrapper(schema.asStruct()); + Serializer serializer = Serializers.forType(schema.asStruct()); + Comparator comparator = Comparators.forType(schema.asStruct()); + + for (Record expectedRecord : records) { + StructLike expectedStructLike = recordWrapper.wrap(expectedRecord); + + byte[] expectedData = serializer.serialize(expectedStructLike); + StructLike actualStructLike = serializer.deserialize(expectedData); + + Assert.assertEquals("Should produce the equivalent StructLike", 0, + comparator.compare(expectedStructLike, actualStructLike)); + + byte[] actualData = serializer.serialize(actualStructLike); + Assert.assertArrayEquals("Should have the expected serialized data", expectedData, actualData); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index 10dab416091c..681f5cab32b2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -48,10 +49,11 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties, Schema schema, RowType flinkSchema, List equalityFieldIds) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties); this.schema = schema; this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index b2f8ceece9f8..8f2046ee6733 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -47,10 +47,12 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties, Schema schema, RowType flinkSchema, List equalityFieldIds) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds); + super(spec, format, appenderFactory, fileFactory, io, 
targetFileSize, properties, schema, flinkSchema, + equalityFieldIds); this.partitionKey = new PartitionKey(spec, schema); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index b0776f49d190..22b3daa86736 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -48,6 +48,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final EncryptionManager encryptionManager; private final long targetFileSizeBytes; private final FileFormat format; + private final Map tableProperties; private final List equalityFieldIds; private final FileAppenderFactory appenderFactory; @@ -71,6 +72,7 @@ public RowDataTaskWriterFactory(Schema schema, this.encryptionManager = encryptionManager; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; + this.tableProperties = tableProperties; this.equalityFieldIds = equalityFieldIds; if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { @@ -95,19 +97,20 @@ public TaskWriter create() { if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { // Initialize a task writer to write INSERT only. if (spec.isUnpartitioned()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); + return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes, + tableProperties); } else { return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema, flinkSchema); + io, targetFileSizeBytes, tableProperties, schema, flinkSchema); } } else { // Initialize a task writer to write both INSERT and equality DELETE. 
     if (spec.isUnpartitioned()) {
       return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+          targetFileSizeBytes, tableProperties, schema, flinkSchema, equalityFieldIds);
     } else {
       return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+          targetFileSizeBytes, tableProperties, schema, flinkSchema, equalityFieldIds);
     }
   }
 }
@@ -118,9 +121,10 @@ private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWri
     private final RowDataWrapper rowDataWrapper;
     RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory,
-                                   OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+                                   OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                                   Map properties, Schema schema,
                                    RowType flinkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
       this.partitionKey = new PartitionKey(spec, schema);
       this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
     }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 341e634df713..378b6f80d375 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -39,10 +40,12 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
                            OutputFileFactory fileFactory,
                            FileIO io,
                            long targetFileSize,
+                           Map properties,
                            Schema schema,
                            RowType flinkSchema,
                            List equalityFieldIds) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties, schema, flinkSchema,
+        equalityFieldIds);
     this.writer = new RowDataDeltaWriter(null);
   }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 348580d2ffa0..3fe9eafe800a 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -86,6 +86,7 @@ private static HiveIcebergRecordWriter writer(JobConf jc) {
     String tableName = jc.get(Catalogs.NAME);
     return new HiveIcebergRecordWriter(schema, spec, fileFormat,
-        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
+        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, table.properties(),
+        taskAttemptID, tableName);
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 2adf43d1211a..e8ccf9a9d400 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -64,8 +64,8 @@ static Map getWriters(TaskAttemptID taskAttempt
   HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
       FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID, String tableName) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      Map properties, TaskAttemptID taskAttemptID, String tableName) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
     this.io = io;
     this.currentKey = new PartitionKey(spec, schema);
     writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 7b4dce54684d..6acf408470a1 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -278,7 +278,7 @@ private List writeRecords(String name, int taskNum, int attemptNum, bool
         .operationId(operationId)
         .build();
     HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, fileFormat,
-        new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
+        new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, table.properties(),
         TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
     Container container = new Container<>();
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 63cc3a466c1a..f31ce12ceb2d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -97,16 +97,16 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio
     TaskWriter writer;
     if (spec.isUnpartitioned()) {
       writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, table.io(),
-          Long.MAX_VALUE);
+          Long.MAX_VALUE, properties);
     } else if (PropertyUtil.propertyAsBoolean(properties,
         TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
         TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
       writer = new SparkPartitionedFanoutWriter(
-          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
+          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, properties, schema,
           structType);
     } else {
       writer = new SparkPartitionedWriter(
-          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
+          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, properties, schema,
           structType);
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index d38ae2f40316..894bc4e07910 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.source;
+import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
@@ -35,10 +36,10 @@ public class SparkPartitionedFanoutWriter extends PartitionedFanoutWriter
   public SparkPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
-                                      FileAppenderFactory appenderFactory,
-                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                                      Schema schema, StructType sparkSchema) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+                                      FileAppenderFactory appenderFactory,
+                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                                      Map properties, Schema schema, StructType sparkSchema) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
     this.partitionKey = new PartitionKey(spec, schema);
     this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index f81a09926d85..f03957dfcd8e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.source;
+import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
@@ -37,8 +38,8 @@ public class SparkPartitionedWriter extends PartitionedWriter {
   public SparkPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory,
                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                                Schema schema, StructType sparkSchema) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+                                Map properties, Schema schema, StructType sparkSchema) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
     this.partitionKey = new PartitionKey(spec, schema);
     this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
   }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 4a7e0c24cfa1..f2af9b16dc3f 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -268,15 +268,16 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
+      Map properties = table.properties();
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
       } else if (partitionedFanoutEnabled) {
         return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
-            writeSchema, dsSchema);
+            properties, writeSchema, dsSchema);
       } else {
         return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize,
-            writeSchema, dsSchema);
+            properties, writeSchema, dsSchema);
       }
     }
   }
@@ -284,8 +285,9 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo
   private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter {
     Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                          OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+                          OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
+                          Map properties) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, properties);
     }
     @Override
@@ -300,8 +302,8 @@ private static class Partitioned24Writer extends SparkPartitionedWriter implemen
     Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                         OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
-                        Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema, sparkSchema);
+                        Map properties, Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, properties, schema, sparkSchema);
     }
     @Override
@@ -318,9 +320,8 @@ private static class PartitionedFanout24Writer extends SparkPartitionedFanoutWri
     PartitionedFanout24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                               OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
-                              Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema,
-          sparkSchema);
+                              Map properties, Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, properties, schema, sparkSchema);
     }
     @Override
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366768de..405df655a39d 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -537,15 +537,16 @@ public DataWriter createWriter(int partitionId, long taskId, long e
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
+      Map properties = table.properties();
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
       } else if (partitionedFanoutEnabled) {
         return new PartitionedFanout3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+            spec, format, appenderFactory, fileFactory, io, targetFileSize, properties, writeSchema, dsSchema);
       } else {
         return new Partitioned3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+            spec, format, appenderFactory, fileFactory, io, targetFileSize, properties, writeSchema, dsSchema);
       }
     }
   }
@@ -553,8 +554,9 @@ public DataWriter createWriter(int partitionId, long taskId, long e
   private static class Unpartitioned3Writer extends UnpartitionedWriter implements DataWriter {
     Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                         OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+                         OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                         Map properties) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties);
     }
     @Override
@@ -568,8 +570,8 @@ public WriterCommitMessage commit() throws IOException {
   private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter {
     Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                        OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                       Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+                       Map properties, Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties, schema, sparkSchema);
     }
     @Override
@@ -584,8 +586,8 @@ private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWri
       implements DataWriter {
     PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                              OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                             Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+                             Map properties, Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, properties, schema, sparkSchema);
     }
     @Override
diff --git a/versions.props b/versions.props
index 40988337df07..e29e21d2d042 100644
--- a/versions.props
+++ b/versions.props
@@ -17,6 +17,7 @@ org.apache.arrow:arrow-vector = 2.0.0
 org.apache.arrow:arrow-memory-netty = 2.0.0
 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1
 software.amazon.awssdk:* = 2.15.7
+org.rocksdb:rocksdbjni = 6.6.4
 org.scala-lang:scala-library = 2.12.10
 org.projectnessie:* = 0.8.3
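
The org.rocksdb:rocksdbjni = 6.6.4 pin added to versions.props above pulls in RocksDB's embedded key/value store through its Java (JNI) binding. As a rough, illustrative sketch only (not code from this patch; the class name, database path, and key/value strings below are made up), this is the basic API surface that dependency provides:

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class RocksDbUsageSketch {
      public static void main(String[] args) throws RocksDBException {
        // Load the native library bundled inside the rocksdbjni jar.
        RocksDB.loadLibrary();

        // Open (or create) a database directory on local disk; the path is illustrative.
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-sketch")) {
          byte[] key = "example-key".getBytes(StandardCharsets.UTF_8);
          byte[] value = "example-value".getBytes(StandardCharsets.UTF_8);

          db.put(key, value);              // persist a key/value pair off-heap, on disk
          byte[] readBack = db.get(key);   // returns null when the key is absent
          System.out.println(new String(readBack, StandardCharsets.UTF_8));
          db.delete(key);                  // drop the key again
        }
      }
    }

Keeping state in an embedded store like this trades Java heap for local disk plus byte-level (de)serialization, which is typically why a write path would reach for such a dependency.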