Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test_scala_no_spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ jobs:
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 hub/test"

- name: Run router tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 router/test"
24 changes: 22 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ inThisBuild(
lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

lazy val root = (project in file("."))
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub)
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub, router)
.settings(name := "chronon")

val spark_sql = Seq(
Expand Down Expand Up @@ -260,7 +260,7 @@ lazy val hub = (project in file("hub"))
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
"org.scala-lang.modules" %% "scala-xml" % "2.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2",
),
libraryDependencies ++= circe,
libraryDependencySchemes ++= Seq(
Expand All @@ -283,6 +283,26 @@ lazy val hub = (project in file("hub"))
)
)

lazy val router = project
.dependsOn(api.%("compile->compile;test->test"))
.settings(
libraryDependencies ++= Seq(
"io.vertx" % "vertx-core" % "4.5.9",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled out the vertx deps into a collection in my PR to migrate play to vertx, so post rebase you could pull that collection in

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"io.vertx" % "vertx-web" % "4.5.9",
"io.vertx" % "vertx-web-client" % "4.5.9",
"io.vertx" % "vertx-junit5" % "4.5.9",
"org.slf4j" % "slf4j-api" % "1.7.36", // match with spark
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a val declared for the slf4j version - could use that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2", // pinned from elsewhere
"org.junit.jupiter" % "junit-jupiter-api" % "5.10.5" % Test
),
libraryDependencies ++= {
if (System.getProperty("os.name").toLowerCase.contains("mac"))
Seq("io.netty" % "netty-resolver-dns-native-macos" % "4.1.115.Final" classifier "osx-aarch_64")
else
Seq.empty
}
)

ThisBuild / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import ai.chronon.api._
import ai.chronon.api.thrift.TDeserializer
import ai.chronon.api.thrift.TSerializer
import ai.chronon.api.thrift.protocol.TBinaryProtocol
import ai.chronon.api.thrift.protocol.TProtocolFactory
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.MetadataStore
import ai.chronon.online.stats.DriftStore.binaryDeserializer
import ai.chronon.online.stats.DriftStore.binarySerializer

import java.io.Serializable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
Expand Down Expand Up @@ -196,10 +194,6 @@ class DriftStore(kvStore: KVStore,
}

object DriftStore {
class SerializableSerializer(factory: TProtocolFactory) extends TSerializer(factory) with Serializable

// crazy bug in compact protocol - do not change to compact

@transient
lazy val binarySerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
override def initialValue(): TSerializer = new TSerializer(new TBinaryProtocol.Factory())
Expand Down
121 changes: 121 additions & 0 deletions router/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Why?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other related PR - #118, I pulled some of the common Vert.x code between Hub & fetcher in a module - service_commons, wdyt of folding router into that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


We have a lot of glue code that maps data into objects across system and language boundaries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for writing up this Readme & documenting!

This glue code adds effort, reduces maintainability with added indirection and lacks type-safety.

![img.png](img.png)

We aim to reduce this glue code instead by using serialization protocols that generate code
into various languages. We choose Thrift because it is the most entrenched protocol for the Chronon codebase already.

This module adds ability to make REST api's type-safe and boilerplate free by mapping requests into thrift objects
automagically (via reflection). Developers still have full control over url design - as they did with vert.x before.


## Usage

We basically translate a `Func<Input, Output>` -> `Func<RequestContext, Response>` via reflection to achieve this

### Setting up the endpoint

Thrift def
```c
struct TileKey {
1: optional string column
2: optional string slice
3: optional string name
4: optional i64 sizeMillis
}
```


Route declaration
```java
Function<TileKey, TileKey> thriftTransformer = input -> input; // some dummy function

router.get("/thrift_api/column/:column/slice/:slice")
.handler(RouteHandlerWrapper.createHandler(thriftTransformer, TileKey.class));
```

### For json encoded results

Requesting
```java
client.get("/thrift_api/column/my_col/slice/my_slice")
.addQueryParam("name", "my_name")
.addQueryParam("sizeMillis", "5")
.send()
```

Response
```json
{"column":"my_col","slice":"my_slice","name":"my_name","sizeMillis":5}
```

### For Thrift binary + base64 encoded results

Using thrift over the wire would shrink the payload significantly without additional deserialization penalty.
The reader side is expected to deserialize the thrift - simply by doing a base64 decode and using the `read` method
on the generated thrift classes.


Simply request with additional header param `response-content-type` set to `application/tbinary-b64`.

Below is a java way of calling - but you replicate this in any other language or cli.

```java
client.get("/thrift_api/column/my_col/slice/my_slice")
.addQueryParam("name", "my_name")
.addQueryParam("sizeMillis", "5")
.putHeader("response-content-type", "application/tbinary-b64")
.send()
```

This will produce data that looks like below.
The `data` key holds base64 encoded string of thrift binary compact protocol bytes.

```json
{"data":"CAABAAAAZAgAAgAAAAAA","contentType":"application/tbinary-b64"}
```


### [Untested] Here is how you would use this in javascript

First - write a helper method
```js
// Assuming you have your Thrift-generated code imported
import { TBinaryProtocol, TFramedTransport } from 'thrift';

// returns a thrift protocol with the data ready to be read - type agnostic
function payloadToThriftProtocol(base64String) {
const binaryString = atob(base64String);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
const binaryData = base64ToBinary(bytes);
const transport = new TFramedTransport(binaryData);
const protocol = new TBinaryProtocol(transport);
return protocol
}
```

Second - use the helper to decode the payload
```js
import { YourThriftObject } from './gen-js/your_thrift';

async function handleThriftData(base64EncodedData) {
try {
const thriftObj = new YourThriftObject();
const buffer = payloadToThriftProtocol(base64EncodedData)
thriftObj.read(buffer);

console.log('Decoded Thrift object:', thriftObj);
return decodedObject;
} catch (error) {
console.error('Failed to decode Thrift data:', error);
throw error;
}
}
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix issues in the JavaScript implementation.

The JavaScript example contains several issues:

  1. The base64ToBinary function is used but not defined
  2. The decodedObject variable is returned but never defined
  3. Error handling could be more specific

Consider this improvement:

 function payloadToThriftProtocol(base64String) {
     const binaryString = atob(base64String);
     const bytes = new Uint8Array(binaryString.length);
     for (let i = 0; i < binaryString.length; i++) {
         bytes[i] = binaryString.charCodeAt(i);
     }
-    const binaryData = base64ToBinary(bytes);
+    const binaryData = bytes.buffer;
     const transport = new TFramedTransport(binaryData);
     const protocol = new TBinaryProtocol(transport);
     return protocol
 }

 async function handleThriftData(base64EncodedData) {
     try {
         const thriftObj = new YourThriftObject();
         const buffer = payloadToThriftProtocol(base64EncodedData)
         thriftObj.read(buffer);
      
         console.log('Decoded Thrift object:', thriftObj);
-        return decodedObject;
+        return thriftObj;
     } catch (error) {
-        console.error('Failed to decode Thrift data:', error);
+        console.error('Failed to decode Thrift data:', error.message);
         throw error;
     }
 }

Also, consider removing the [Untested] tag once the implementation has been verified.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// returns a thrift protocol with the data ready to be read - type agnostic
function payloadToThriftProtocol(base64String) {
const binaryString = atob(base64String);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
const binaryData = base64ToBinary(bytes);
const transport = new TFramedTransport(binaryData);
const protocol = new TBinaryProtocol(transport);
return protocol
}
```
Second - use the helper to decode the payload
```js
import { YourThriftObject } from './gen-js/your_thrift';
async function handleThriftData(base64EncodedData) {
try {
const thriftObj = new YourThriftObject();
const buffer = payloadToThriftProtocol(base64EncodedData)
thriftObj.read(buffer);
console.log('Decoded Thrift object:', thriftObj);
return decodedObject;
} catch (error) {
console.error('Failed to decode Thrift data:', error);
throw error;
}
}
```
// returns a thrift protocol with the data ready to be read - type agnostic
function payloadToThriftProtocol(base64String) {
const binaryString = atob(base64String);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
const binaryData = bytes.buffer;
const transport = new TFramedTransport(binaryData);
const protocol = new TBinaryProtocol(transport);
return protocol
}
Second - use the helper to decode the payload


Binary file added router/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
19 changes: 19 additions & 0 deletions router/src/main/java/ai/chronon/router/BinaryResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ai.chronon.router;

public class BinaryResponse {
private final String data;
private final String contentType;

public BinaryResponse(String data, String contentType) {
this.data = data;
this.contentType = contentType;
}

public String getData() {
return data;
}

public String getContentType() {
return contentType;
}
}
Loading
Loading