2 changes: 1 addition & 1 deletion bin/common.sh
@@ -145,7 +145,7 @@ export JAVA_OPTS

JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2.properties'"
else
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
fi
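The change above sets both the Log4j 1.x property (log4j.configuration) and the Log4j 2.x property (log4j.configurationFile) on the interpreter JVM, so an interpreter bundling either logging backend picks up its configuration from ZEPPELIN_CONF_DIR. A minimal sketch of how this could be verified from inside the JVM that consumes JAVA_INTP_OPTS; the class name is illustrative and not part of this change:

// Hypothetical check, run inside the interpreter JVM.
// Log4j 1.x reads -Dlog4j.configuration, Log4j 2.x reads -Dlog4j.configurationFile.
public class LogConfigCheck {
  public static void main(String[] args) {
    System.out.println("log4j 1.x config: " + System.getProperty("log4j.configuration"));
    System.out.println("log4j 2.x config: " + System.getProperty("log4j.configurationFile"));
  }
}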
53 changes: 53 additions & 0 deletions conf/log4j2.properties
@@ -0,0 +1,53 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

logger.flink.name = org.apache.zeppelin.flink
logger.flink.level = DEBUG


# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:zeppelin.log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
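Under this configuration the org.apache.zeppelin.flink logger emits DEBUG output while the listed libraries stay at INFO, and everything is routed to the MainAppender file named by the zeppelin.log.file system property. A small sketch of the resulting behavior, assuming an SLF4J-to-Log4j 2 binding is on the classpath and zeppelin.log.file is set; the class and logger names below are illustrative only:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {
  // Falls under logger.flink (org.apache.zeppelin.flink), so DEBUG is enabled.
  private static final Logger FLINK_LOG =
      LoggerFactory.getLogger("org.apache.zeppelin.flink.Demo");
  // Falls under logger.kafka (org.apache.kafka), which is capped at INFO.
  private static final Logger KAFKA_LOG =
      LoggerFactory.getLogger("org.apache.kafka.Demo");

  public static void main(String[] args) {
    FLINK_LOG.debug("written to the MainAppender file");
    KAFKA_LOG.debug("dropped: below the INFO threshold for org.apache.kafka");
    KAFKA_LOG.info("written to the MainAppender file");
  }
}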
53 changes: 53 additions & 0 deletions flink/flink-shims/pom.xml
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<parent>
<artifactId>flink-parent</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink-shims</artifactId>
<version>0.9.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Zeppelin: Flink Shims</name>

<build>
<plugins>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-interpreter-setting</id>
<phase>none</phase>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.flink;


import org.apache.zeppelin.interpreter.InterpreterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.List;
import java.util.Properties;

/**
* Abstract base class for everything that is API-incompatible between Flink versions. It
* loads the correct FlinkShims implementation based on the Flink version in use.
*/
public abstract class FlinkShims {

private static final Logger LOGGER = LoggerFactory.getLogger(FlinkShims.class);

private static FlinkShims flinkShims;

protected Properties properties;

public FlinkShims(Properties properties) {
this.properties = properties;
}

private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties)
throws Exception {
Class<?> flinkShimsClass;
if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) {
LOGGER.info("Initializing shims for Flink 1.10");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims");
} else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) {
LOGGER.info("Initializing shims for Flink 1.11");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}

Constructor<?> c = flinkShimsClass.getConstructor(Properties.class);
return (FlinkShims) c.newInstance(properties);
}

/**
* Returns the shared FlinkShims instance, creating it for the given Flink version on first use.
*
* @param flinkVersion the Flink version to load shims for
* @param properties interpreter properties passed to the shim constructor
* @return the singleton FlinkShims instance
*/
public static FlinkShims getInstance(FlinkVersion flinkVersion,
Properties properties) throws Exception {
if (flinkShims == null) {
flinkShims = loadShims(flinkVersion, properties);
}
return flinkShims;
}

public abstract Object createCatalogManager(Object config);

public abstract String getPyFlinkPythonPath(Properties properties) throws IOException;

public abstract Object getCollectStreamTableSink(InetAddress targetAddress,
int targetPort,
Object serializer);

public abstract List collectToList(Object table) throws Exception;

public abstract void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception;

public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;

public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception;

public abstract boolean rowEquals(Object row1, Object row2);

public abstract Object fromDataSet(Object btenv, Object ds);

public abstract Object toDataSet(Object btenv, Object table);

public abstract void registerTableFunction(Object btenv, String name, Object tableFunction);

public abstract void registerAggregateFunction(Object btenv, String name, Object aggregateFunction);

public abstract void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction);

public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink);
}
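FlinkShims keeps the interpreter code version-agnostic: getInstance() lazily resolves a concrete shim class (Flink110Shims or Flink111Shims) by reflection, and all version-specific calls then go through the abstract methods above. A minimal usage sketch, assuming the matching shim implementation is on the interpreter classpath; the class name and the hard-coded version string are illustrative only:

import java.util.Properties;

public class ShimsUsageSketch {
  public static void main(String[] args) throws Exception {
    // The version string would normally be detected from the Flink distribution in use.
    FlinkVersion flinkVersion = FlinkVersion.fromVersionString("1.10.1");
    FlinkShims shims = FlinkShims.getInstance(flinkVersion, new Properties());
    // Version-specific behaviour is reached only through the shim API, e.g.:
    String pyFlinkPath = shims.getPyFlinkPythonPath(new Properties());
    System.out.println(pyFlinkPath);
  }
}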
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.flink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class FlinkVersion {
private static final Logger logger = LoggerFactory.getLogger(FlinkVersion.class);

private int majorVersion;
private int minorVersion;
private int patchVersion;
private String versionString;

FlinkVersion(String versionString) {
this.versionString = versionString;

try {
int pos = versionString.indexOf('-');

String numberPart = versionString;
if (pos > 0) {
numberPart = versionString.substring(0, pos);
}

String[] versions = numberPart.split("\\.");
this.majorVersion = Integer.parseInt(versions[0]);
this.minorVersion = Integer.parseInt(versions[1]);
if (versions.length == 3) {
this.patchVersion = Integer.parseInt(versions[2]);
}

} catch (Exception e) {
logger.error("Can not recognize Spark version " + versionString +
". Assume it's a future release", e);
}
}

public int getMajorVersion() {
return majorVersion;
}

public int getMinorVersion() {
return minorVersion;
}

public String toString() {
return versionString;
}

public static FlinkVersion fromVersionString(String versionString) {
return new FlinkVersion(versionString);
}

public boolean isFlink110() {
return this.majorVersion == 1 && minorVersion == 10;
}
}
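FlinkVersion strips any suffix after the first '-' and parses the dotted numeric part, so pre-release and SNAPSHOT versions are handled the same way as plain releases. A short sketch of the expected parsing results; the class name is illustrative only:

public class FlinkVersionDemo {
  public static void main(String[] args) {
    FlinkVersion v = FlinkVersion.fromVersionString("1.10.1-SNAPSHOT");
    System.out.println(v.getMajorVersion()); // 1
    System.out.println(v.getMinorVersion()); // 10
    System.out.println(v.isFlink110());      // true
    System.out.println(v);                   // prints the full original string: 1.10.1-SNAPSHOT
  }
}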