26 changes: 0 additions & 26 deletions .github/workflows/flink-ci.yml
@@ -52,32 +52,6 @@ on:
    - 'site/**'

jobs:
  flink-common-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        jvm: [8, 11]
    env:
      SPARK_LOCAL_IP: localhost
    steps:
    - uses: actions/checkout@v2
    - uses: actions/setup-java@v1
      with:
        java-version: ${{ matrix.jvm }}
    - uses: actions/cache@v2
      with:
        path: ~/.gradle/caches
        key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
        restore-keys: ${{ runner.os }}-gradle
    - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    - run: ./gradlew -DsparkVersions= -DhiveVersions= :iceberg-flink:check -Pquick=true -x javadoc
    - uses: actions/upload-artifact@v2
      if: failure()
      with:
        name: test logs
        path: |
          **/build/testlogs

  flink-tests:
    runs-on: ubuntu-latest
    strategy:
89 changes: 0 additions & 89 deletions flink/build.gradle
@@ -17,95 +17,6 @@
* under the License.
*/

project(':iceberg-flink') {
  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    api project(':iceberg-api')
    implementation project(':iceberg-common')
    implementation project(':iceberg-core')
    api project(':iceberg-data')
    implementation project(':iceberg-orc')
    implementation project(':iceberg-parquet')
    implementation project(':iceberg-hive-metastore')

    compileOnly "org.apache.flink:flink-streaming-java_2.12"
    compileOnly "org.apache.flink:flink-streaming-java_2.12::tests"
    compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.12"
    compileOnly "org.apache.flink:flink-table-planner_2.12"
    compileOnly "org.apache.hadoop:hadoop-hdfs"
    compileOnly "org.apache.hadoop:hadoop-common"
    compileOnly("org.apache.hadoop:hadoop-minicluster") {
      exclude group: 'org.apache.avro', module: 'avro'
    }

    implementation("org.apache.parquet:parquet-avro") {
      exclude group: 'org.apache.avro', module: 'avro'
      // already shaded by Parquet
      exclude group: 'it.unimi.dsi'
      exclude group: 'org.codehaus.jackson'
    }

    compileOnly "org.apache.avro:avro"

    implementation("org.apache.orc:orc-core::nohive") {
      exclude group: 'org.apache.hadoop'
      exclude group: 'commons-lang'
      // These artifacts are shaded and included in the orc-core fat jar
      exclude group: 'com.google.protobuf', module: 'protobuf-java'
      exclude group: 'org.apache.hive', module: 'hive-storage-api'
    }

    testImplementation "org.apache.flink:flink-core"
    testImplementation "org.apache.flink:flink-runtime_2.12"
    testImplementation "org.apache.flink:flink-table-planner-blink_2.12"
    testImplementation("org.apache.flink:flink-test-utils-junit") {
      exclude group: 'junit'
    }
    testImplementation("org.apache.flink:flink-test-utils_2.12") {
      exclude group: "org.apache.curator", module: 'curator-test'
      exclude group: 'junit'
    }

    testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
    testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
    testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')

    // By default, hive-exec is a fat/uber jar and it exports a guava library
    // that's really old. We use the core classifier to be able to override our guava
    // version. Luckily, hive-exec seems to work okay so far with this version of guava
    // See: https://github.com/apache/hive/blob/master/ql/pom.xml#L911 for more context.
    testImplementation("org.apache.hive:hive-exec::core") {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
      exclude group: 'org.pentaho' // missing dependency
      exclude group: 'org.apache.hive', module: 'hive-llap-tez'
      exclude group: 'org.apache.logging.log4j'
      exclude group: 'com.google.protobuf', module: 'protobuf-java'
      exclude group: 'org.apache.calcite'
      exclude group: 'org.apache.calcite.avatica'
      exclude group: 'com.google.code.findbugs', module: 'jsr305'
    }

    testImplementation("org.apache.hive:hive-metastore") {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
      exclude group: 'org.pentaho' // missing dependency
      exclude group: 'org.apache.hbase'
      exclude group: 'org.apache.logging.log4j'
      exclude group: 'co.cask.tephra'
      exclude group: 'com.google.code.findbugs', module: 'jsr305'
      exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
      exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet'
      exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle'
      exclude group: 'com.tdunning', module: 'json'
      exclude group: 'javax.transaction', module: 'transaction-api'
      exclude group: 'com.zaxxer', module: 'HikariCP'
    }
  }
}

def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")

if (flinkVersions.contains("1.12")) {
13 changes: 0 additions & 13 deletions flink/v1.12/build.gradle
@@ -30,19 +30,6 @@ configure(flinkProjects) {

project(':iceberg-flink:iceberg-flink-1.12') {

  sourceSets {
    main {
      java.srcDirs = [
          "${project(':iceberg-flink').projectDir}/src/main/java",
          "src/main/java"
      ]
      resources.srcDirs = [
          "${project(':iceberg-flink').projectDir}/src/main/resources",
          "src/main/resources"
      ]
    }
  }

  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    api project(':iceberg-api')
13 changes: 0 additions & 13 deletions flink/v1.13/build.gradle
@@ -30,19 +30,6 @@ configure(flinkProjects) {

project(':iceberg-flink:iceberg-flink-1.13') {

  sourceSets {
    main {
      java.srcDirs = [
          "${project(':iceberg-flink').projectDir}/src/main/java",
          "src/main/java"
      ]
      resources.srcDirs = [
          "${project(':iceberg-flink').projectDir}/src/main/resources",
          "src/main/resources"
      ]
    }
  }

  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    api project(':iceberg-api')
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.io.Serializable;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
 * Serializable loader to load an Iceberg {@link Catalog}.
 */
public interface CatalogLoader extends Serializable {

  /**
   * Create a new catalog with the provided properties. NOTICE: for Flink, the {@link CatalogLoader} may be
   * initialized on the Flink SQL client side or on the job manager side, then serialized and shipped to the
   * task managers, where it is deserialized and used to create a new catalog instance.
   *
   * @return a newly created {@link Catalog}
   */
  Catalog loadCatalog();

  static CatalogLoader hadoop(String name, Configuration hadoopConf, Map<String, String> properties) {
    return new HadoopCatalogLoader(name, hadoopConf, properties);
  }

  static CatalogLoader hive(String name, Configuration hadoopConf, Map<String, String> properties) {
    return new HiveCatalogLoader(name, hadoopConf, properties);
  }

  static CatalogLoader custom(String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
    return new CustomCatalogLoader(name, properties, hadoopConf, impl);
  }

  class HadoopCatalogLoader implements CatalogLoader {
    private final String catalogName;
    private final SerializableConfiguration hadoopConf;
    private final String warehouseLocation;
    private final Map<String, String> properties;

    private HadoopCatalogLoader(
        String catalogName,
        Configuration conf,
        Map<String, String> properties) {
      this.catalogName = catalogName;
      this.hadoopConf = new SerializableConfiguration(conf);
      this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
      this.properties = Maps.newHashMap(properties);
    }

    @Override
    public Catalog loadCatalog() {
      return CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), catalogName, properties, hadoopConf.get());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
          .add("catalogName", catalogName)
          .add("warehouseLocation", warehouseLocation)
          .toString();
    }
  }

  class HiveCatalogLoader implements CatalogLoader {
    private final String catalogName;
    private final SerializableConfiguration hadoopConf;
    private final String uri;
    private final String warehouse;
    private final int clientPoolSize;
    private final Map<String, String> properties;

    private HiveCatalogLoader(String catalogName, Configuration conf, Map<String, String> properties) {
      this.catalogName = catalogName;
      this.hadoopConf = new SerializableConfiguration(conf);
      this.uri = properties.get(CatalogProperties.URI);
      this.warehouse = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
      this.clientPoolSize = properties.containsKey(CatalogProperties.CLIENT_POOL_SIZE) ?
          Integer.parseInt(properties.get(CatalogProperties.CLIENT_POOL_SIZE)) :
          CatalogProperties.CLIENT_POOL_SIZE_DEFAULT;
      this.properties = Maps.newHashMap(properties);
    }

    @Override
    public Catalog loadCatalog() {
      return CatalogUtil.loadCatalog(HiveCatalog.class.getName(), catalogName, properties, hadoopConf.get());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
          .add("catalogName", catalogName)
          .add("uri", uri)
          .add("warehouse", warehouse)
          .add("clientPoolSize", clientPoolSize)
          .toString();
    }
  }

  class CustomCatalogLoader implements CatalogLoader {

    private final SerializableConfiguration hadoopConf;
    private final Map<String, String> properties;
    private final String name;
    private final String impl;

    private CustomCatalogLoader(
        String name,
        Map<String, String> properties,
        Configuration conf,
        String impl) {
      this.hadoopConf = new SerializableConfiguration(conf);
      this.properties = Maps.newHashMap(properties); // wrap into a hashmap for serialization
      this.name = name;
      this.impl = Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
    }

    @Override
    public Catalog loadCatalog() {
      return CatalogUtil.loadCatalog(impl, name, properties, hadoopConf.get());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
          .add("name", name)
          .add("impl", impl)
          .toString();
    }
  }

}
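
For context, a minimal usage sketch of the loader added above (not part of this diff); the catalog name, metastore URI, and warehouse path are hypothetical placeholders:

Map<String, String> properties = Maps.newHashMap();
properties.put(CatalogProperties.URI, "thrift://metastore-host:9083");                  // hypothetical metastore URI
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "hdfs://namenode:8020/warehouse"); // hypothetical warehouse path

// Built on the Flink SQL client / job manager side and serialized with the job graph...
CatalogLoader loader = CatalogLoader.hive("hive_catalog", new Configuration(), properties);

// ...then deserialized on the task manager side, where it resolves a live catalog instance.
Catalog catalog = loader.loadCatalog();

Each call to loadCatalog() returns a newly created Catalog, which is why the loader, rather than a catalog instance, is what gets serialized and shipped around.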