Skip to content

Commit

Permalink
[SEDONA-559] Make the flink example work (#1420)
Browse files Browse the repository at this point in the history
* [hotfix] Enable scalastyle config

* [SEDONA-559] Make the flink example project work

* [hotfix] unbundle the example projects

* [hotfix] fix example version in github ci/cd pipeline

* [hotfix] Remove slf4j-api 1.7.36 from example parent because Spark needs slf4j-api 2.x

* [hotfix] remove parent pom
  • Loading branch information
docete authored May 29, 2024
1 parent a6b4ab5 commit ee74206
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 55 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ jobs:
SPARK_VERSION: ${{ matrix.spark }}
SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
SEDONA_VERSION: ${{ matrix.sedona }}
run: (cd examples/spark-sql;mvn clean install -Dspark.version=${SPARK_VERSION} -Dspark.compat.version=${SPARK_COMPAT_VERSION} -Dsedona.version=${SEDONA_VERSION};java -jar target/sedona-spark-example-1.0.0.jar)
- run: (cd examples/flink-sql;mvn clean install;java -jar target/sedona-flink-example-1.0.0.jar)
run: (cd examples/spark-sql;mvn clean install -Dspark.version=${SPARK_VERSION} -Dspark.compat.version=${SPARK_COMPAT_VERSION} -Dsedona.version=${SEDONA_VERSION};java -jar target/sedona-spark-example-1.6.0.jar)
- run: (cd examples/flink-sql;mvn clean install;java -jar target/sedona-flink-example-1.6.0.jar)
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/target/
target
/.idea
/*.iml
*.iml
/*.ipr
/*.iws
/.settings/
/.classpath
/.project
/dependency-reduced-pom.xml
dependency-reduced-pom.xml
/bin/
/doc/
/conf/
Expand Down
75 changes: 52 additions & 23 deletions examples/flink-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,28 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink-example</artifactId>
<version>1.0.0</version>

<version>1.6.0</version>
<name>Sedona : Examples : Flink</name>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<geotools.version>1.5.1-28.2</geotools.version>
<geotools.scope>compile</geotools.scope>
<scala.compat.version>2.12</scala.compat.version>
<sedona.version>1.5.1</sedona.version>
<flink.version>1.14.3</flink.version>
<flink.version>1.19.0</flink.version>
<flink.scope>compile</flink.scope>
<scala.compat.version>2.12</scala.compat.version>
<geotools.version>28.2</geotools.version>
<log4j.version>2.17.2</log4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink_${scala.compat.version}</artifactId>
<version>${sedona.version}</version>
</dependency>
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>${geotools.version}</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -56,28 +52,21 @@
<!-- For Flink DataStream API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- Flink Kafka connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- For playing flink in IDE-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- For Flink flink api, planner, udf/udt, csv-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.compat.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
Expand All @@ -100,10 +89,50 @@
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>

<!-- Add geotools dependencies, to make the examples use geotools in the IDE -->
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
<version>${geotools.version}</version>
<scope>compile</scope>
</dependency>

<!-- Add a logging Framework, to make the examples produce -->
<!-- logs when executing in the IDE -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<!-- Let log4j code log to log4j2 logfile -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<!-- For Flink Web Ui in test-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.compat.version}</artifactId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down
19 changes: 12 additions & 7 deletions examples/flink-sql/src/main/java/FlinkExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.sedona.flink.SedonaFlinkRegistrator;
import org.apache.sedona.flink.expressions.Constructors;

Expand All @@ -43,17 +44,18 @@ public static void main(String[] args) {

// Create a fake WKT string table source
Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);

// Create a geometry column
Table pointTable = pointWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
$(pointColNames[0])).as(pointColNames[0]),
$(pointColNames[1]));
Table pointTable = pointWktTable.select(
call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
$(pointColNames[1]));

// Create S2CellID
pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("pointTable", pointTable);
pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
pointTable.execute().print();


// Create a fake WKT string table source
Expand All @@ -68,13 +70,16 @@ public static void main(String[] args) {
// Explode s2id array
tableEnv.createTemporaryView("polygonTable", polygonTable);
polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
polygonTable.execute().print();

// TODO: TableImpl.print() occurs EOF Exception due to https://issues.apache.org/jira/browse/FLINK-35406
// Use polygonTable.execute().print() when FLINK-35406 is fixed.
polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));

// Join two tables by their S2 ids
Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
// Optional: remove false positives
joinResult = joinResult.where("ST_Contains(geom_polygon, geom_point)");
joinResult.execute().print();
joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
}

}
25 changes: 25 additions & 0 deletions examples/flink-sql/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Binary file removed examples/spark-sql/colocationMap.png
Binary file not shown.
6 changes: 3 additions & 3 deletions examples/spark-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-example</artifactId>
<version>1.0.0</version>

<name>${project.groupId}:${project.artifactId}</name>
<version>1.6.0</version>
<name>Sedona : Examples : Spark</name>
<description>Maven Example for SedonaDB</description>
<packaging>jar</packaging>

Expand Down
1 change: 0 additions & 1 deletion flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

<properties>
<maven.deploy.skip>${skip.deploy.common.modules}</maven.deploy.skip>
<flink.version>1.19.0</flink.version>
<flink.scope>provided</flink.scope>
</properties>

Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<spark.compat.version>3.0</spark.compat.version>
<log4j.version>2.17.2</log4j.version>

<flink.version>1.19.0</flink.version>
<slf4j.version>1.7.36</slf4j.version>
<googles2.version>2.0.0</googles2.version>
<uberh3.version>4.1.1</uberh3.version>
Expand Down Expand Up @@ -525,7 +526,7 @@
<failOnWarning>false</failOnWarning>
<sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../scalastyle_config.xml</configLocation>
<configLocation>tools/maven/scalastyle_config.xml</configLocation>
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
Expand Down
15 changes: 0 additions & 15 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,4 @@
</dependency>
</dependencies>
</dependencyManagement>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<configuration>
<configLocation>${project.basedir}/../../scalastyle_config.xml</configLocation>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

</project>
File renamed without changes.

0 comments on commit ee74206

Please sign in to comment.