Merged
Commits (30)
9f238d6
Flink: Add the iceberg files committer to collect data files and comm…
openinx Aug 11, 2020
47db75f
Add comments for the critical members
openinx Aug 14, 2020
0fcf8c4
Read the max-committed-checkpoint-id only when restoring the job.
openinx Aug 14, 2020
2ca3e45
Fix the broken unit tests.
openinx Aug 14, 2020
02ece7e
Refactor to use the SortedMapTypeInfo
openinx Aug 14, 2020
24c9e80
Handle the cases that two different jobs write to the same table.
openinx Aug 19, 2020
ad83367
Minor fix
openinx Aug 19, 2020
6f5307e
Make IcebergFilesCommitter extends from AbstractStreamOperator for co…
openinx Aug 19, 2020
527ea4c
Add the unit tests for ordered and disordered events between two cont…
openinx Aug 19, 2020
8542b70
Fix the checkstyle
openinx Aug 19, 2020
d14aa08
Add a check to validate the parsed maxCommittedCheckpointId.
openinx Aug 19, 2020
2cb9658
Improve the flink data stream unit tests.
openinx Aug 20, 2020
7c53260
Addressing comments.
openinx Aug 20, 2020
951a0ce
Minor fixes
openinx Aug 20, 2020
65c1921
Introduce orc to the filesCommitter unit tests.
openinx Aug 21, 2020
7158583
Use the TableLoader to load table lazily inside icebergFilesCommitter
openinx Aug 21, 2020
fbc30fc
Advancing the max-committed-checkpoint-id when there's no data files …
openinx Aug 21, 2020
35fa7d3
Fix the broken unit tests after ORC was introduced
openinx Aug 21, 2020
3e3fa49
Keep the increased max-committed-checkpoint-id
openinx Aug 21, 2020
ac9d1e2
Refactor the FlinkSink builder to support both DataStream<Row> and Da…
openinx Aug 21, 2020
56d0ed6
Close the table loader once we disposed in iceberg files committer
openinx Aug 24, 2020
d7b42c3
Addressing the comments
openinx Aug 24, 2020
31d8436
Add parquet to the unit tests.
openinx Aug 25, 2020
64f0573
Add unit tests for bounded stream
openinx Aug 26, 2020
826ef56
Rebase to master and fix the compile error
openinx Aug 27, 2020
fd6a22e
Move the writer classes to sink package
openinx Aug 27, 2020
78a0d36
Addressing comments from Ryan
openinx Aug 28, 2020
65076d7
Minor changes: removing stale comments etc.
openinx Aug 28, 2020
675977b
Refactor the FlinkSink API and provide detailed javadoc.
openinx Aug 28, 2020
e674ed7
Fix the broken unit tests
openinx Aug 28, 2020
81 changes: 0 additions & 81 deletions flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java

This file was deleted.

@@ -51,7 +51,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

class FlinkParquetWriters {
public class FlinkParquetWriters {
private FlinkParquetWriters() {
}

@@ -207,6 +207,7 @@ private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsInteger
" wrong precision %s", precision);
return new IntegerDecimalWriter(desc, precision, scale);
}

private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
int precision, int scale) {
Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18, " +
220 changes: 220 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.sink;

import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

public class FlinkSink {

private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();

private FlinkSink() {
}

/**
* Initialize a {@link Builder} to export the data from a generic input data stream into an iceberg table. We use
* {@link RowData} inside the sink connector, so users need to provide a mapper function and a
* {@link TypeInformation} to convert those generic records to a RowData DataStream.
*
* @param input the generic source input data stream.
* @param mapper function to convert the generic data to {@link RowData}
* @param outputType to define the {@link TypeInformation} for the input data.
* @param <T> the data type of records.
* @return {@link Builder} to connect the iceberg table.
*/
public static <T> Builder builderFor(DataStream<T> input,
MapFunction<T, RowData> mapper,
TypeInformation<RowData> outputType) {
DataStream<RowData> dataStream = input.map(mapper, outputType);
return forRowData(dataStream);
}

/**
* Initialize a {@link Builder} to export the data from an input data stream of {@link Row}s into an iceberg table. We
* use {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for the builder to
* convert those {@link Row}s to a {@link RowData} DataStream.
*
* @param input the source input data stream with {@link Row}s.
* @param tableSchema defines the {@link TypeInformation} for input data.
* @return {@link Builder} to connect the iceberg table.
*/
public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();

DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
.tableSchema(tableSchema);
}

/**
* Initialize a {@link Builder} to export the data from input data stream with {@link RowData}s into iceberg table.
*
* @param input the source input data stream with {@link RowData}s.
* @return {@link Builder} to connect the iceberg table.
*/
public static Builder forRowData(DataStream<RowData> input) {
return new Builder().forRowData(input);
}

public static class Builder {
private DataStream<RowData> rowDataInput = null;
private TableLoader tableLoader;
private Configuration hadoopConf;
private Table table;
private TableSchema tableSchema;

private Builder() {
}

private Builder forRowData(DataStream<RowData> newRowDataInput) {
this.rowDataInput = newRowDataInput;
return this;
}

/**
* This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter}, which will write all
* the records into {@link DataFile}s and emit them to the downstream operator. Providing a table avoids loading the
* table again in each separate task.
*
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder table(Table newTable) {
this.table = newTable;
return this;
}

/**
* The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily. We need this loader because
* {@link Table} is not serializable, so the table loaded via Builder#table cannot simply be shipped to the remote
* task manager.
*
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
}

public Builder hadoopConf(Configuration newHadoopConf) {
this.hadoopConf = newHadoopConf;
return this;
}

public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@SuppressWarnings("unchecked")
public DataStreamSink<RowData> build() {
Preconditions.checkArgument(rowDataInput != null,
"Please use forRowData() to initialize the input DataStream.");
Preconditions.checkNotNull(table, "Table shouldn't be null");
Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
Preconditions.checkNotNull(hadoopConf, "Hadoop configuration shouldn't be null");

IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf);

DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
stevenzwu (Contributor), Aug 28, 2020:
There can be multiple Iceberg sinks in the same job, so we should probably add the table identifier string as a suffix to make the operator name and id unique. We have a unique sinkName within a job and we append the sinkName suffix to the operator name.

openinx (Member Author):
That sounds good to me; it's necessary to support multiple iceberg sinks in the same job. I will open an issue and provide a patch with unit tests to address this.
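For illustration, a minimal sketch of the suggested naming inside build(); the table-derived suffix is a hypothetical follow-up, not part of this PR:

// Hypothetical: derive a per-table operator name so that several Iceberg sinks in the
// same job stay distinguishable in the Flink UI and in checkpoint state. Not in this PR.
String writerName = ICEBERG_STREAM_WRITER_NAME + " " + table.toString();

DataStream<DataFile> writtenFiles = rowDataInput
    .transform(writerName, TypeInformation.of(DataFile.class), streamWriter)
    .setParallelism(rowDataInput.getParallelism());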

.setParallelism(rowDataInput.getParallelism())
Contributor:
This is a good default value for the writer parallelism. We have users who want to control the writer parallelism explicitly in order to control the number of written files; in the future, we may want to allow users to set the parallelism in the builder.

openinx (Member Author):
Makes sense to me; this could be a follow-up issue.
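A sketch of what such a builder option might look like; the writeParallelism(...) method is hypothetical and not part of this PR:

// Hypothetical builder option: let users override the writer parallelism instead of
// always inheriting it from the input stream.
private Integer writeParallelism = null;

public Builder writeParallelism(int newWriteParallelism) {
  this.writeParallelism = newWriteParallelism;
  return this;
}

// In build(), fall back to the input parallelism when the option is unset:
int parallelism = writeParallelism != null ? writeParallelism : rowDataInput.getParallelism();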

.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
Contributor:
The committer is a stateful operator; we should probably set its uid explicitly.

Contributor:
Never mind, I saw the earlier comment that this is a todo item.
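Continuing the committer chain in build(), a sketch of setting an explicit uid; the uid string is an illustrative assumption, not part of this PR:

// Hypothetical: a stable, table-scoped uid lets Flink map the committer's checkpointed
// state (e.g. the max-committed-checkpoint-id) back to this operator across job changes.
.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
.uid("iceberg-files-committer-" + table.toString())
.setParallelism(1)
.setMaxParallelism(1);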

.setParallelism(1)
.setMaxParallelism(1);

return returnStream.addSink(new DiscardingSink())
stevenzwu (Contributor), Aug 28, 2020:
Curious about the reason why we don't make the committer a sink function and instead add a dummy DiscardingSink.

Conceptually, this writer-committer combo is the reverse/mirror of the split enumerator-reader in the FLIP-27 source interface. It would be nice to run the committer on the jobmanager (similar to the enumerator). That way, the Iceberg sink won't change the nature of the embarrassingly-parallel DAG.

openinx (Member Author):
I think this would answer your question about why we used an operator rather than a sink function.

stevenzwu (Contributor), Aug 31, 2020:
@openinx thx, that does answer my question.

Still, adding a DiscardingSink may confuse users. It seems that we really need a unified/improved sink interface (similar to FLIP-27) to support bounded-input jobs in a sink function.

What about other Flink sinks used by bounded streaming/batch jobs? Do they all have to go through this model?

openinx (Member Author):
There are three cases: 1) an unbounded streaming job; 2) a bounded streaming job; 3) a batch job. If users only need the unbounded streaming ability, they only need to implement SinkFunction. If they need both unbounded and bounded streaming, we have to extend/implement AbstractStreamOperator & BoundedOneInput and add the DiscardingSink to the tail. If they want the batch ability, they need to provide an OutputFormat implementation. A future Flink release will unify case #2 and case #3 in one sink interface, but for now we have to implement the bounded streaming and batch cases separately.

The Flink hive connector is a good example: it supports case 1, case 2 and case 3, and it uses a similar approach to the current iceberg sink connector.

Contributor:
Fair enough; we can't solve this problem until Flink improves the sink interface.
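To make case #2 above concrete, here is a minimal sketch (a hypothetical committer-like operator, not the actual IcebergFilesCommitter) of an operator that extends AbstractStreamOperator, implements BoundedOneInput, and is terminated with a DiscardingSink:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;

// Hypothetical skeleton of a committer-style operator that also handles bounded input.
// Flink calls endInput() once a bounded stream is exhausted, so pending files can still
// be committed even though no further checkpoint will be triggered.
class CommitterLikeOperator extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<DataFile, Void>, BoundedOneInput {

  private final List<DataFile> pending = new ArrayList<>();

  @Override
  public void processElement(StreamRecord<DataFile> record) {
    pending.add(record.getValue());
  }

  @Override
  public void endInput() {
    // Bounded input has finished: commit the remaining data files here.
  }

  // Wiring: the operator emits nothing downstream, so the chain is closed with a DiscardingSink.
  static void attach(DataStream<DataFile> writtenFiles) {
    writtenFiles
        .transform("committer", Types.VOID, new CommitterLikeOperator())
        .addSink(new DiscardingSink<>());
  }
}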

.name(String.format("IcebergSink %s", table.toString()))
.setParallelism(1);
}
}

static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

RowType flinkSchema;
if (requestedSchema != null) {
// Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg schema.
Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);

// We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to
// iceberg INTEGER, which means that if we used iceberg's table schema to read a TINYINT (backed by 1 byte), we
// would read 4 bytes rather than 1 byte and mess up the byte array in BinaryRowData. So here we must use the
// flink schema.
flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
flinkSchema = FlinkSchemaUtil.convert(table.schema());
}

Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);

TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);

return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
}

private static FileFormat getFileFormat(Map<String, String> properties) {
String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}

private static long getTargetFileSizeBytes(Map<String, String> properties) {
return PropertyUtil.propertyAsLong(properties,
WRITE_TARGET_FILE_SIZE_BYTES,
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
}
}
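Putting the API in this file together, a minimal usage sketch; the Hadoop table location, schema, sample rows and the TableLoader.fromHadoopTable factory usage are illustrative assumptions, not part of this diff:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopTables;

public class FlinkSinkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // data files are committed when checkpoints complete

    // Assumed: an existing Hadoop-tables table at this illustrative location.
    String location = "hdfs://nn:8020/warehouse/db/tbl";
    Configuration hadoopConf = new Configuration();
    Table table = new HadoopTables(hadoopConf).load(location);        // configures the stream writer
    TableLoader tableLoader = TableLoader.fromHadoopTable(location);  // loaded lazily by the committer

    // Assumed: the flink schema matches the iceberg table schema (id int, data string).
    TableSchema flinkSchema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();

    DataStream<Row> rows = env.fromElements(Row.of(1, "a"), Row.of(2, "b"));

    FlinkSink.forRow(rows, flinkSchema)
        .table(table)
        .tableLoader(tableLoader)
        .hadoopConf(hadoopConf)
        .build();

    env.execute("Write rows into the iceberg table");
  }
}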