diff --git a/build.gradle b/build.gradle index c5bed3291b45..34ebeb9f51c9 100644 --- a/build.gradle +++ b/build.gradle @@ -32,6 +32,7 @@ buildscript { classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0' classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0' classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.18.0' } } @@ -137,7 +138,7 @@ project(':iceberg-bundled-guava') { tasks.jar.dependsOn tasks.shadowJar - dependencies { + dependencies { compileOnly('com.google.guava:guava') { exclude group: 'com.google.code.findbugs' // may be LGPL - use ALv2 findbugs-annotations instead @@ -186,7 +187,7 @@ project(':iceberg-common') { compile project(path: ':iceberg-bundled-guava', configuration: 'shadow') } } - + project(':iceberg-core') { dependencies { compile project(':iceberg-api') @@ -545,6 +546,105 @@ project(':iceberg-spark-runtime') { } } +project(':iceberg-flink') { + apply plugin: 'com.commercehub.gradle.plugin.avro' + + dependencies { + compile project(':iceberg-data') + compile project(path: ':iceberg-data', configuration: 'testArtifacts') // TODO: to remove, for RandomGenericData + compile project(':iceberg-hive') + compile project(':iceberg-parquet') + + compileOnly("org.apache.flink:flink-streaming-java_2.11") { + exclude group: 'com.esotericsoftware.kryo', module: 'kryo' + exclude group: 'com.github.scopt', module: 'scopt_2.11' + exclude group: 'com.typesafe' + exclude group: 'com.typesafe.akka' + exclude group: 'org.clapper', module: 'grizzled-slf4j_2.11' + exclude group: 'org.reactivestreams', module: 'reactive-streams' + } + compileOnly("org.apache.flink:flink-connector-filesystem_2.11") + compileOnly("org.apache.hadoop:hadoop-client") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + compileOnly("org.apache.hive:hive-metastore") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + } + } +} + +// the runtime uber jar for Flink +project(':iceberg-flink-runtime') { + apply plugin: 'com.github.johnrengelman.shadow' + + tasks.jar.dependsOn tasks.shadowJar + + configurations { + compile { + exclude group: 'org.slf4j' + exclude group: 'org.apache.commons' + exclude group: 'commons-pool' + exclude group: 'org.xerial.snappy' + exclude group: 'javax.annotation' + exclude group: 'com.google.errorprone' + } + } + + dependencies { + compile project(':iceberg-flink') + } + + shadowJar { + configurations = [project.configurations.compile] + + zip64 true + + // include the LICENSE and NOTICE files for the shaded Jar + from(projectDir) { + include 'LICENSE' + include 'NOTICE' + } + + // Relocate dependencies to avoid conflicts + relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml' + relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes' + relocate 
'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework' + relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro' + relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded' // TODO: not used + relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer' // TODO: not used + relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' + relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' // TODO: not used + // relocate Avro's jackson dependency to share parquet-jackson locations + // TODO: not used + relocate 'org.codehaus.jackson', 'org.apache.iceberg.shaded.org.apache.parquet.shaded.org.codehaus.jackson' + relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' // of help when ORC is supported + relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' // TODO: not used + // relocate Arrow and related deps to shade Iceberg specific version + relocate 'io.netty.buffer', 'org.apache.iceberg.shaded.io.netty.buffer' // TODO: not used + relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow' // TODO: not used + relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch' // TODO: not used + + classifier null + } + + jar { + classifier = 'empty' + } +} + @Memoized boolean isVersionFileExists() { return file('version.txt').exists() diff --git a/dev/source-release.sh b/dev/source-release.sh index 26ff039f9885..b96822f2a49d 100755 --- a/dev/source-release.sh +++ b/dev/source-release.sh @@ -64,7 +64,7 @@ tarball=$tag.tar.gz # be conservative and use the release hash, even though git produces the same # archive (identical hashes) using the scm tag -git archive $release_hash --prefix $tag/ -o $tarball .baseline api arrow common core data dev gradle gradlew hive mr orc parquet pig project spark spark-runtime LICENSE NOTICE README.md build.gradle baseline.gradle deploy.gradle tasks.gradle jmh.gradle gradle.properties settings.gradle versions.lock versions.props version.txt +git archive $release_hash --prefix $tag/ -o $tarball .baseline api arrow common core data dev gradle gradlew flink flink-runtime hive mr orc parquet pig project spark spark-runtime LICENSE NOTICE README.md build.gradle baseline.gradle deploy.gradle tasks.gradle jmh.gradle gradle.properties settings.gradle versions.lock versions.props version.txt # sign the archive gpg --armor --output ${tarball}.asc --detach-sig $tarball diff --git a/flink-runtime/LICENSE b/flink-runtime/LICENSE new file mode 100644 index 000000000000..e5fa87dd6e76 --- /dev/null +++ b/flink-runtime/LICENSE @@ -0,0 +1,543 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Avro. + +Copyright: 2014-2017 The Apache Software Foundation. +Home page: https://avro.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains the Jackson JSON processor. + +Copyright: 2007-2019 Tatu Saloranta and other contributors +Home page: http://jackson.codehaus.org/ +License: http://www.apache.org/licenses/LICENSE-2.0.txt + +-------------------------------------------------------------------------------- + +This binary artifact contains Paranamer. + +Copyright: 2000-2007 INRIA, France Telecom, 2006-2018 Paul Hammant & ThoughtWorks Inc +Home page: https://github.com/paul-hammant/paranamer +License: https://github.com/paul-hammant/paranamer/blob/master/LICENSE.txt (BSD) + +License text: +| Portions copyright (c) 2006-2018 Paul Hammant & ThoughtWorks Inc +| Portions copyright (c) 2000-2007 INRIA, France Telecom +| All rights reserved. +| +| Redistribution and use in source and binary forms, with or without +| modification, are permitted provided that the following conditions +| are met: +| 1. Redistributions of source code must retain the above copyright +| notice, this list of conditions and the following disclaimer. +| 2. Redistributions in binary form must reproduce the above copyright +| notice, this list of conditions and the following disclaimer in the +| documentation and/or other materials provided with the distribution. +| 3. Neither the name of the copyright holders nor the names of its +| contributors may be used to endorse or promote products derived from +| this software without specific prior written permission. +| +| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +| AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +| IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +| ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +| LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +| CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +| SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +| INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +| CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +| ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +| THE POSSIBILITY OF SUCH DAMAGE. + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Parquet. + +Copyright: 2014-2017 The Apache Software Foundation. +Home page: https://parquet.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Thrift. + +Copyright: 2006-2010 The Apache Software Foundation. +Home page: https://thrift.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains code from Daniel Lemire's JavaFastPFOR project. 
+ +Copyright: 2013 Daniel Lemire +Home page: https://github.com/lemire/JavaFastPFOR +License: Apache License Version 2.0 http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains fastutil. + +Copyright: 2002-2014 Sebastiano Vigna +Home page: http://fastutil.di.unimi.it/ +License: http://www.apache.org/licenses/LICENSE-2.0.html + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache ORC. + +Copyright: 2013-2019 The Apache Software Foundation. +Home page: https://orc.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Hive's storage API via ORC. + +Copyright: 2013-2019 The Apache Software Foundation. +Home page: https://hive.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Google protobuf via ORC. + +Copyright: 2008 Google Inc. +Home page: https://developers.google.com/protocol-buffers +License: https://github.com/protocolbuffers/protobuf/blob/master/LICENSE (BSD) + +License text: + +| Copyright 2008 Google Inc. All rights reserved. +| +| Redistribution and use in source and binary forms, with or without +| modification, are permitted provided that the following conditions are +| met: +| +| * Redistributions of source code must retain the above copyright +| notice, this list of conditions and the following disclaimer. +| * Redistributions in binary form must reproduce the above +| copyright notice, this list of conditions and the following disclaimer +| in the documentation and/or other materials provided with the +| distribution. +| * Neither the name of Google Inc. nor the names of its +| contributors may be used to endorse or promote products derived from +| this software without specific prior written permission. +| +| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +| "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +| LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +| A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +| OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +| SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +| LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +| DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +| THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +| (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +| +| Code generated by the Protocol Buffer compiler is owned by the owner +| of the input file used when generating it. This code is not +| standalone and requires a support library to be linked with it. This +| support library is itself covered by the above license. + +-------------------------------------------------------------------------------- + +This binary artifact contains Airlift Aircompressor. + +Copyright: 2011-2019 Aircompressor authors. 
+Home page: https://github.com/airlift/aircompressor +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Airlift Slice. + +Copyright: 2013-2019 Slice authors. +Home page: https://github.com/airlift/slice +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains JetBrains annotations. + +Copyright: 2000-2020 JetBrains s.r.o. +Home page: https://github.com/JetBrains/java-annotations +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains code from Cloudera Kite. + +Copyright: 2013-2017 Cloudera Inc. +Home page: https://kitesdk.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains code from Presto. + +Copyright: 2016 Facebook and contributors +Home page: https://prestodb.io/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Google Guava. + +Copyright: 2006-2019 The Guava Authors +Home page: https://github.com/google/guava +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Google Error Prone Annotations. + +Copyright: Copyright 2011-2019 The Error Prone Authors +Home page: https://github.com/google/error-prone +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains findbugs-annotations by Stephen Connolly. + +Copyright: 2011-2016 Stephen Connolly, Greg Lucas +Home page: https://github.com/stephenc/findbugs-annotations +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Google j2objc Annotations. + +Copyright: Copyright 2012-2018 Google Inc. +Home page: https://github.com/google/j2objc/tree/master/annotations +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains checkerframework checker-qual Annotations. + +Copyright: 2004-2019 the Checker Framework developers +Home page: https://github.com/typetools/checker-framework +License: https://github.com/typetools/checker-framework/blob/master/LICENSE.txt (MIT license) + +License text: +| The annotations are licensed under the MIT License. (The text of this +| license appears below.) More specifically, all the parts of the Checker +| Framework that you might want to include with your own program use the +| MIT License. This is the checker-qual.jar file and all the files that +| appear in it: every file in a qual/ directory, plus utility files such +| as NullnessUtil.java, RegexUtil.java, SignednessUtil.java, etc. +| In addition, the cleanroom implementations of third-party annotations, +| which the Checker Framework recognizes as aliases for its own +| annotations, are licensed under the MIT License. 
+| +| Permission is hereby granted, free of charge, to any person obtaining a copy +| of this software and associated documentation files (the "Software"), to deal +| in the Software without restriction, including without limitation the rights +| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +| copies of the Software, and to permit persons to whom the Software is +| furnished to do so, subject to the following conditions: +| +| The above copyright notice and this permission notice shall be included in +| all copies or substantial portions of the Software. +| +| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +| THE SOFTWARE. + +-------------------------------------------------------------------------------- + +This binary artifact contains Animal Sniffer Annotations. + +Copyright: 2009-2018 codehaus.org +Home page: https://www.mojohaus.org/animal-sniffer/animal-sniffer-annotations/ +License: https://www.mojohaus.org/animal-sniffer/animal-sniffer-annotations/license.html (MIT license) + +License text: +| The MIT License +| +| Copyright (c) 2009 codehaus.org. +| +| Permission is hereby granted, free of charge, to any person obtaining a copy +| of this software and associated documentation files (the "Software"), to deal +| in the Software without restriction, including without limitation the rights +| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +| copies of the Software, and to permit persons to whom the Software is +| furnished to do so, subject to the following conditions: +| +| The above copyright notice and this permission notice shall be included in +| all copies or substantial portions of the Software. +| +| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +| THE SOFTWARE. + +-------------------------------------------------------------------------------- + +This binary artifact contains Caffeine by Ben Manes. + +Copyright: 2014-2019 Ben Manes and contributors +Home page: https://github.com/ben-manes/caffeine +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Arrow. + +Copyright: 2016-2019 The Apache Software Foundation. +Home page: https://arrow.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Netty's buffer library. 
+ +Copyright: 2014-2020 The Netty Project +Home page: https://netty.io/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Google FlatBuffers. + +Copyright: 2013-2020 Google Inc. +Home page: https://google.github.io/flatbuffers/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Carrot Search Labs HPPC. + +Copyright: 2002-2019 Carrot Search s.c. +Home page: http://labs.carrotsearch.com/hppc.html +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains code from Apache Lucene via Carrot Search HPPC. + +Copyright: 2011-2020 The Apache Software Foundation. +Home page: https://lucene.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This binary artifact contains Apache Yetus audience annotations. + +Copyright: 2008-2020 The Apache Software Foundation. +Home page: https://yetus.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + diff --git a/flink-runtime/NOTICE b/flink-runtime/NOTICE new file mode 100644 index 000000000000..c9e00cc85528 --- /dev/null +++ b/flink-runtime/NOTICE @@ -0,0 +1,495 @@ + +Apache Iceberg (incubating) +Copyright 2017-2020 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +-------------------------------------------------------------------------------- + +This binary artifact contains code from Kite, developed at Cloudera, Inc. with +the following copyright notice: + +| Copyright 2013 Cloudera Inc. +| +| Licensed under the Apache License, Version 2.0 (the "License"); +| you may not use this file except in compliance with the License. +| You may obtain a copy of the License at +| +| http://www.apache.org/licenses/LICENSE-2.0 +| +| Unless required by applicable law or agreed to in writing, software +| distributed under the License is distributed on an "AS IS" BASIS, +| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +| See the License for the specific language governing permissions and +| limitations under the License. + +-------------------------------------------------------------------------------- + +This binary artifact includes Apache ORC with the following in its NOTICE file: + +| Apache ORC +| Copyright 2013-2019 The Apache Software Foundation +| +| This product includes software developed by The Apache Software +| Foundation (http://www.apache.org/). +| +| This product includes software developed by Hewlett-Packard: +| (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P + +-------------------------------------------------------------------------------- + +This binary artifact includes Airlift Aircompressor with the following in its +NOTICE file: + +| Snappy Copyright Notices +| ========================= +| +| * Copyright 2011 Dain Sundstrom +| * Copyright 2011, Google Inc. +| +| +| Snappy License +| =============== +| Copyright 2011, Google Inc. +| All rights reserved. 
+| +| Redistribution and use in source and binary forms, with or without +| modification, are permitted provided that the following conditions are +| met: +| +| * Redistributions of source code must retain the above copyright +| notice, this list of conditions and the following disclaimer. +| * Redistributions in binary form must reproduce the above +| copyright notice, this list of conditions and the following disclaimer +| in the documentation and/or other materials provided with the +| distribution. +| * Neither the name of Google Inc. nor the names of its +| contributors may be used to endorse or promote products derived from +| this software without specific prior written permission. +| +| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +| "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +| LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +| A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +| OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +| SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +| LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +| DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +| THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +| (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +-------------------------------------------------------------------------------- + +This binary artifact includes Carrot Search Labs HPPC with the following in its +NOTICE file: + +| ACKNOWLEDGEMENT +| =============== +| +| HPPC borrowed code, ideas or both from: +| +| * Apache Lucene, http://lucene.apache.org/ +| (Apache license) +| * Fastutil, http://fastutil.di.unimi.it/ +| (Apache license) +| * Koloboke, https://github.com/OpenHFT/Koloboke +| (Apache license) + +-------------------------------------------------------------------------------- + +This binary artifact includes Apache Yetus with the following in its NOTICE +file: + +| Apache Yetus +| Copyright 2008-2020 The Apache Software Foundation +| +| This product includes software developed at +| The Apache Software Foundation (https://www.apache.org/). +| +| --- +| Additional licenses for the Apache Yetus Source/Website: +| --- +| +| +| See LICENSE for terms. + +-------------------------------------------------------------------------------- + +This binary artifact includes Google Protobuf with the following copyright +notice: + +| Copyright 2008 Google Inc. All rights reserved. +| +| Redistribution and use in source and binary forms, with or without +| modification, are permitted provided that the following conditions are +| met: +| +| * Redistributions of source code must retain the above copyright +| notice, this list of conditions and the following disclaimer. +| * Redistributions in binary form must reproduce the above +| copyright notice, this list of conditions and the following disclaimer +| in the documentation and/or other materials provided with the +| distribution. +| * Neither the name of Google Inc. nor the names of its +| contributors may be used to endorse or promote products derived from +| this software without specific prior written permission. 
+| +| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +| "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +| LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +| A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +| OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +| SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +| LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +| DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +| THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +| (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +| +| Code generated by the Protocol Buffer compiler is owned by the owner +| of the input file used when generating it. This code is not +| standalone and requires a support library to be linked with it. This +| support library is itself covered by the above license. + +This binary artifact includes Apache Arrow with the following in its NOTICE file: + +| Apache Arrow +| Copyright 2016-2019 The Apache Software Foundation +| +| This product includes software developed at +| The Apache Software Foundation (http://www.apache.org/). +| +| This product includes software from the SFrame project (BSD, 3-clause). +| * Copyright (C) 2015 Dato, Inc. +| * Copyright (c) 2009 Carnegie Mellon University. +| +| This product includes software from the Feather project (Apache 2.0) +| https://github.com/wesm/feather +| +| This product includes software from the DyND project (BSD 2-clause) +| https://github.com/libdynd +| +| This product includes software from the LLVM project +| * distributed under the University of Illinois Open Source +| +| This product includes software from the google-lint project +| * Copyright (c) 2009 Google Inc. All rights reserved. +| +| This product includes software from the mman-win32 project +| * Copyright https://code.google.com/p/mman-win32/ +| * Licensed under the MIT License; +| +| This product includes software from the LevelDB project +| * Copyright (c) 2011 The LevelDB Authors. All rights reserved. +| * Use of this source code is governed by a BSD-style license that can be +| * Moved from Kudu http://github.com/cloudera/kudu +| +| This product includes software from the CMake project +| * Copyright 2001-2009 Kitware, Inc. +| * Copyright 2012-2014 Continuum Analytics, Inc. +| * All rights reserved. +| +| This product includes software from https://github.com/matthew-brett/multibuild (BSD 2-clause) +| * Copyright (c) 2013-2016, Matt Terry and Matthew Brett; all rights reserved. +| +| This product includes software from the Ibis project (Apache 2.0) +| * Copyright (c) 2015 Cloudera, Inc. +| * https://github.com/cloudera/ibis +| +| This product includes software from Dremio (Apache 2.0) +| * Copyright (C) 2017-2018 Dremio Corporation +| * https://github.com/dremio/dremio-oss +| +| This product includes software from Google Guava (Apache 2.0) +| * Copyright (C) 2007 The Guava Authors +| * https://github.com/google/guava +| +| This product include software from CMake (BSD 3-Clause) +| * CMake - Cross Platform Makefile Generator +| * Copyright 2000-2019 Kitware, Inc. and Contributors +| +| The web site includes files generated by Jekyll. 
+| +| -------------------------------------------------------------------------------- +| +| This product includes code from Apache Kudu, which includes the following in +| its NOTICE file: +| +| Apache Kudu +| Copyright 2016 The Apache Software Foundation +| +| This product includes software developed at +| The Apache Software Foundation (http://www.apache.org/). +| +| Portions of this software were developed at +| Cloudera, Inc (http://www.cloudera.com/). +| +| -------------------------------------------------------------------------------- +| +| This product includes code from Apache ORC, which includes the following in +| its NOTICE file: +| +| Apache ORC +| Copyright 2013-2019 The Apache Software Foundation +| +| This product includes software developed by The Apache Software +| Foundation (http://www.apache.org/). +| +| This product includes software developed by Hewlett-Packard: +| (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P + +-------------------------------------------------------------------------------- + +This binary artifact includes Netty buffers with the following in its NOTICE +file: + +| The Netty Project +| ================= +| +| Please visit the Netty web site for more information: +| +| * https://netty.io/ +| +| Copyright 2014 The Netty Project +| +| The Netty Project licenses this file to you under the Apache License, +| version 2.0 (the "License"); you may not use this file except in compliance +| with the License. You may obtain a copy of the License at: +| +| http://www.apache.org/licenses/LICENSE-2.0 +| +| Unless required by applicable law or agreed to in writing, software +| distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +| WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +| License for the specific language governing permissions and limitations +| under the License. +| +| Also, please refer to each LICENSE..txt file, which is located in +| the 'license' directory of the distribution file, for the license terms of the +| components that this product depends on. +| +| ------------------------------------------------------------------------------- +| This product contains the extensions to Java Collections Framework which has +| been derived from the works by JSR-166 EG, Doug Lea, and Jason T. 
Greene: +| +| * LICENSE: +| * license/LICENSE.jsr166y.txt (Public Domain) +| * HOMEPAGE: +| * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ +| * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ +| +| This product contains a modified version of Robert Harder's Public Domain +| Base64 Encoder and Decoder, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.base64.txt (Public Domain) +| * HOMEPAGE: +| * http://iharder.sourceforge.net/current/java/base64/ +| +| This product contains a modified portion of 'Webbit', an event based +| WebSocket and HTTP server, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.webbit.txt (BSD License) +| * HOMEPAGE: +| * https://github.com/joewalnes/webbit +| +| This product contains a modified portion of 'SLF4J', a simple logging +| facade for Java, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.slf4j.txt (MIT License) +| * HOMEPAGE: +| * http://www.slf4j.org/ +| +| This product contains a modified portion of 'Apache Harmony', an open source +| Java SE, which can be obtained at: +| +| * NOTICE: +| * license/NOTICE.harmony.txt +| * LICENSE: +| * license/LICENSE.harmony.txt (Apache License 2.0) +| * HOMEPAGE: +| * http://archive.apache.org/dist/harmony/ +| +| This product contains a modified portion of 'jbzip2', a Java bzip2 compression +| and decompression library written by Matthew J. Francis. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.jbzip2.txt (MIT License) +| * HOMEPAGE: +| * https://code.google.com/p/jbzip2/ +| +| This product contains a modified portion of 'libdivsufsort', a C API library to construct +| the suffix array and the Burrows-Wheeler transformed string for any input string of +| a constant-size alphabet written by Yuta Mori. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.libdivsufsort.txt (MIT License) +| * HOMEPAGE: +| * https://github.com/y-256/libdivsufsort +| +| This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, +| which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.jctools.txt (ASL2 License) +| * HOMEPAGE: +| * https://github.com/JCTools/JCTools +| +| This product optionally depends on 'JZlib', a re-implementation of zlib in +| pure Java, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.jzlib.txt (BSD style License) +| * HOMEPAGE: +| * http://www.jcraft.com/jzlib/ +| +| This product optionally depends on 'Compress-LZF', a Java library for encoding and +| decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.compress-lzf.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/ning/compress +| +| This product optionally depends on 'lz4', a LZ4 Java compression +| and decompression library written by Adrien Grand. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.lz4.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/jpountz/lz4-java +| +| This product optionally depends on 'lzma-java', a LZMA Java compression +| and decompression library, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.lzma-java.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/jponge/lzma-java +| +| This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression +| and decompression library written by William Kinney. 
It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.jfastlz.txt (MIT License) +| * HOMEPAGE: +| * https://code.google.com/p/jfastlz/ +| +| This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data +| interchange format, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.protobuf.txt (New BSD License) +| * HOMEPAGE: +| * https://github.com/google/protobuf +| +| This product optionally depends on 'Bouncy Castle Crypto APIs' to generate +| a temporary self-signed X.509 certificate when the JVM does not provide the +| equivalent functionality. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.bouncycastle.txt (MIT License) +| * HOMEPAGE: +| * http://www.bouncycastle.org/ +| +| This product optionally depends on 'Snappy', a compression library produced +| by Google Inc, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.snappy.txt (New BSD License) +| * HOMEPAGE: +| * https://github.com/google/snappy +| +| This product optionally depends on 'JBoss Marshalling', an alternative Java +| serialization API, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.jboss-marshalling.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/jboss-remoting/jboss-marshalling +| +| This product optionally depends on 'Caliper', Google's micro- +| benchmarking framework, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.caliper.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/google/caliper +| +| This product optionally depends on 'Apache Commons Logging', a logging +| framework, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.commons-logging.txt (Apache License 2.0) +| * HOMEPAGE: +| * http://commons.apache.org/logging/ +| +| This product optionally depends on 'Apache Log4J', a logging framework, which +| can be obtained at: +| +| * LICENSE: +| * license/LICENSE.log4j.txt (Apache License 2.0) +| * HOMEPAGE: +| * http://logging.apache.org/log4j/ +| +| This product optionally depends on 'Aalto XML', an ultra-high performance +| non-blocking XML processor, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.aalto-xml.txt (Apache License 2.0) +| * HOMEPAGE: +| * http://wiki.fasterxml.com/AaltoHome +| +| This product contains a modified version of 'HPACK', a Java implementation of +| the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.hpack.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/twitter/hpack +| +| This product contains a modified version of 'HPACK', a Java implementation of +| the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.hyper-hpack.txt (MIT License) +| * HOMEPAGE: +| * https://github.com/python-hyper/hpack/ +| +| This product contains a modified version of 'HPACK', a Java implementation of +| the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. 
It can be obtained at: +| +| * LICENSE: +| * license/LICENSE.nghttp2-hpack.txt (MIT License) +| * HOMEPAGE: +| * https://github.com/nghttp2/nghttp2/ +| +| This product contains a modified portion of 'Apache Commons Lang', a Java library +| provides utilities for the java.lang API, which can be obtained at: +| +| * LICENSE: +| * license/LICENSE.commons-lang.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://commons.apache.org/proper/commons-lang/ +| +| +| This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. +| +| * LICENSE: +| * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) +| * HOMEPAGE: +| * https://github.com/takari/maven-wrapper +| +| This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS. +| This private header is also used by Apple's open source +| mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/). +| +| * LICENSE: +| * license/LICENSE.dnsinfo.txt (Apache License 2.0) +| * HOMEPAGE: +| * http://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h + diff --git a/flink/src/main/avro/CommitMetadata.avsc b/flink/src/main/avro/CommitMetadata.avsc new file mode 100644 index 000000000000..84006798b25a --- /dev/null +++ b/flink/src/main/avro/CommitMetadata.avsc @@ -0,0 +1,11 @@ +{ + "type": "record", + "name": "CommitMetadata", + "namespace": "org.apache.iceberg.flink.connector.model", + "fields":[ + { "name":"lastCheckpointId", "type": "long"}, + { "name":"lastCheckpointTimestamp", "type": "long"}, + { "name":"lastCommitTimestamp", "type": "long"}, + { "name":"watermark", "type": ["null", "long"], "default": null} + ] +} \ No newline at end of file diff --git a/flink/src/main/avro/ManifestFileState.avsc b/flink/src/main/avro/ManifestFileState.avsc new file mode 100644 index 000000000000..af255f9a2e7c --- /dev/null +++ b/flink/src/main/avro/ManifestFileState.avsc @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "ManifestFileState", + "namespace": "org.apache.iceberg.flink.connector.model", + "fields": [ + {"name":"path", "type":"string"}, + {"name":"length", "type":"long"}, + {"name":"specId", "type":"int"}, + { "name":"checkpointId", "type": "long"}, + { "name":"checkpointTimestamp", "type": "long"}, + { "name":"dataFileCount", "type": "long"}, + { "name":"recordCount", "type": "long"}, + { "name":"byteCount", "type": "long"}, + { "name":"lowWatermark", "type": ["null", "long"], "default": null}, + { "name":"highWatermark", "type": ["null", "long"], "default": null} + ] +} \ No newline at end of file diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/IcebergConnectorConstant.java b/flink/src/main/java/org/apache/iceberg/flink/connector/IcebergConnectorConstant.java new file mode 100644 index 000000000000..7d52b99a3a31 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/IcebergConnectorConstant.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector; + +import java.util.concurrent.TimeUnit; + +public class IcebergConnectorConstant { + private IcebergConnectorConstant() {} + + public static final String TYPE = "iceberg"; + + public static final String ICEBERG_APP_TYPE = "flink"; + + public static final String CATALOG_TYPE = "catalog_type"; + public static final String HIVE_CATALOG = "HIVE"; + public static final String HADOOP_CATALOG = "HADOOP"; + public static final String CATALOG_TYPE_DEFAULT = HIVE_CATALOG; + public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "hadoop_catalog_warehouse_location"; + + public static final String NAMESPACE = "namespace"; + public static final String TABLE = "table"; + + public static final String WATERMARK_TIMESTAMP_FIELD = "watermark_timestamp_field"; + public static final String WATERMARK_TIMESTAMP_UNIT = "watermark_timestamp_unit"; + public static final String DEFAULT_WATERMARK_TIMESTAMP_UNIT = TimeUnit.MILLISECONDS.name(); + + public static final String SKIP_INCOMPATIBLE_RECORD = "skip_incompatible_record"; + + public static final String SNAPSHOT_RETENTION_HOURS = "snapshot_retention_hours"; + public static final String COMMIT_RESTORED_MANIFEST_FILES = "commit_restored_manifest_files"; + public static final String MAX_FILE_SIZE = "max_file_size"; + + public static final boolean DEFAULT_SKIP_INCOMPATIBLE_RECORD = false; + + public static final long DEFAULT_SNAPSHOT_RETENTION_HOURS = 70; + public static final boolean DEFAULT_COMMIT_RESTORED_MANIFEST_FILES = true; + public static final long DEFAULT_MAX_FILE_SIZE = 1024L * 1024L * 1024L * 4; + + public static final String SINK_TAG_KEY = "sink"; + public static final String OUTPUT_TAG_KEY = "output"; + public static final String OUTPUT_CLUSTER_TAG_KEY = "outputCluster"; + + public static final String SUBTASK_ID = "subtask_id"; + public static final String EXCEPTION_CLASS = "exception_class"; +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/model/CommitMetadataUtil.java b/flink/src/main/java/org/apache/iceberg/flink/connector/model/CommitMetadataUtil.java new file mode 100644 index 000000000000..5627e0ef8f77 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/model/CommitMetadataUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.model; + +import java.io.ByteArrayOutputStream; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Companion class of CommitMetadata + */ +public class CommitMetadataUtil { + private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataUtil.class); + private static final DatumWriter<CommitMetadata> DATUM_WRITER = new SpecificDatumWriter<>(CommitMetadata.class); + + private CommitMetadataUtil() {} + + public static String encodeAsJson(CommitMetadata metadata) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try { + Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(CommitMetadata.getClassSchema(), outputStream); + DATUM_WRITER.write(metadata, jsonEncoder); + jsonEncoder.flush(); + return new String(outputStream.toByteArray()); + } catch (Exception e) { + LOG.error("failed to encode metadata to JSON", e); + return ""; + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFile.java b/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFile.java new file mode 100644 index 000000000000..00ca477cdc87 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFile.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.model; + +import org.apache.iceberg.ManifestFile; + +public interface FlinkManifestFile extends ManifestFile { + + long checkpointId(); + + long checkpointTimestamp(); + + long dataFileCount(); + + long recordCount(); + + long byteCount(); + + Long lowWatermark(); + + Long highWatermark(); + + /** + * If the implementation of this method changes, it may affect the + * de-dup check and cause the same manifest file to be committed twice. + */ + String hash(); + + ManifestFileState toState(); +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFileUtil.java b/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFileUtil.java new file mode 100644 index 000000000000..307abc3ab0fc --- --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/model/FlinkManifestFileUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.model; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Strings; + +public class FlinkManifestFileUtil { + private FlinkManifestFileUtil() {} + + public static long getDataFileCount(List flinkManifestFiles) { + return flinkManifestFiles.stream().map(FlinkManifestFile::dataFileCount) + .collect(Collectors.summingLong(l -> l)); + } + + public static long getRecordCount(List flinkManifestFiles) { + return flinkManifestFiles.stream().map(FlinkManifestFile::recordCount) + .collect(Collectors.summingLong(l -> l)); + } + + public static long getByteCount(List flinkManifestFiles) { + return flinkManifestFiles.stream().map(FlinkManifestFile::byteCount) + .collect(Collectors.summingLong(l -> l)); + } + + public static Long getLowWatermark(List flinkManifestFiles) { + Long min = null; + for (FlinkManifestFile flinkManifestFile : flinkManifestFiles) { + Long curr = flinkManifestFile.lowWatermark(); + if ((null == min) || (null != curr && min > curr)) { + min = curr; + } + } + return min; + } + + public static Long getHighWatermark(List flinkManifestFiles) { + Long max = null; + for (FlinkManifestFile flinkManifestFile : flinkManifestFiles) { + Long curr = flinkManifestFile.highWatermark(); + if ((null == max) || (null != curr && max < curr)) { + max = curr; + } + } + return max; + } + + public static String hashesListToString(List hashes) { + return Joiner.on(",").join(hashes); + } + + public static List hashesStringToList(String hashesStr) { + if (Strings.isNullOrEmpty(hashesStr)) { + return Collections.emptyList(); + } else { + return Arrays.asList(hashesStr.split(",")); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/model/GenericFlinkManifestFile.java b/flink/src/main/java/org/apache/iceberg/flink/connector/model/GenericFlinkManifestFile.java new file mode 100644 index 000000000000..7493e90e9ac6 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/model/GenericFlinkManifestFile.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.model; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Objects; +import org.apache.commons.codec.binary.Hex; +import org.apache.iceberg.ManifestContent; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +public class GenericFlinkManifestFile implements FlinkManifestFile { + + private final String path; + private final long length; + private final int specId; + private final long checkpointId; + private final long checkpointTimestamp; + private final long dataFileCount; + private final long recordCount; + private final long byteCount; + private final Long lowWatermark; + private final Long highWatermark; + + public static class Builder { + + private String path; + private long length; + private int specId; + private long checkpointId; + private long checkpointTimestamp; + private long dataFileCount; + private long recordCount; + private long byteCount; + private Long lowWatermark; + private Long highWatermark; + + private Builder() { + + } + + public Builder setPath(String path) { + this.path = path; + return this; + } + + public Builder setLength(long length) { + this.length = length; + return this; + } + + public Builder setSpecId(int specId) { + this.specId = specId; + return this; + } + + public Builder setCheckpointId(long checkpointId) { + this.checkpointId = checkpointId; + return this; + } + + public Builder setCheckpointTimestamp(long checkpointTimestamp) { + this.checkpointTimestamp = checkpointTimestamp; + return this; + } + + public Builder setDataFileCount(long dataFileCount) { + this.dataFileCount = dataFileCount; + return this; + } + + public Builder setRecordCount(long recordCount) { + this.recordCount = recordCount; + return this; + } + + public Builder setByteCount(long byteCount) { + this.byteCount = byteCount; + return this; + } + + public Builder setLowWatermark(Long lowWatermark) { + this.lowWatermark = lowWatermark; + return this; + } + + public Builder setHighWatermark(Long highWatermark) { + this.highWatermark = highWatermark; + return this; + } + + public GenericFlinkManifestFile build() { + return new GenericFlinkManifestFile(this); + } + } + + public static Builder builder() { + return new Builder(); + } + + private GenericFlinkManifestFile(Builder builder) { + path = builder.path; + length = builder.length; + specId = builder.specId; + checkpointId = builder.checkpointId; + checkpointTimestamp = builder.checkpointTimestamp; + dataFileCount = builder.dataFileCount; + recordCount = builder.recordCount; + byteCount = builder.byteCount; + lowWatermark = builder.lowWatermark; + highWatermark = builder.highWatermark; + } + + + public static GenericFlinkManifestFile fromState(ManifestFileState state) { + return GenericFlinkManifestFile.builder() + .setPath(state.getPath().toString()) + .setLength(state.getLength()) + .setSpecId(state.getSpecId()) + .setCheckpointId(state.getCheckpointId()) + .setCheckpointTimestamp(state.getCheckpointTimestamp()) + .setDataFileCount(state.getDataFileCount()) + .setRecordCount(state.getRecordCount()) + .setByteCount(state.getByteCount()) + .setLowWatermark(state.getLowWatermark()) + .setHighWatermark(state.getHighWatermark()) + .build(); + } + + public ManifestFileState toState() { + return ManifestFileState.newBuilder() + .setPath(path) 
+ .setLength(length) + .setSpecId(specId) + .setCheckpointId(checkpointId) + .setCheckpointTimestamp(checkpointTimestamp) + .setDataFileCount(dataFileCount) + .setRecordCount(recordCount) + .setByteCount(byteCount) + .setLowWatermark(lowWatermark) + .setHighWatermark(highWatermark) + .build(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("path", path) + .add("length", length) + .add("specId", specId) + .add("checkpointId", checkpointId) + .add("checkpointTimestamp", checkpointTimestamp) + .add("dataFileCount", dataFileCount) + .add("recordCount", recordCount) + .add("byteCount", byteCount) + .add("lowWatermark", lowWatermark) + .add("highWatermark", highWatermark) + .add("hash", hash()) + .toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final GenericFlinkManifestFile other = (GenericFlinkManifestFile) obj; + return Objects.equals(this.path, other.path) && + Objects.equals(this.length, other.length) && + Objects.equals(this.specId, other.specId) && + Objects.equals(this.checkpointId, other.checkpointId) && + Objects.equals(this.checkpointTimestamp, other.checkpointTimestamp) && + Objects.equals(this.dataFileCount, other.dataFileCount) && + Objects.equals(this.recordCount, other.recordCount) && + Objects.equals(this.lowWatermark, other.lowWatermark) && + Objects.equals(this.highWatermark, other.highWatermark); + } + + @Override + public int hashCode() { + final int prime = 31; + + int result = path == null ? 0 : path.hashCode(); + result = prime * result + (int) (length ^ (length >>> 32)); + result = prime * result + specId; + result = prime * result + (int) (checkpointId ^ (checkpointId >>> 32)); + result = prime * result + (int) (checkpointTimestamp ^ (checkpointTimestamp >>> 32)); + result = prime * result + (int) (dataFileCount ^ (dataFileCount >>> 32)); + result = prime * result + (int) (recordCount ^ (recordCount >>> 32)); + result = prime * result + (int) (lowWatermark ^ (lowWatermark >>> 32)); + result = prime * result + (int) (highWatermark ^ (highWatermark >>> 32)); + + return result; + } + + @Override + public String path() { + return path; + } + + @Override + public long length() { + return length; + } + + @Override + public int partitionSpecId() { + return specId; + } + + @Override + public Long snapshotId() { + return null; + } + + @Override + public Integer addedFilesCount() { + return null; + } + + // TODO: return null? 
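+ // All data files tracked by a Flink-produced manifest are appended during the checkpoint that
+ // created them, so the total record count is reported as the added row count here, while the
+ // existing and deleted counts below are fixed at zero.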
+ @Override + public Long addedRowsCount() { + return recordCount; + } + + @Override + public Integer existingFilesCount() { + return Integer.valueOf(0); + } + + @Override + public Long existingRowsCount() { + return Long.valueOf(0); + } + + @Override + public Integer deletedFilesCount() { + return Integer.valueOf(0); + } + + @Override + public Long deletedRowsCount() { + return Long.valueOf(0); + } + + @Override + public List partitions() { + return null; + } + + @Override + public ManifestFile copy() { + return null; + } + + @Override + public long checkpointId() { + return checkpointId; + } + + @Override + public long checkpointTimestamp() { + return checkpointTimestamp; + } + + @Override + public long dataFileCount() { + return dataFileCount; + } + + @Override + public long recordCount() { + return recordCount; + } + + @Override + public long byteCount() { + return byteCount; + } + + @Override + public Long lowWatermark() { + return lowWatermark; + } + + @Override + public Long highWatermark() { + return highWatermark; + } + + @Override + public String hash() { + try { + MessageDigest messageDigest = MessageDigest.getInstance("SHA-1"); + messageDigest.update(path().getBytes()); + byte[] md = messageDigest.digest(); + return Hex.encodeHexString(md); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("failed to create digest", e); + } + } + + @Override + public long sequenceNumber() { + return 0; // TODO + } + + @Override + public long minSequenceNumber() { + return 0; // TODO + } + + @Override + public ManifestContent content() { + return ManifestContent.DATA; // TODO + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/AbstractPartitioner.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/AbstractPartitioner.java new file mode 100644 index 000000000000..bd56f4eb2681 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/AbstractPartitioner.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import java.util.Arrays; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.transforms.Transform; + +@SuppressWarnings("checkstyle:VisibilityModifier") +public abstract class AbstractPartitioner implements Partitioner { + + protected final PartitionSpec spec; + protected final int size; + protected final Transform[] transforms; + protected final Object[] partitionTuple; + + AbstractPartitioner(PartitionSpec spec) { + this.spec = spec; + this.size = spec.fields().size(); + this.transforms = new Transform[size]; + this.partitionTuple = new Object[size]; + for (int i = 0; i < size; i += 1) { + PartitionField field = spec.fields().get(i); + this.transforms[i] = field.transform(); + } + } + + AbstractPartitioner(AbstractPartitioner toCopy) { + this.spec = toCopy.spec; + this.size = toCopy.size; + this.transforms = toCopy.transforms; + this.partitionTuple = new Object[toCopy.partitionTuple.length]; + for (int i = 0; i < partitionTuple.length; i += 1) { + this.partitionTuple[i] = toCopy.partitionTuple[i]; + } + } + + @Override + public String toString() { + return Joiner.on(", ").join(partitionTuple); + } + + @Override + public int size() { + return size; + } + + @Override + @SuppressWarnings("unchecked") + public T get(int pos, Class javaClass) { + if (null == partitionTuple[pos]) { + throw new IllegalArgumentException("partition column not found in data: pos = " + pos); + } + if (CharSequence.class.isAssignableFrom(javaClass)) { + return javaClass.cast(partitionTuple[pos].toString()); + } + return javaClass.cast(partitionTuple[pos]); + } + + @Override + public void set(int pos, T value) { + partitionTuple[pos] = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AbstractPartitioner that = (AbstractPartitioner) o; + return Arrays.equals(partitionTuple, that.partitionTuple); + } + + @Override + public int hashCode() { + return Arrays.hashCode(partitionTuple); + } + + @Override + public String toPath() { + return spec.partitionToPath(this); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FileWriter.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FileWriter.java new file mode 100644 index 000000000000..cdbbd71e7cdb --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FileWriter.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FileWriter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + + private static final long serialVersionUID = 1L; + + private final FileFormat format; + private final Path path; + private final ProcessingTimeService timeService; + private final long creationTime; + private final Partitioner partitioner; + private final FileAppender appender; + private final org.apache.hadoop.conf.Configuration hadoopConfig; + private final PartitionSpec spec; + private final WatermarkTimeExtractor watermarkTimeExtractor; + + private long lastWrittenToTime; + private long lowWatermark = Long.MAX_VALUE; + private long highWatermark = Long.MIN_VALUE; + + @SuppressWarnings("checkstyle:HiddenField") + public static class Builder { + private FileFormat format; + private Path path; + private ProcessingTimeService timeService; + private Partitioner partitioner; + private FileAppender appender; + private org.apache.hadoop.conf.Configuration hadoopConfig; + private PartitionSpec spec; + private Schema schema; + private String timestampField; + private TimeUnit timestampUnit; + + private Builder() { + } + + public Builder withFileFormat(final FileFormat format) { + this.format = format; + return this; + } + + public Builder withPath(final Path path) { + this.path = path; + return this; + } + + public Builder withProcessingTimeService(final ProcessingTimeService timeService) { + this.timeService = timeService; + return this; + } + + public Builder withPartitioner(final Partitioner partitioner) { + this.partitioner = partitioner; + return this; + } + + public Builder withAppender(final FileAppender appender) { + this.appender = appender; + return this; + } + + public Builder withHadooopConfig(final org.apache.hadoop.conf.Configuration hadoopConfig) { + this.hadoopConfig = hadoopConfig; + return this; + } + + public Builder withSpec(final PartitionSpec spec) { + this.spec = spec; + return this; + } + + public Builder withSchema(final Schema schema) { + this.schema = schema; + return this; + } + + public Builder withTimestampField(final String timestampField) { + this.timestampField = timestampField; + return this; + } + + public Builder withTimestampUnit(final TimeUnit timestampUnit) { + this.timestampUnit = timestampUnit; + return this; + } + + public FileWriter build() { + Preconditions.checkArgument(this.format != null, "File format is required"); + Preconditions.checkArgument(this.path != null, "File path is required"); + Preconditions.checkArgument(this.timeService != null, "ProcessingTimeService is required"); + Preconditions.checkArgument(this.partitioner != null, "Partitioner is required"); + 
Preconditions.checkArgument(this.appender != null, "File appender is required"); + Preconditions.checkArgument(this.hadoopConfig != null, "Hadoop config is required"); + Preconditions.checkArgument(this.spec != null, "Partition spec is required"); + Preconditions.checkArgument(this.schema != null, "schema is required"); + Preconditions.checkArgument(this.timestampField != null, "timestampField is required"); + Preconditions.checkArgument(this.timestampUnit != null, "timestampUnit is required"); + return new FileWriter(this); + } + } + + private FileWriter(Builder builder) { + format = builder.format; + path = builder.path; + timeService = builder.timeService; + creationTime = timeService.getCurrentProcessingTime(); + partitioner = builder.partitioner; + appender = builder.appender; + hadoopConfig = builder.hadoopConfig; + spec = builder.spec; + watermarkTimeExtractor = new WatermarkTimeExtractor( + builder.schema, builder.timestampField, builder.timestampUnit); + } + + public static Builder builder() { + return new Builder(); + } + + public long write(Record record) { + // We want to distinguish (1) IOException v.s. (2)type/schema exception + // For type exception, we want to page table owner. + try { + appender.add(record); + } catch (RuntimeIOException e) { + // 1. IOException (file or S3 write failure) + throw e; + } catch (Exception e) { + // 2. schema/type mismatch + // Ideally, we would like Iceberg/Parquet throw a unified exception + // (e.g. IcebergTypeException) for type/schema error. + // Unfortunately, that is not the case and not an simple change. + // For now, we just assume all non-schema errors are + // RuntimeIOException based on discussion with Ryan. + throw e; + } + lastWrittenToTime = timeService.getCurrentProcessingTime(); + Long timeMs = watermarkTimeExtractor.getWatermarkTimeMs(record); + if (null != timeMs) { + if (timeMs < lowWatermark) { + lowWatermark = timeMs; + } + if (timeMs > highWatermark) { + highWatermark = timeMs; + } + } + return appender.length(); + } + + public FlinkDataFile close() throws IOException { + final long start = System.currentTimeMillis(); + try { + appender.close(); + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.debug("File appender closed in {} milli-seconds", duration); + } + + // metrics are only valid after the appender is closed + Metrics metrics = appender.metrics(); + InputFile inputFile = HadoopInputFile.fromPath(path, hadoopConfig); + DataFile dataFile = DataFiles.builder(spec) + .withFormat(format) + .withInputFile(inputFile) + .withPartition(partitioner) + .withMetrics(metrics) + .build(); + return new FlinkDataFile(lowWatermark, highWatermark, dataFile); + } + + public Path abort() throws IOException { + // TODO: need an abort API from Iceberg + appender.close(); + return path; + } + + public Path getPath() { + return path; + } + + @Override + public String toString() { + return String.format("path @ %s, created @ %d, last written @ %d, " + + "low watermark @ %d, high watermark @ %d", + path.toString(), creationTime, lastWrittenToTime, lowWatermark, highWatermark); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FlinkDataFile.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FlinkDataFile.java new file mode 100644 index 000000000000..6834b54167ee --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/FlinkDataFile.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import java.io.Serializable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +public class FlinkDataFile implements Serializable { + private final long lowWatermark; + private final long highWatermark; + private final DataFile dataFile; + + public FlinkDataFile(long lowWatermark, long highWatermark, DataFile dataFile) { + this.lowWatermark = lowWatermark; + this.highWatermark = highWatermark; + this.dataFile = dataFile; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("low_watermark", lowWatermark) + .add("high_watermark", highWatermark) + .add("data_file", dataFile) + .toString(); + } + + /** + * only dump essential fields like lowTimestamp, highTimestamp,and path + * TODO: lowTimestamp or lowWaterMark? + */ + public String toCompactDump() { + return MoreObjects.toStringHelper(this) + .add("low_watermark", lowWatermark) + .add("high_watermark", highWatermark) + .add("path", dataFile.path()) + .toString(); + } + + public DataFile getIcebergDataFile() { + return dataFile; + } + + public long getLowWatermark() { + return lowWatermark; + } + + public long getHighWatermark() { + return highWatermark; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergCommitter.java new file mode 100644 index 000000000000..5d228f6e057b --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergCommitter.java @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.connector.IcebergConnectorConstant; +import org.apache.iceberg.flink.connector.model.CommitMetadata; +import org.apache.iceberg.flink.connector.model.CommitMetadataUtil; +import org.apache.iceberg.flink.connector.model.FlinkManifestFile; +import org.apache.iceberg.flink.connector.model.FlinkManifestFileUtil; +import org.apache.iceberg.flink.connector.model.GenericFlinkManifestFile; +import org.apache.iceberg.flink.connector.model.ManifestFileState; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This operator commit data files to Iceberg table. + *

+ * This operator should always run with a parallelism of 1. + * Because the Iceberg library performs optimistic concurrency control, + * running a single committer instance helps reduce contention and retries + * when committing files to the Iceberg table. + *
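+ * A rough wiring sketch (the same pattern used by IcebergSinkAppender later in this change),
+ * assuming {@code writerStream} is the {@code DataStream<FlinkDataFile>} emitted by IcebergWriter
+ * and the operator name is purely illustrative:
+ * <pre>{@code
+ *   IcebergCommitter committer = new IcebergCommitter(table, config);
+ *   writerStream
+ *       .addSink(committer)
+ *       .name("my-sink-committer")
+ *       .uid("my-sink-committer")
+ *       .setParallelism(1);   // keep the committer at parallelism 1
+ * }</pre>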

+ */ +@SuppressWarnings("checkstyle:HiddenField") +public class IcebergCommitter extends RichSinkFunction + implements CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class); + + private static final String COMMIT_MANIFEST_HASHES_KEY = "flink.commit.manifest.hashes"; + private static final String WATERMARK_PROP_KEY_PREFIX = "flink.watermark"; + + private Configuration config; + private final String namespace; + private final String tableName; + + private final boolean watermarkEnabled; + private final String watermarkPropKey; + private final long snapshotRetentionHours; + private final boolean commitRestoredManifestFiles; + private final String icebergManifestFileDir; + private final PartitionSpec spec; + private final FileIO io; + private final String flinkJobId; + + private transient Table table; + private transient List pendingDataFiles; + private transient List flinkManifestFiles; + private transient ListState manifestFileState; + private transient CommitMetadata metadata; + private transient ListState commitMetadataState; + + public IcebergCommitter(Table table, Configuration config) { + this.config = config; + + // current Iceberg sink implementation can't work with concurrent checkpoints. + // We disable concurrent checkpoints by default as min pause is set to 60s by default. + // Add an assertion to fail explicit in case job enables concurrent checkpoints. + CheckpointConfig checkpointConfig = StreamExecutionEnvironment.getExecutionEnvironment().getCheckpointConfig(); + if (checkpointConfig.getMaxConcurrentCheckpoints() > 1) { + throw new IllegalArgumentException("Iceberg sink doesn't support concurrent checkpoints"); + } + + namespace = config.getString(IcebergConnectorConstant.NAMESPACE, ""); + tableName = config.getString(IcebergConnectorConstant.TABLE, ""); + + watermarkEnabled = !Strings.isNullOrEmpty( + config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_FIELD, "")); + watermarkPropKey = WATERMARK_PROP_KEY_PREFIX; + snapshotRetentionHours = config.getLong(IcebergConnectorConstant.SNAPSHOT_RETENTION_HOURS, + IcebergConnectorConstant.DEFAULT_SNAPSHOT_RETENTION_HOURS); + commitRestoredManifestFiles = config.getBoolean(IcebergConnectorConstant.COMMIT_RESTORED_MANIFEST_FILES, + IcebergConnectorConstant.DEFAULT_COMMIT_RESTORED_MANIFEST_FILES); + icebergManifestFileDir = getIcebergManifestFileDir(config); + + // The only final fields yielded by table inputted + spec = table.spec(); + io = table.io(); + + final JobExecutionResult jobExecutionResult + = ExecutionEnvironment.getExecutionEnvironment().getLastJobExecutionResult(); + if (jobExecutionResult != null) { + flinkJobId = jobExecutionResult.getJobID().toString(); + LOG.info("Get Flink job ID from execution environment: {}", flinkJobId); + } else { + flinkJobId = new JobID().toString(); + LOG.info("Execution environment doesn't have executed job. 
Generate a random job ID : {}", flinkJobId); + } + LOG.info("Iceberg committer {}.{} created with sink config", namespace, tableName); + LOG.info("Iceberg committer {}.{} loaded table partition spec: {}", namespace, tableName, spec); + } + + @VisibleForTesting + List getPendingDataFiles() { + return pendingDataFiles; + } + + @VisibleForTesting + List getFlinkManifestFiles() { + return flinkManifestFiles; + } + + @VisibleForTesting + CommitMetadata getMetadata() { + return metadata; + } + + private String getIcebergManifestFileDir(Configuration config) { + final String checkpointDir = config.getString( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, null); + if (null == checkpointDir) { + throw new IllegalArgumentException("checkpoint dir is null"); + } + + return String.format("%s/iceberg/manifest/", checkpointDir); + } + + @Override + public void close() throws Exception { + super.close(); + } + + void init() { + // TODO: duplicate logic, to extract + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + String catalogType = config.getString(IcebergConnectorConstant.CATALOG_TYPE, + IcebergConnectorConstant.CATALOG_TYPE_DEFAULT); + Catalog catalog = null; + switch (catalogType.toUpperCase()) { + case IcebergConnectorConstant.HIVE_CATALOG: + hadoopConf.set(ConfVars.METASTOREURIS.varname, config.getString(ConfVars.METASTOREURIS.varname, "")); + catalog = HiveCatalogs.loadCatalog(hadoopConf); + break; + + case IcebergConnectorConstant.HADOOP_CATALOG: + catalog = new HadoopCatalog(hadoopConf, + config.getString(IcebergConnectorConstant.HADOOP_CATALOG_WAREHOUSE_LOCATION, "")); + break; + + default: + throw new UnsupportedOperationException("Unknown catalog type or not set: " + catalogType); + } + + this.table = catalog.loadTable(TableIdentifier.parse(namespace + "." + tableName)); + + pendingDataFiles = new ArrayList<>(); + flinkManifestFiles = new ArrayList<>(); + metadata = CommitMetadata.newBuilder() + .setLastCheckpointId(0) + .setLastCheckpointTimestamp(0) + .setLastCommitTimestamp(System.currentTimeMillis()) + .build(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + init(); + + Preconditions.checkState(manifestFileState == null, + "checkpointedFilesState has already been initialized."); + Preconditions.checkState(commitMetadataState == null, + "commitMetadataState has already been initialized."); + manifestFileState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>( + "iceberg-committer-manifest-files-state", ManifestFileState.class)); + commitMetadataState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>( + "iceberg-committer-metadata-state", CommitMetadata.class)); + + if (context.isRestored()) { + final Iterable restoredMetadata = commitMetadataState.get(); + if (null != restoredMetadata) { + LOG.info("Iceberg committer {}.{} restoring metadata", namespace, tableName); + List metadataList = new ArrayList<>(); + for (CommitMetadata entry : restoredMetadata) { + metadataList.add(entry); + } + Preconditions.checkState(1 == metadataList.size(), + "metadata list size should be 1. 
got " + metadataList.size()); + metadata = metadataList.get(0); + LOG.info("Iceberg committer {}.{} restored metadata: {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(metadata)); + } else { + LOG.info("Iceberg committer {}.{} has nothing to restore for metadata", namespace, tableName); + } + + Iterable restoredManifestFileStates = manifestFileState.get(); + if (null != restoredManifestFileStates) { + LOG.info("Iceberg committer {}.{} restoring manifest files", + namespace, tableName); + for (ManifestFileState manifestFileState : restoredManifestFileStates) { + flinkManifestFiles.add(GenericFlinkManifestFile.fromState(manifestFileState)); + } + LOG.info("Iceberg committer {}.{} restored {} manifest files: {}", + namespace, tableName, flinkManifestFiles.size(), flinkManifestFiles); + final long now = System.currentTimeMillis(); + if (now - metadata.getLastCheckpointTimestamp() > TimeUnit.HOURS.toMillis(snapshotRetentionHours)) { + flinkManifestFiles.clear(); + LOG.info("Iceberg committer {}.{} cleared restored manifest files as checkpoint timestamp is too old: " + + "checkpointTimestamp = {}, now = {}, snapshotRetentionHours = {}", + namespace, tableName, metadata.getLastCheckpointTimestamp(), now, snapshotRetentionHours); + } else { + flinkManifestFiles = removeCommittedManifests(flinkManifestFiles); + if (flinkManifestFiles.isEmpty()) { + LOG.info("Iceberg committer {}.{} has zero uncommitted manifest files from restored state", + namespace, tableName); + } else { + if (commitRestoredManifestFiles) { + commitRestoredManifestFiles(); + } else { + LOG.info("skip commit of restored manifest files"); + } + } + } + } else { + LOG.info("Iceberg committer {}.{} has nothing to restore for manifest files", namespace, tableName); + } + } + } + + @VisibleForTesting + void commitRestoredManifestFiles() throws Exception { + LOG.info("Iceberg committer {}.{} committing last uncompleted transaction upon recovery: " + + "metadata = {}, flink manifest files ({}) = {}", namespace, tableName, + CommitMetadataUtil.encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + commit(); + LOG.info("Iceberg committer {}.{} committed last uncompleted transaction upon recovery: " + + "metadata = {}, flink manifest files ({}) = {}", namespace, tableName, + CommitMetadataUtil.encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + postCommitSuccess(); + } + + private List removeCommittedManifests(List flinkManifestFiles) { + int snapshotCount = 0; + String result = "succeeded"; + final long start = System.currentTimeMillis(); + try { + final Set manifestHashes = flinkManifestFiles.stream() + .map(f -> f.hash()) + .collect(Collectors.toSet()); + final Set committedHashes = new HashSet<>(flinkManifestFiles.size()); + final Iterable snapshots = table.snapshots(); + for (Snapshot snapshot : snapshots) { + ++snapshotCount; + final Map summary = snapshot.summary(); + final List hashes = FlinkManifestFileUtil.hashesStringToList(summary.get(COMMIT_MANIFEST_HASHES_KEY)); + for (String hash : hashes) { + if (manifestHashes.contains(hash)) { + committedHashes.add(hash); + } + } + } + final List uncommittedManifestFiles = flinkManifestFiles.stream() + .filter(f -> !committedHashes.contains(f.hash())) + .collect(Collectors.toList()); + return uncommittedManifestFiles; + } catch (Throwable t) { + result = "failed"; + //LOG.error(String.format("Iceberg committer %s.%s failed to check transaction completed", database, tableName), + // t); + LOG.error("Iceberg committer {}.{} failed to 
check transaction completed. Throwable = {}", + namespace, tableName, t); + throw t; + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.info("Iceberg committer {}.{} {} to check transaction completed" + + " after iterating {} snapshots and {} milli-seconds", + namespace, tableName, result, snapshotCount, duration); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + LOG.info("Iceberg committer {}.{} snapshot state: checkpointId = {}, triggerTime = {}", + namespace, tableName, context.getCheckpointId(), context.getCheckpointTimestamp()); + Preconditions.checkState(manifestFileState != null, + "manifest files state has not been properly initialized."); + Preconditions.checkState(commitMetadataState != null, + "metadata state has not been properly initialized."); + + // set transaction to null to indicate a start of a new checkpoint/commit/transaction + synchronized (this) { + snapshot(context, pendingDataFiles); + checkpointState(flinkManifestFiles, metadata); + postSnapshotSuccess(); + } + } + + @VisibleForTesting + void snapshot(FunctionSnapshotContext context, List pendingDataFiles) throws Exception { + FlinkManifestFile flinkManifestFile = null; + if (!pendingDataFiles.isEmpty()) { + flinkManifestFile = createManifestFile(context, pendingDataFiles); + flinkManifestFiles.add(flinkManifestFile); + } + metadata = updateMetadata(metadata, context, flinkManifestFile); + } + + private FlinkManifestFile createManifestFile( + FunctionSnapshotContext context, List pendingDataFiles) throws Exception { + LOG.info("Iceberg committer {}.{} checkpointing {} pending data files}", + namespace, tableName, pendingDataFiles.size()); + String result = "succeeded"; + final long start = System.currentTimeMillis(); + try { + final String manifestFileName = Joiner.on("_") + .join(flinkJobId, context.getCheckpointId(), context.getCheckpointTimestamp()); + // Iceberg requires file format suffix right now + final String manifestFileNameWithSuffix = manifestFileName + ".avro"; + OutputFile outputFile = io.newOutputFile(icebergManifestFileDir + manifestFileNameWithSuffix); + ManifestWriter manifestWriter = ManifestFiles.write(spec, outputFile); + + // stats + long recordCount = 0; + long byteCount = 0; + long lowWatermark = Long.MAX_VALUE; + long highWatermark = Long.MIN_VALUE; + for (FlinkDataFile flinkDataFile : pendingDataFiles) { + DataFile dataFile = flinkDataFile.getIcebergDataFile(); + manifestWriter.add(dataFile); + // update stats + recordCount += dataFile.recordCount(); + byteCount += dataFile.fileSizeInBytes(); + if (flinkDataFile.getLowWatermark() < lowWatermark) { + lowWatermark = flinkDataFile.getLowWatermark(); + } + if (flinkDataFile.getHighWatermark() > highWatermark) { + highWatermark = flinkDataFile.getHighWatermark(); + } + LOG.debug("Data file with size of {} bytes added to manifest", dataFile.fileSizeInBytes()); + } + manifestWriter.close(); + ManifestFile manifestFile = manifestWriter.toManifestFile(); + + FlinkManifestFile flinkManifestFile = GenericFlinkManifestFile.builder() + .setPath(manifestFile.path()) + .setLength(manifestFile.length()) + .setSpecId(manifestFile.partitionSpecId()) + .setCheckpointId(context.getCheckpointId()) + .setCheckpointTimestamp(context.getCheckpointTimestamp()) + .setDataFileCount(pendingDataFiles.size()) + .setRecordCount(recordCount) + .setByteCount(byteCount) + .setLowWatermark(lowWatermark) + .setHighWatermark(highWatermark) + .build(); + + // don't want to log a giant list at 
one line. + // split the complete list into smaller chunks with 50 files. + final AtomicInteger counter = new AtomicInteger(0); + Collection> listOfFileList = pendingDataFiles.stream() + .map(flinkDataFile -> flinkDataFile.toCompactDump()) + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 50)) + .values(); + for (List fileList : listOfFileList) { + LOG.info("Iceberg committer {}.{} created manifest file {} for {}/{} data files: {}", + namespace, tableName, manifestFile.path(), fileList.size(), pendingDataFiles.size(), fileList); + } + return flinkManifestFile; + } catch (Throwable t) { + result = "failed"; + //LOG.error(String.format("Iceberg committer %s.%s failed to create manifest file for %d pending data files", + // database, tableName, pendingDataFiles.size()), t); + LOG.error("Iceberg committer {}.{} failed to create manifest file for {} pending data files. Throwable={}", + namespace, tableName, pendingDataFiles.size(), t); + throw t; + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.info("Iceberg committer {}.{} {} to create manifest file with {} data files after {} milli-seconds", + namespace, tableName, result, pendingDataFiles.size(), duration); + } + } + + /** + * Extract watermark from old {@link CommitMetadata} and {@link FlinkManifestFile}, + * to build a new {@link CommitMetadata}. + */ + private CommitMetadata updateMetadata( + CommitMetadata oldMetadata, + FunctionSnapshotContext context, + @Nullable FlinkManifestFile flinkManifestFile) { + LOG.info("Iceberg committer {}.{} updating metadata {} with manifest file {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(oldMetadata), flinkManifestFile); + CommitMetadata.Builder metadataBuilder = CommitMetadata.newBuilder(oldMetadata) + .setLastCheckpointId(context.getCheckpointId()) + .setLastCheckpointTimestamp(context.getCheckpointTimestamp()); + if (watermarkEnabled) { + Long watermark = oldMetadata.getWatermark(); + if (flinkManifestFile == null) { + // when there is no data to be committed + // use elapsed wall clock time to move watermark forward. + if (watermark != null) { + final long elapsedTimeMs = System.currentTimeMillis() - oldMetadata.getLastCommitTimestamp(); + watermark += elapsedTimeMs; + } else { + watermark = System.currentTimeMillis(); + } + } else { + // use lowWatermark. + if (flinkManifestFile.lowWatermark() == null) { + throw new IllegalArgumentException("Watermark is enabled but lowWatermark is null"); + } + // in case one container/slot is lagging behind, + // we want to move watermark forward based on the slowest. 
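+ // For example, if the data files in this checkpoint contain event timestamps between t=80 and
+ // t=100, lowWatermark is 80 and the table watermark only advances to 80 (never backwards).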
+ final long newWatermark = flinkManifestFile.lowWatermark(); + // make sure watermark doesn't go back in time + if (watermark == null || newWatermark > watermark) { + watermark = newWatermark; + } + } + metadataBuilder.setWatermark(watermark); + } + CommitMetadata newMetadata = metadataBuilder.build(); + LOG.info("Iceberg committer {}.{} updated metadata {} with manifest file {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(newMetadata), flinkManifestFile); + return newMetadata; + } + + private void checkpointState(List flinkManifestFiles, CommitMetadata metadata) throws Exception { + LOG.info("Iceberg committer {}.{} checkpointing state", namespace, tableName); + List manifestFileStates = flinkManifestFiles.stream() + .map(f -> f.toState()) + .collect(Collectors.toList()); + manifestFileState.clear(); + manifestFileState.addAll(manifestFileStates); + commitMetadataState.clear(); + commitMetadataState.add(metadata); + LOG.info("Iceberg committer {}.{} checkpointed state: metadata = {}, flinkManifestFiles({}) = {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(metadata), + flinkManifestFiles.size(), flinkManifestFiles); + } + + private void postSnapshotSuccess() { + pendingDataFiles.clear(); + LOG.debug("Un-committed manifest file count {}, containing data file count {}, record count {} and byte count {}", + flinkManifestFiles.size(), + FlinkManifestFileUtil.getDataFileCount(flinkManifestFiles), + FlinkManifestFileUtil.getRecordCount(flinkManifestFiles), + FlinkManifestFileUtil.getByteCount(flinkManifestFiles) + ); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + LOG.info("Iceberg committer {}.{} checkpoint {} completed", namespace, tableName, checkpointId); + synchronized (this) { + if (checkpointId == metadata.getLastCheckpointId()) { + try { + commit(); + postCommitSuccess(); + } catch (Exception t) { + // swallow the exception to avoid job restart in case of commit failure + //LOG.error(String.format("Iceberg committer %s.%s failed to do post checkpoint commit", database, tableName), + // t); + LOG.error("Iceberg committer {}.{} failed to do post checkpoint commit. Throwable = ", + namespace, tableName, t); + } + } else { + // TODO: it would be nice to fix this and allow concurrent checkpoint + LOG.info("Iceberg committer {}.{} skip committing transaction: " + + "notify complete checkpoint id = {}, last manifest checkpoint id = {}", + namespace, tableName, checkpointId, metadata.getLastCheckpointId()); + } + } + } + + @VisibleForTesting + void commit() { + if (!flinkManifestFiles.isEmpty() || watermarkEnabled) { + final long start = System.currentTimeMillis(); + try { + // prepare and commit transactions in two separate methods + // so that we can measure latency separately. + Transaction transaction = prepareTransaction(flinkManifestFiles, metadata); + commitTransaction(transaction); + final long duration = System.currentTimeMillis() - start; + LOG.info("Iceberg committer {}.{} succeeded to commit {} manifest files after {} milli-seconds", + namespace, tableName, flinkManifestFiles.size(), TimeUnit.NANOSECONDS.toMillis(duration)); + } catch (Throwable t) { + final long duration = System.currentTimeMillis() - start; + //LOG.error(String.format("Iceberg committer %s.%s failed to commit %d manifest files after %d milli-seconds", + //database, tableName, flinkManifestFiles.size(), TimeUnit.NANOSECONDS.toMillis(duration)), t); + LOG.error("Iceberg committer {}.{} failed to commit {} manifest files after {} milli-seconds." 
+ + " Throwable = ", + namespace, tableName, flinkManifestFiles.size(), duration, t); + throw t; + } + } else { + LOG.info("Iceberg committer {}.{} skip commit, " + + "as there are no uncommitted data files and watermark is disabled", + namespace, tableName); + } + } + + private Transaction prepareTransaction(List flinkManifestFiles, CommitMetadata metadata) { + LOG.info("Iceberg committer {}.{} start to prepare transaction: {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(metadata)); + final long start = System.currentTimeMillis(); + try { + Transaction transaction = table.newTransaction(); + if (!flinkManifestFiles.isEmpty()) { + List hashes = new ArrayList<>(flinkManifestFiles.size()); + AppendFiles appendFiles = transaction.newAppend(); + for (FlinkManifestFile flinkManifestFile : flinkManifestFiles) { + appendFiles.appendManifest(flinkManifestFile); + hashes.add(flinkManifestFile.hash()); + } + + appendFiles.set(COMMIT_MANIFEST_HASHES_KEY, FlinkManifestFileUtil.hashesListToString(hashes)); + appendFiles.commit(); + LOG.info("Iceberg committer {}.{} appended {} manifest files to transaction", + namespace, tableName, flinkManifestFiles.size()); + } + if (watermarkEnabled) { + UpdateProperties updateProperties = transaction.updateProperties(); + updateProperties.set(watermarkPropKey, Long.toString(metadata.getWatermark())); + updateProperties.commit(); + LOG.info("Iceberg committer {}.{} set watermark to {}", namespace, tableName, metadata.getWatermark()); + } + return transaction; + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.debug("Transaction prepared in {} milli-seconds", duration); + } + } + + private void commitTransaction(Transaction transaction) { + LOG.info("Iceberg committer {}.{} start to commit transaction: {}", + namespace, tableName, CommitMetadataUtil.encodeAsJson(metadata)); + final long start = System.currentTimeMillis(); + try { + transaction.commitTransaction(); + } finally { + final long duration = System.currentTimeMillis() - start; + LOG.debug("Transaction committed in {} milli-seconds", duration); + } + } + + private void postCommitSuccess() { + LOG.info("Iceberg committer {}.{} update metrics and metadata post commit success", namespace, tableName); + LOG.debug("Committed manifest file count {}, containing data file count {}, record count {} and byte count {}", + flinkManifestFiles.size(), + FlinkManifestFileUtil.getDataFileCount(flinkManifestFiles), + FlinkManifestFileUtil.getRecordCount(flinkManifestFiles), + FlinkManifestFileUtil.getByteCount(flinkManifestFiles)); + final Long lowWatermark = FlinkManifestFileUtil.getLowWatermark(flinkManifestFiles); + if (null != lowWatermark) { + LOG.debug("Low watermark as {}", lowWatermark); + } + final Long highWatermark = FlinkManifestFileUtil.getHighWatermark(flinkManifestFiles); + if (null != highWatermark) { + LOG.debug("High watermark as {}", highWatermark); + } + if (metadata.getWatermark() != null) { + LOG.debug("Watermark as {}", metadata.getWatermark()); + } + final long lastCommitTimestamp = System.currentTimeMillis(); + metadata.setLastCommitTimestamp(lastCommitTimestamp); + flinkManifestFiles.clear(); + } + + @Override + public void invoke(FlinkDataFile value, Context context) throws Exception { + pendingDataFiles.add(value); + LOG.debug("Receive file count {}, containing record count {} and file size in byte {}", + value.getIcebergDataFile().recordCount(), + value.getIcebergDataFile().fileSizeInBytes()); + LOG.debug("Iceberg committer {}.{} has total pending files 
{} after receiving new data file: {}", + namespace, tableName, pendingDataFiles.size(), value.toCompactDump()); + } + +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergSinkAppender.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergSinkAppender.java new file mode 100644 index 000000000000..634ff35ef1db --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergSinkAppender.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Append Iceberg sink to DataStream + * + * @param input data type + */ +@SuppressWarnings({"checkstyle:ClassTypeParameterName", "checkstyle:HiddenField"}) +public class IcebergSinkAppender { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkAppender.class); + + private final Table table; + private final Configuration config; + private final String sinkName; + private RecordSerializer serializer; + private Integer writerParallelism; + + public IcebergSinkAppender(Table table, Configuration config, String sinkName) { + this.table = table; + this.config = config; + this.sinkName = sinkName; + } + + /** + * Required. + * + * @param serializer Serialize input data type to Iceberg {@link Record} + */ + public IcebergSinkAppender withSerializer(RecordSerializer serializer) { + this.serializer = serializer; + return this; + } + + /** + * Optional. Explicitly set the parallelism for Iceberg writer operator. + * Otherwise, default job parallelism is used. + */ + public IcebergSinkAppender withWriterParallelism(Integer writerParallelism) { + this.writerParallelism = writerParallelism; + return this; + } + + /** + * @param dataStream append sink to this DataStream + */ + public DataStreamSink append(DataStream dataStream) { + // TODO: need to return? 
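+ // Topology assembled below: input DataStream<IN> -> IcebergWriter (parallel operator that closes
+ // data files and emits one FlinkDataFile per closed file) -> IcebergCommitter (sink running with
+ // parallelism 1 that appends the collected files to the Iceberg table once a checkpoint completes).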
+ IcebergWriter writer = new IcebergWriter<>(table, serializer, config); + IcebergCommitter committer = new IcebergCommitter(table, config); + + final String writerId = sinkName + "-writer"; + SingleOutputStreamOperator writerStream = dataStream + .transform(writerId, TypeInformation.of(FlinkDataFile.class), writer) // IcebergWriter as stream operator + .uid(writerId); + if (null != writerParallelism && writerParallelism > 0) { + LOG.info("Set Iceberg writer parallelism to {}", writerParallelism); + writerStream.setParallelism(writerParallelism); + } + + final String committerId = sinkName + "-committer"; + return writerStream + .addSink(committer) // IcebergCommitter as sink + .name(committerId) + .uid(committerId) + .setParallelism(1); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergWriter.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergWriter.java new file mode 100644 index 000000000000..4deb75a36862 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergWriter.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.flink.connector.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.connector.IcebergConnectorConstant;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+@SuppressWarnings("checkstyle:ClassTypeParameterName")
+public class IcebergWriter<IN> extends AbstractStreamOperator<FlinkDataFile>
+    implements OneInputStreamOperator<IN, FlinkDataFile> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);
+
+  private final RecordSerializer<IN> serializer;
+  private final String namespace;
+  private final String tableName;
+  private final FileFormat format;
+  private final boolean skipIncompatibleRecord;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final LocationProvider locations;
+  private final FileIO io;
+  private final Map<String, String> properties;
+  private final String timestampField;
+  private final TimeUnit timestampUnit;
+  private final long maxFileSize;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private transient Map<String, FileWriter> openPartitionFiles;
+  private transient int subtaskId;
+  private transient ProcessingTimeService timerService;
+  private transient Partitioner partitioner;
+
+  public IcebergWriter(Table table, @Nullable RecordSerializer<IN> serializer, Configuration config) {
+    this.serializer = serializer;
+    skipIncompatibleRecord = config.getBoolean(IcebergConnectorConstant.SKIP_INCOMPATIBLE_RECORD,
+        IcebergConnectorConstant.DEFAULT_SKIP_INCOMPATIBLE_RECORD);
+    timestampField = config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_FIELD, "");
+    timestampUnit = TimeUnit.valueOf(config.getString(IcebergConnectorConstant.WATERMARK_TIMESTAMP_UNIT,
+        IcebergConnectorConstant.DEFAULT_WATERMARK_TIMESTAMP_UNIT));
+    maxFileSize = config.getLong(IcebergConnectorConstant.MAX_FILE_SIZE,
+        IcebergConnectorConstant.DEFAULT_MAX_FILE_SIZE);
+
+    namespace = config.getString(IcebergConnectorConstant.NAMESPACE, "");
+    tableName =
+        config.getString(IcebergConnectorConstant.TABLE, "");
+
+    schema = table.schema();
+    spec = table.spec();
+    locations = table.locationProvider();
+    io = table.io();
+    properties = table.properties();
+    format = FileFormat.valueOf(
+        properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
+
+    LOG.info("Iceberg writer {}.{} data file location: {}",
+        namespace, tableName, locations.newDataLocation(""));
+    LOG.info("Iceberg writer {}.{} created with sink config", namespace, tableName);
+    LOG.info("Iceberg writer {}.{} loaded table: schema = {}\npartition spec = {}",
+        namespace, tableName, schema, spec);
+
+    // The default ChainingStrategy is HEAD; we prefer chaining to avoid
+    // the serialization and deserialization overhead between operators.
+    super.setChainingStrategy(ChainingStrategy.ALWAYS);
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    hadoopConf = new org.apache.hadoop.conf.Configuration();
+    subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+    timerService = getProcessingTimeService();
+    openPartitionFiles = new HashMap<>();
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {}.{} subtask {} begin preparing for checkpoint {}",
+        namespace, tableName, subtaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    flush(true);
+    LOG.info("Iceberg writer {}.{} subtask {} completed preparing for checkpoint {}",
+        namespace, tableName, subtaskId, checkpointId);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+  }
+
+  @VisibleForTesting
+  List<FlinkDataFile> flush(boolean emit) throws IOException {
+    List<FlinkDataFile> dataFiles = new ArrayList<>(openPartitionFiles.size());
+    for (Map.Entry<String, FileWriter> entry : openPartitionFiles.entrySet()) {
+      FileWriter writer = entry.getValue();
+      FlinkDataFile flinkDataFile = closeWriter(writer);
+      dataFiles.add(flinkDataFile);
+      if (emit) {
+        emit(flinkDataFile);
+      }
+    }
+    LOG.info("Iceberg writer {}.{} subtask {} flushed {} open files",
+        namespace, tableName, subtaskId, openPartitionFiles.size());
+    openPartitionFiles.clear();
+    return dataFiles;
+  }
+
+  FlinkDataFile closeWriter(FileWriter writer) throws IOException {
+    FlinkDataFile flinkDataFile = writer.close();
+    LOG.info(
+        "Iceberg writer {}.{} subtask {} uploaded to Iceberg table {}.{} with {} records and {} bytes at path: {}",
+        namespace, tableName, subtaskId, namespace, tableName, flinkDataFile.getIcebergDataFile().recordCount(),
+        flinkDataFile.getIcebergDataFile().fileSizeInBytes(), flinkDataFile.getIcebergDataFile().path());
+    return flinkDataFile;
+  }
+
+  void emit(FlinkDataFile flinkDataFile) {
+    output.collect(new StreamRecord<>(flinkDataFile));
+    LOG.debug("Iceberg writer {}.{} subtask {} emitted uploaded file to committer for Iceberg table {}.{}" +
+        " with {} records and {} bytes at path: {}",
+        namespace, tableName, subtaskId, namespace, tableName, flinkDataFile.getIcebergDataFile().recordCount(),
+        flinkDataFile.getIcebergDataFile().fileSizeInBytes(), flinkDataFile.getIcebergDataFile().path());
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+
+    LOG.info("Iceberg writer {}.{} subtask {} begin close", namespace, tableName, subtaskId);
+    // close all open files without emitting to downstream committer
+    flush(false);
+    LOG.info("Iceberg writer {}.{} subtask {} completed close", namespace, tableName, subtaskId);
+  }
+
+  @Override
+  public void dispose() throws Exception {
+    super.dispose();
+
+    LOG.info("Iceberg writer {}.{} subtask {} begin dispose", namespace, tableName, subtaskId);
+    abort();
+    LOG.info("Iceberg writer {}.{} subtask {} completed dispose", namespace, tableName, subtaskId);
+  }
+
+  private void abort() {
+    LOG.info("Iceberg writer {}.{} subtask {} has {} open files to abort",
+        namespace, tableName, subtaskId, openPartitionFiles.size());
+    // Close all open files without sending the DataFile list to the downstream committer operator.
+    // Because these files are not checkpointed, we don't want to commit them.
+    for (Map.Entry<String, FileWriter> entry : openPartitionFiles.entrySet()) {
+      final FileWriter writer = entry.getValue();
+      final Path path = writer.getPath();
+      try {
+        LOG.debug("Iceberg writer {}.{} subtask {} start to abort file: {}",
+            namespace, tableName, subtaskId, path);
+        writer.abort();
+        LOG.info("Iceberg writer {}.{} subtask {} completed aborting file: {}",
+            namespace, tableName, subtaskId, path);
+      } catch (Throwable t) {
+        LOG.error("Iceberg writer {}.{} subtask {} failed to abort open file: {}",
+            namespace, tableName, subtaskId, path, t);
+        // skip deleting a file whose abort failed
+        continue;
+      }
+
+      try {
+        LOG.debug("Iceberg writer {}.{} subtask {} deleting aborted file: {}",
+            namespace, tableName, subtaskId, path);
+        io.deleteFile(path.toString());
+        LOG.info("Iceberg writer {}.{} subtask {} deleted aborted file: {}",
+            namespace, tableName, subtaskId, path);
+      } catch (Throwable t) {
+        LOG.error("Iceberg writer {}.{} subtask {} failed to delete aborted file: {}",
+            namespace, tableName, subtaskId, path, t);
+      }
+    }
+    LOG.info("Iceberg writer {}.{} subtask {} aborted {} open files",
+        namespace, tableName, subtaskId, openPartitionFiles.size());
+    openPartitionFiles.clear();
+  }
+
+  @Override
+  public void processElement(StreamRecord<IN> element) throws Exception {
+    IN value = element.getValue();
+    try {
+      processInternal(value);
+    } catch (Exception e) {
+      if (!skipIncompatibleRecord) {
+        throw e;
+      }
+      LOG.warn("Iceberg writer {}.{} subtask {} skipped an incompatible record",
+          namespace, tableName, subtaskId, e);
+    }
+  }
+
+  @VisibleForTesting
+  void processInternal(IN value) throws Exception {
+    Record record = serializer.serialize(value, schema);
+    processRecord(record);
+  }
+
+  /**
+   * Process the element as an Iceberg {@link Record}
+   */
+  private void processRecord(Record record) throws Exception {
+    if (partitioner == null) {
+      partitioner = new RecordPartitioner(spec);
+    }
+    partitioner.partition(record);
+    final String partitionPath = locations.newDataLocation(spec, partitioner, "");
+    if (!openPartitionFiles.containsKey(partitionPath)) {
+      final Path path = new Path(partitionPath, generateFileName());
+      FileWriter writer = newWriter(path, partitioner);
+      openPartitionFiles.put(partitionPath, writer); // TODO: 1 writer for 1 partition path?
+      LOG.info("Iceberg writer {}.{} subtask {} opened a new file: {}",
+          namespace, tableName, subtaskId, path.toString());
+    }
+    final FileWriter writer = openPartitionFiles.get(partitionPath);
+    final long fileSize = writer.write(record);
+    // Rotate the file if it is over the size limit.
+    // This is mainly to avoid the 5 GB size limit for copying an object in S3
+    // that auto-lift otherwise can run into.
+    // We still rely on presto-s3fs for progressive upload
+    // that uploads a ~100 MB part whenever filled,
+    // which achieves smoother outbound/upload network traffic.
+    if (fileSize >= maxFileSize) {
+      FlinkDataFile flinkDataFile = closeWriter(writer);
+      emit(flinkDataFile);
+      openPartitionFiles.remove(partitionPath);
+    }
+  }
+
+  private String generateFileName() {
+    return format.addExtension(
+        String.format("%d_%d_%s", subtaskId, System.currentTimeMillis(), UUID.randomUUID().toString()));
+  }
+
+  private FileWriter newWriter(final Path path, final Partitioner part) throws Exception {
+    FileAppender<Record> appender = newAppender(io.newOutputFile(path.toString()));
+    FileWriter writer = FileWriter.builder()
+        .withFileFormat(format)
+        .withPath(path)
+        .withProcessingTimeService(timerService)
+        .withPartitioner(part.copy())
+        .withAppender(appender)
+        .withHadooopConfig(hadoopConf)
+        .withSpec(spec)
+        .withSchema(schema)
+        .withTimestampField(timestampField)
+        .withTimestampUnit(timestampUnit)
+        .build();
+    return writer;
+  }
+
+  private FileAppender<Record> newAppender(OutputFile file) throws Exception {
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+    try {
+      switch (format) {
+        case PARQUET:
+          return Parquet.write(file)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .schema(schema)
+              .overwrite()
+              .build();
+
+        case AVRO:
+          return Avro.write(file)
+              .createWriterFunc(DataWriter::create)
+              .setAll(properties)
+              .schema(schema)
+              .overwrite()
+              .build();
+
+        default:
+          throw new UnsupportedOperationException("Cannot write unknown format: " + format);
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/Partitioner.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/Partitioner.java
new file mode 100644
index 000000000000..18a79130ea30
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/Partitioner.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import org.apache.iceberg.StructLike; + +interface Partitioner extends StructLike { + + Partitioner copy(); + + String toPath(); + + void partition(T record); +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/PassThroughRecordSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/PassThroughRecordSerializer.java new file mode 100644 index 000000000000..6eb362ca0713 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/PassThroughRecordSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; + +/** + * Pass {@link Record} directly, without any operations + */ +public class PassThroughRecordSerializer implements RecordSerializer { + + private static final PassThroughRecordSerializer INSTANCE = new PassThroughRecordSerializer(); + + public static PassThroughRecordSerializer getInstance() { + return INSTANCE; + } + + @Override + public Record serialize(Record record, Schema schema) { + return record; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordPartitioner.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordPartitioner.java new file mode 100644 index 000000000000..247e836d0460 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordPartitioner.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import java.lang.reflect.Array; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +class RecordPartitioner extends AbstractPartitioner { + + private final Accessor[] accessors; + + @SuppressWarnings("unchecked") + RecordPartitioner(PartitionSpec spec) { + super(spec); + + final Schema schema = spec.schema(); + final List fields = spec.fields(); + final int numFields = fields.size(); + this.accessors = (Accessor[]) Array.newInstance(Accessor.class, numFields); + final Map> idToAccessorMap = buildAccessors(schema); + for (int i = 0; i < numFields; i += 1) { + PartitionField field = fields.get(i); + Accessor accessor = idToAccessorMap.get(field.sourceId()); + if (accessor == null) { + throw new RuntimeException( + "Cannot build accessor for field: " + schema.findField(field.sourceId())); + } + this.accessors[i] = accessor; + } + } + + RecordPartitioner(RecordPartitioner toCopy) { + super(toCopy); + accessors = toCopy.accessors; + } + + @Override + public RecordPartitioner copy() { + return new RecordPartitioner(this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(RecordPartitioner.class.toString()); + sb.append("["); + sb.append(super.toString()); + sb.append("]"); + return sb.toString(); + } + + @SuppressWarnings("unchecked") + @Override + public void partition(Record record) { + for (int i = 0; i < partitionTuple.length; i += 1) { + Transform transform = transforms[i]; + partitionTuple[i] = transform.apply(accessors[i].get(record)); + } + } + + private interface Accessor { + Object get(T container); + } + + private static Map> buildAccessors(Schema schema) { + return TypeUtil.visit(schema, new BuildPositionAccessors()); + } + + private static Accessor newAccessor(int position, Type type) { + switch (type.typeId()) { + case TIMESTAMP: + return new TimeStampAccessor(position, ((Types.TimestampType) type).shouldAdjustToUTC()); + case TIME: + return new TimeAccessor(position); + case DATE: + return new DateAccessor(position); + default: + return new PositionAccessor(position); + } + } + + private static Accessor newAccessor(int position, boolean isOptional, + Accessor accessor) { + if (isOptional) { + // the wrapped position handles null layers + return new WrappedPositionAccessor(position, accessor); + } else if (accessor != null && accessor.getClass() == PositionAccessor.class) { + return new Position2Accessor(position, (PositionAccessor) accessor); + } else if (accessor instanceof Position2Accessor) { + return new Position3Accessor(position, (Position2Accessor) accessor); + } else { + return new WrappedPositionAccessor(position, accessor); + } + } + + private static class BuildPositionAccessors + extends TypeUtil.SchemaVisitor>> { + @Override + public Map> schema( + Schema schema, Map> structResult) { + return structResult; + } + + @Override + public Map> struct( + Types.StructType struct, List>> fieldResults) { + Map> accessors = Maps.newHashMap(); + List fields = struct.fields(); + for (int i = 0; i < fieldResults.size(); i += 1) { + Types.NestedField 
field = fields.get(i); + Map> result = fieldResults.get(i); + if (result != null) { + for (Map.Entry> entry : result.entrySet()) { + accessors.put(entry.getKey(), newAccessor(i, field.isOptional(), entry.getValue())); + } + } else { + accessors.put(field.fieldId(), newAccessor(i, field.type())); + } + } + + if (accessors.isEmpty()) { + return null; + } + + return accessors; + } + + @Override + public Map> field( + Types.NestedField field, Map> fieldResult) { + return fieldResult; + } + } + + private static class PositionAccessor implements Accessor { + private final int position; + + private PositionAccessor(int position) { + this.position = position; + } + + @Override + public Object get(Record record) { + return record.get(position); + } + + int position() { + return position; + } + } + + private static class Position2Accessor implements Accessor { + private final int p0; + private final int p1; + + private Position2Accessor(int position, PositionAccessor wrapped) { + this.p0 = position; + this.p1 = wrapped.position; + } + + @Override + public Object get(Record record) { + Record inner = (Record) record.get(p0); + return inner.get(p1); + } + } + + private static class Position3Accessor implements Accessor { + private final int p0; + private final int p1; + private final int p2; + + private Position3Accessor(int position, Position2Accessor wrapped) { + this.p0 = position; + this.p1 = wrapped.p0; + this.p2 = wrapped.p1; + } + + @Override + public Object get(Record record) { + Record inner = (Record) record.get(p0); + Record inner2 = (Record) inner.get(p1); + return inner2.get(p2); + } + } + + private static class WrappedPositionAccessor implements Accessor { + private final int position; + private final Accessor accessor; + + private WrappedPositionAccessor(int position, Accessor accessor) { + this.position = position; + this.accessor = accessor; + } + + @Override + public Object get(Record record) { + Record inner = (Record) record.get(position); + if (inner != null) { + return accessor.get(inner); + } + return null; + } + } + + private static class TimeStampAccessor extends PositionAccessor { + private final boolean withZone; + + private TimeStampAccessor(int position, boolean withZone) { + super(position); + this.withZone = withZone; + } + + @Override + public Object get(Record record) { + return withZone ? 
+ DateTimeUtil.microsFromTimestamptz(record.get(position(), java.time.OffsetDateTime.class)) : + DateTimeUtil.microsFromTimestamp(record.get(position(), java.time.LocalDateTime.class)); + } + } + + private static class TimeAccessor extends PositionAccessor { + private TimeAccessor(int position) { + super(position); + } + + @Override + public Object get(Record record) { + return DateTimeUtil.microsFromTime(record.get(position(), java.time.LocalTime.class)); + } + } + + private static class DateAccessor extends PositionAccessor { + private DateAccessor(int position) { + super(position); + } + + @Override + public Object get(Record record) { + return DateTimeUtil.daysFromDate(record.get(position(), java.time.LocalDate.class)); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordSerializer.java new file mode 100644 index 000000000000..2af1141e99ba --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/RecordSerializer.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import java.io.Serializable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; + +/** + * Serialize input data type to {@link Record} of Iceberg + */ +@FunctionalInterface +public interface RecordSerializer extends Serializable { + // TODO: input parameter should be schema in Iceberg? + Record serialize(IN record, Schema schema) throws Exception; +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/connector/sink/WatermarkTimeExtractor.java b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/WatermarkTimeExtractor.java new file mode 100644 index 000000000000..1ae5d71ad656 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/connector/sink/WatermarkTimeExtractor.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.connector.sink; + +import java.time.LocalTime; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.types.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WatermarkTimeExtractor { + private static final Logger LOG = LoggerFactory.getLogger(WatermarkTimeExtractor.class); + + private final TimeUnit timestampUnit; + private final List timestampFieldChain; + + public WatermarkTimeExtractor(Schema schema, String timestampFieldChainAsString, TimeUnit timestampUnit) { + this.timestampUnit = timestampUnit; + this.timestampFieldChain = getTimestampFieldChain(schema, timestampFieldChainAsString); + } + + private List getTimestampFieldChain(Schema schema, String timestampFieldChainAsString) { + if (Strings.isNullOrEmpty(timestampFieldChainAsString)) { + return Collections.emptyList(); + } + + List fieldNames = Splitter.on(".").splitToList(timestampFieldChainAsString); + final int size = fieldNames.size(); // size >= 1 due to the Strings.isNullOrEmpty() check ahead + Type type = null; + for (int i = 0; i <= size - 1; i++) { // each field in the chain + String fieldName = fieldNames.get(i).trim(); + + if (i == 0) { // first level + type = schema.findType(fieldName); + } else { // other levels including leaf + type = type.asNestedType().fieldType(fieldName); + } + + if (type == null) { + throw new IllegalArgumentException( + String.format("Can't find field %s in schema", fieldName)); + } else { + if (i == size - 1) { // leaf node should be timestamp type + if (type.typeId() != Type.TypeID.TIME) { // TODO: to use TimestampType ? 
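+            // Note: only TIME-typed leaf fields pass this check; a TIMESTAMP-typed leaf currently
+            // fails with the exception below (hence the TODO above about using TimestampType).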
+ throw new IllegalArgumentException( + String.format("leaf timestamp field %s is not a timestamp type, but %s", fieldName, type.typeId())); + } + } else { + if (!type.isNestedType()) { + throw new IllegalArgumentException( + String.format("upstream field %s is not a nested type, but %s", fieldName, type.typeId())); + } + } + } + } // each field in the chain + + LOG.info("Found matched timestamp field identified by {} in the schema", timestampFieldChainAsString); + return fieldNames; + } + + /** + * @return null if timestamp field not found in the record + */ + public Long getWatermarkTimeMs(final Record record) { + if (timestampFieldChain.isEmpty()) { + return null; + } + + Record recordAlongPath = record; + + // from top to the parent of leaf + for (int i = 0; i <= timestampFieldChain.size() - 2; i++) { + recordAlongPath = (Record) recordAlongPath.getField(timestampFieldChain.get(i)); + } + + // leaf + LocalTime ts = (LocalTime) recordAlongPath.getField(timestampFieldChain.get(timestampFieldChain.size() - 1)); + if (ts == null) { + return null; + } else { + long tsInMills = ts.toNanoOfDay() / 1000; + if (timestampUnit != TimeUnit.MILLISECONDS) { + return timestampUnit.toMillis(tsInMills); + } else { + return tsInMills; + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 5910fd6f1767..1885665398b9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -22,6 +22,8 @@ include 'api' include 'common' include 'core' include 'data' +include 'flink' +include 'flink-runtime' include 'mr' include 'orc' include 'parquet' @@ -37,6 +39,8 @@ project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' project(':core').name = 'iceberg-core' project(':data').name = 'iceberg-data' +project(':flink').name = 'iceberg-flink' +project(':flink-runtime').name = 'iceberg-flink-runtime' project(':mr').name = 'iceberg-mr' project(':orc').name = 'iceberg-orc' project(':arrow').name = 'iceberg-arrow' diff --git a/versions.props b/versions.props index 45a9be439e49..ffbb6d90fcc3 100644 --- a/versions.props +++ b/versions.props @@ -13,6 +13,7 @@ com.google.guava:guava = 28.0-jre com.github.ben-manes.caffeine:caffeine = 2.7.0 org.apache.arrow:arrow-vector = 0.14.1 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 +org.apache.flink:* = 1.10.0 # test deps junit:junit = 4.12
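For reference, a minimal sketch of wiring a job to this sink (illustrative only: the execution environment, table loading, and record source are assumptions rather than part of this patch, and the generic type parameters follow the classes above):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // the writer flushes open files in prepareSnapshotPreBarrier, so checkpointing should be enabled
    env.enableCheckpointing(60_000);
    Table table = new HadoopTables().load("hdfs:///warehouse/db/events");  // hypothetical table location
    Configuration config = new Configuration();
    config.setString(IcebergConnectorConstant.NAMESPACE, "db");      // example values only
    config.setString(IcebergConnectorConstant.TABLE, "events");
    DataStream<Record> records = ...;  // any upstream source producing Iceberg Records
    new IcebergSinkAppender<Record>(table, config, "iceberg-sink")
        .withSerializer(PassThroughRecordSerializer.getInstance())
        .append(records);
    env.execute("iceberg-flink-sink-example");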