kamu: Register GeoSpark types at session startup
kamu: Register types after thriftserver creates a sub-session

kamu: Updates for Spark 3

kamu: Update to latest Sedona

kamu: Improve readme
sergiimk committed Feb 10, 2024
1 parent 96b1eb6 commit 6de52a7
Showing 6 changed files with 51 additions and 0 deletions.
22 changes: 22 additions & 0 deletions KAMU.md
@@ -0,0 +1,22 @@
# Kamu fork of Apache Livy

Kamu is building its own version of Livy for two reasons:
- As of this writing, the latest Livy release does not include Scala 2.12 binaries, which are needed for Spark 3+ compatibility
- When working with Livy we want Apache Sedona types (a GIS library) to be pre-registered inside the user's Spark session. Livy and Spark don't provide hooks for this, so we extended Livy to register them at Spark session startup
- **TODO:** We should investigate whether the recently added `spark.sql.extensions` config option can enable such auto-registration
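Regarding the TODO above: `spark.sql.extensions` accepts a comma-separated list of `SparkSessionExtensions` implementations that Spark applies to every new session, which could replace the explicit registration calls in this fork. A sketch of what that might look like in `spark-defaults.conf`, assuming Sedona ships a suitable extensions class (the class name below is an assumption and should be verified against the Sedona release in use):

```properties
# Hypothetical auto-registration via session extensions -- verify the
# class name against the Sedona version before relying on this.
spark.sql.extensions=org.apache.sedona.sql.SedonaSqlExtensions
```

If this works, the explicit `SedonaSQLRegistrator.registerAll(...)` calls added in this commit would become unnecessary.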

## Building the fork

First clone the repo and make sure you're on `kamu` branch.

Build command:
```sh
mvn package -Pthriftserver -Pspark-3.0 -Dspark.version=3.0.0 -Dscala.version=2.12.13 -Dscala.binary.version=2.12 -DskipITs -DskipTests
```
... this will take a while.

The package will be produced at:

```sh
./assembly/target/apache-livy-{version}-kamu-bin.zip
```
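Once the packaged Livy is deployed, a quick way to check that Sedona types really are pre-registered is to run a spatial SQL statement through a fresh session. A sketch using Livy's REST API, assuming a server on `localhost:8998` (the session id `0` in the second call is illustrative; use the id returned by the first call, and wait for the session to reach the `idle` state):

```shell
# Create an interactive SQL session (poll it until its state becomes "idle").
curl -s -X POST -H 'Content-Type: application/json' \
  -d '{"kind": "sql"}' \
  http://localhost:8998/sessions

# With Sedona pre-registered, spatial functions should resolve immediately,
# without any manual registration step in the session:
curl -s -X POST -H 'Content-Type: application/json' \
  -d '{"code": "SELECT ST_Point(1.0, 2.0)"}' \
  http://localhost:8998/sessions/0/statements
```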
2 changes: 2 additions & 0 deletions pom.xml
@@ -175,6 +175,8 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>

<!-- kamu -->
<sedona.version>1.0.1-incubating</sedona.version>
</properties>

<repositories>
9 changes: 9 additions & 0 deletions rsc/pom.xml
@@ -121,6 +121,15 @@
<artifactId>slf4j-reload4j</artifactId>
<scope>provided</scope>
</dependency>

<!-- kamu -->
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-python-adapter-3.0_2.12</artifactId>
<version>${sedona.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
@@ -29,6 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.sedona.sql.utils.SedonaSQLRegistrator;

public class SparkEntries {

private static final Logger LOG = LoggerFactory.getLogger(SparkEntries.class);
@@ -84,6 +86,9 @@ public SparkSession sparkSession() {
sparksession = builder.getOrCreate();
LOG.info("Created Spark session.");
}

LOG.info("Registering Sedona UDTs / UDFs");
SedonaSQLRegistrator.registerAll(sparksession);
} catch (Exception e) {
LOG.warn("SparkSession is not supported", e);
throw e;
9 changes: 9 additions & 0 deletions thriftserver/session/pom.xml
@@ -68,6 +68,15 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- kamu -->
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-python-adapter-3.0_2.12</artifactId>
<version>${sedona.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
@@ -28,6 +28,8 @@

import org.apache.livy.JobContext;

import org.apache.sedona.sql.utils.SedonaSQLRegistrator;

/**
* State related to one Thrift session. One instance of this class is stored in the session's
* shared object map for each Thrift session that connects to the backing Livy session.
@@ -77,6 +79,8 @@ private ThriftSessionState(JobContext ctx, String sessionId) throws Exception {
this.sessionId = sessionId;
this.statements = new ConcurrentHashMap<>();
this.spark = ctx.<SparkSession>sparkSession().newSession();

SedonaSQLRegistrator.registerAll(this.spark);
}

SparkSession spark() {
