Changes from all commits (46 commits)
d42ad5f
First version of the Iceberg sink for Apache Beam
Fokko Dec 21, 2020
cd8f687
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Dec 23, 2020
98e1004
Some docs
Fokko Dec 23, 2020
0320353
Emit the Snapshot
Fokko Dec 26, 2020
480016a
Make the RAT checker happy
Fokko Dec 26, 2020
085068a
Add streaming test as well
Fokko Dec 28, 2020
f6d97ef
Cleanup of the tests
Fokko Dec 30, 2020
3580531
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 2, 2021
c4ad6d7
Cleanup the test a bit
Fokko Jan 3, 2021
f62941f
Refactor the code
Fokko Jan 4, 2021
b0ddc18
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 5, 2021
a92df28
Cleanup and make write() public
Fokko Jan 7, 2021
c6b417f
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 7, 2021
7e77a3b
Allow to pass configuration
Jan 11, 2021
8e26fe2
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 11, 2021
099ed36
Add a nice builder
Jan 12, 2021
32a4bec
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 12, 2021
98433c5
Enable tests in the CI
Jan 12, 2021
e0adcb1
Fix the voilations
Jan 12, 2021
4105b5e
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 14, 2021
b57def4
Give it some nice names
Jan 14, 2021
a006045
The window will fire even if the list is empty
Jan 15, 2021
3a99c74
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 15, 2021
45a3716
Always return the current snapshot
Jan 15, 2021
b5ba1ac
Move to the Iceberg writers
Fokko Jan 21, 2021
6fdb60e
Merge branch 'fd-beam-sink' of github.com:Fokko/incubator-iceberg int…
Fokko Jan 21, 2021
d5ae150
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 21, 2021
a6116b2
Move to the Iceberg writer
Fokko Jan 21, 2021
a11eb04
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 21, 2021
76016be
Add more tests
Fokko Jan 24, 2021
7c88e68
Merge branch 'master' of https://github.com/apache/incubator-iceberg …
Fokko Jan 24, 2021
70f4733
Revert unrelated changes
Fokko Jan 24, 2021
a2ab6e3
Bump to Beam 0.27
Fokko Jan 24, 2021
2d7ce2e
Fix the checkstyle voilations
Fokko Jan 24, 2021
3399000
Revert unrelated changes
Fokko Jan 25, 2021
9e5f8f1
Restore the builder
Fokko Jan 25, 2021
e5499b9
Fix test
Fokko Jan 25, 2021
5f8badd
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 26, 2021
cefb84a
Make the API a bit more flexible
Jan 27, 2021
49b4340
Add testcases
Jan 28, 2021
edcb3e7
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 28, 2021
9bb635b
Clean up the code
Jan 28, 2021
48f9f4c
Merge branch 'master' of https://github.com/apache/iceberg into fd-be…
Jan 28, 2021
c6e239e
Add checks
Jan 28, 2021
f5c0167
Add missing start()
Jan 28, 2021
76dbb26
Add Rat exclusion
Jan 28, 2021
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/DataFile.java
@@ -19,6 +19,7 @@

package org.apache.iceberg;

import java.io.Serializable;
import java.util.List;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
@@ -35,7 +36,7 @@
/**
* Interface for data files listed in a table manifest.
*/
public interface DataFile extends ContentFile<DataFile> {
public interface DataFile extends ContentFile<DataFile>, Serializable {
// fields for adding delete data files
Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -19,13 +19,14 @@

package org.apache.iceberg.catalog;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;

/**
* A namespace in a {@link Catalog}.
*/
public class Namespace {
public class Namespace implements Serializable {
private static final Namespace EMPTY_NAMESPACE = new Namespace(new String[] {});
private static final Joiner DOT = Joiner.on('.');

api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.catalog;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -28,7 +29,7 @@
/**
* Identifies a table in iceberg catalog.
*/
public class TableIdentifier {
public class TableIdentifier implements Serializable {
Contributor comment:
For some of these classes that are implementing Serializable, should we consider adding a serialVersionUID?

I have many users (specifically for Flink) that encounter subtle issues when a Serializable class does not specify serialVersionUID and the automatically assigned serialVersionUID changes. Very often, these users need to allow some or all of their jobs' existing state to be discarded when this happens.

I'm not sure if TableIdentifier gets placed into state anywhere ourselves, but for classes that are part of the public API, is there an argument against including them?

Additionally, does anybody have arguments for or against including serialVersionUID on the interfaces that are also extending Serializable? For example, DataFile stands out to me as something that could potentially cause issues. I could imagine that something along the lines of Class<DataFile> clazz = DataFile.class could get serialized and cause issues when the interface changes.
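
A hedged illustration of the suggestion above, not part of this diff: pinning the serial version on one of these classes might look like the following, where the value 1L is an arbitrary choice.

public class TableIdentifier implements Serializable {
  // Explicit UID so that recompiling or evolving the class does not change the
  // automatically computed value and invalidate previously serialized state.
  private static final long serialVersionUID = 1L;
  // rest of the class unchanged
}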


private static final Splitter DOT = Splitter.on('.');

113 changes: 113 additions & 0 deletions beam/src/main/java/org/apache/iceberg/beam/BeamAppenderFactory.java
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.beam;

import java.io.IOException;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.GenericAvroWriter;
import org.apache.iceberg.beam.writers.GenericAvroOrcWriter;
import org.apache.iceberg.beam.writers.GenericAvroParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

public class BeamAppenderFactory<T extends GenericRecord> implements FileAppenderFactory<T> {
private final Schema schema;
private final Map<String, String> properties;
private final PartitionSpec spec;

BeamAppenderFactory(Schema schema, Map<String, String> properties, PartitionSpec spec) {
this.schema = schema;
this.properties = properties;
this.spec = spec;
}

@Override
public FileAppender<T> newAppender(OutputFile file, FileFormat fileFormat) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
try {
switch (fileFormat) {
case AVRO:
return Avro
.write(file)
.createWriterFunc(GenericAvroWriter::new)
.setAll(properties)
.schema(schema)
.overwrite()
.build();

case PARQUET:
return Parquet.write(file)
.createWriterFunc(GenericAvroParquetWriter::buildWriter)
.setAll(properties)
.metricsConfig(metricsConfig)
.schema(schema)
.overwrite()
.build();

case ORC:
return ORC.write(file)
.createWriterFunc(GenericAvroOrcWriter::buildWriter)
.setAll(properties)
.metricsConfig(metricsConfig)
.schema(schema)
.overwrite()
.build();

default:
throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}

@Override
public DataWriter<T> newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) {
return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format,
file.encryptingOutputFile().location(), spec, partition, file.keyMetadata());
}

@Override
public EqualityDeleteWriter<T> newEqDeleteWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
return null;
}

@Override
public PositionDeleteWriter<T> newPosDeleteWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
return null;
}
}
178 changes: 178 additions & 0 deletions beam/src/main/java/org/apache/iceberg/beam/FileWriter.java
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.beam;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileWriter<T extends GenericRecord> extends DoFn<T, DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);

private final PartitionSpec spec;
private final TableIdentifier tableIdentifier;
private final String hiveMetastoreUrl;

private Map<String, String> properties;
private Schema schema;
private LocationProvider locations;
private FileIO io;
private EncryptionManager encryptionManager;
private HiveCatalog catalog;
private FileFormat fileFormat;

private transient TaskWriter<T> writer;
private transient BoundedWindow lastSeenWindow;

public FileWriter(
TableIdentifier tableIdentifier,
Schema schema,
PartitionSpec spec,
String hiveMetastoreUrl,
Map<String, String> properties
) {
this.tableIdentifier = tableIdentifier;
this.spec = spec;
this.hiveMetastoreUrl = hiveMetastoreUrl;
this.schema = schema;
this.properties = properties;
}

@StartBundle
public void startBundle(StartBundleContext sbc) {
start();
}

@ProcessElement
public void processElement(ProcessContext context, BoundedWindow window) {
appendRecord(
context.element(),
window,
(int) context.pane().getIndex(),
context.pane().getIndex()
);
}

@FinishBundle
public void finishBundle(FinishBundleContext fbc) {
DataFile[] files = finish();
for (DataFile file : files) {
fbc.output(file, Instant.now(), lastSeenWindow);
}
}

@VisibleForTesting
public void start() {
Configuration conf = new Configuration();
for (String key : this.properties.keySet()) {
conf.set(key, this.properties.get(key));
}
catalog = new HiveCatalog(
HiveCatalog.DEFAULT_NAME,
this.hiveMetastoreUrl,
1,
conf
);
Table table = HiveCatalogHelper.loadOrCreateTable(
catalog,
tableIdentifier,
schema,
spec,
properties
);
this.schema = table.schema();
this.locations = table.locationProvider();
this.properties = table.properties();
this.io = table.io();
this.encryptionManager = table.encryption();
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}

@VisibleForTesting
public void appendRecord(T element, BoundedWindow window, int partitionId, long taskId) {
if (writer == null) {
LOG.info("Setting up the writer");
// We would rather do this in the startBundle, but we don't know the pane

BeamAppenderFactory<T> appenderFactory = new BeamAppenderFactory<>(schema, properties, spec);
OutputFileFactory fileFactory = new OutputFileFactory(
spec, fileFormat, locations, io, encryptionManager, partitionId, taskId);

if (spec.isUnpartitioned()) {
writer = new UnpartitionedWriter<>(spec, fileFormat, appenderFactory, fileFactory, io, Long.MAX_VALUE);
} else {
writer = new PartitionedWriter<T>(
spec, fileFormat, appenderFactory, fileFactory, io, Long.MAX_VALUE) {
@Override
protected PartitionKey partition(T row) {
return new PartitionKey(spec, schema);
}
};
}
}
try {
lastSeenWindow = window;
writer.write(element);
Contributor comment:
What if we get records for multiple partitions? I think PartitionedWriter will fail with a runtime exception.
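
// Editor's sketch of the concern above, not part of this diff: partition(row)
// as written returns a fresh PartitionKey that is never populated from the row,
// so records are not actually routed by their partition values. Other Iceberg
// writers typically copy a reusable key and re-partition it per record, roughly:
//
//   private final PartitionKey template = new PartitionKey(spec, schema);
//
//   @Override
//   protected PartitionKey partition(T row) {
//     PartitionKey key = template.copy();
//     key.partition(asStructLike(row));  // asStructLike: hypothetical GenericRecord -> StructLike adapter
//     return key;
//   }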

} catch (IOException e) {
throw new RuntimeException(e);
}
}

@VisibleForTesting
public DataFile[] finish() {
LOG.info("Closing the writer");
try {
writer.close();
return writer.dataFiles();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
writer = null;
catalog.close();
catalog = null;
}
}
}
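
For orientation, a minimal usage sketch, not part of this PR: wiring the DoFn above into a Beam pipeline might look roughly like the snippet below. The table name, metastore URL, schema, and the upstream PCollection<GenericRecord> called records are placeholder assumptions, and the emitted DataFiles would still need to be collected and committed in a separate step (the snapshot-emitting part of the change), which is omitted here.

TableIdentifier tableId = TableIdentifier.of("db", "events");  // hypothetical table name
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()));  // placeholder Iceberg schema
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> props = Collections.emptyMap();

// records is an existing PCollection<GenericRecord>; since DataFile extends
// Serializable in this PR, the output can fall back to SerializableCoder.
PCollection<DataFile> dataFiles = records.apply(
    "WriteIcebergDataFiles",
    ParDo.of(new FileWriter<GenericRecord>(
        tableId, schema, spec, "thrift://hive-metastore:9083", props)));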