Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
java corretto-11.0.25.9.1
scala 2.12.20
java corretto-17.0.9.8.1
scala 2.12.18
asdf-plugin-manager 1.4.0
sbt 1.8.2
python 3.7.17
18 changes: 14 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ val flink_all = Seq(
val vertx_java = Seq(
"io.vertx" % "vertx-core",
"io.vertx" % "vertx-web",
"io.vertx" % "vertx-web-client",
"io.vertx" % "vertx-config",
// wire up metrics using micro meter and statsd
"io.vertx" % "vertx-micrometer-metrics"
Expand Down Expand Up @@ -297,7 +298,7 @@ lazy val frontend = (project in file("frontend"))
)

lazy val service_commons = (project in file("service_commons"))
.dependsOn(online)
.dependsOn(api.%("compile->compile;test->test"), online)
.settings(
libraryDependencies ++= vertx_java,
libraryDependencies ++= Seq(
Expand All @@ -308,8 +309,17 @@ lazy val service_commons = (project in file("service_commons"))
// our online module's spark deps which causes the web-app to not serve up content
"io.netty" % "netty-all" % "4.1.111.Final",
// wire up metrics using micro meter and statsd
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6"
)
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2", // pinned from elsewhere
"io.vertx" % "vertx-junit5" % vertxVersion % Test,
"org.junit.jupiter" % "junit-jupiter-api" % "5.10.5" % Test
),
libraryDependencies ++= {
if (System.getProperty("os.name").toLowerCase.contains("mac"))
Seq("io.netty" % "netty-resolver-dns-native-macos" % "4.1.115.Final" classifier "osx-aarch_64")
else
Seq.empty
}
)

lazy val service = (project in file("service"))
Expand Down Expand Up @@ -418,7 +428,7 @@ lazy val orchestration = project
.dependsOn(online.%("compile->compile;test->test"))
.settings(
assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),

Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),
Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources",

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,6 @@ object DriftStore {
override def initialValue(): TDeserializer = new TDeserializer(new TBinaryProtocol.Factory())
}

// todo - drop this hard-coded list in favor of a well known list or exposing as part of summaries
def breaks(count: Int): Seq[String] = (0 to count).map(_ * (100 / count)).map("p" + _.toString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static ai.chronon.service.model.GetFeaturesResponse.Result.Status.Failure;
Expand Down
83 changes: 83 additions & 0 deletions service_commons/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Why?

We have a lot of glue code that maps data into objects across system and language boundaries.
This glue code adds effort, reduces maintainability with added indirection and lacks type-safety.

![img.png](img.png)

We aim to reduce this glue code instead by using serialization protocols that generate code
into various languages. We choose Thrift because it is the most entrenched protocol for the Chronon codebase already.

This module adds ability to make REST api's type-safe and boilerplate free by mapping requests into thrift objects
automagically (via reflection). Developers still have full control over url design - as they did with vert.x before.


## Usage

We basically translate a `Func<Input, Output>` -> `Func<RequestContext, Response>` via reflection to achieve this

### Setting up the endpoint

Thrift def
```c
struct TileKey {
1: optional string column
2: optional string slice
3: optional string name
4: optional i64 sizeMillis
}
```


Route declaration
```java
Function<TileKey, TileKey> thriftTransformer = input -> input; // some dummy function

router.get("/thrift_api/column/:column/slice/:slice")
.handler(RouteHandlerWrapper.createHandler(thriftTransformer, TileKey.class));
```

### For json encoded results

Requesting
```java
client.get("/thrift_api/column/my_col/slice/my_slice")
.addQueryParam("name", "my_name")
.addQueryParam("sizeMillis", "5")
.send()
```

Response
```json
{"column":"my_col","slice":"my_slice","name":"my_name","sizeMillis":5}
```

### For Thrift binary + base64 encoded results

Using thrift over the wire would shrink the payload significantly without additional deserialization penalty.
The reader side is expected to deserialize the thrift - simply by doing a base64 decode and using the `read` method
on the generated thrift classes.


Simply request with additional header param `response-content-type` set to `application/tbinary-b64`.

Below is a java way of calling - but you replicate this in any other language or cli.

```java
client.get("/thrift_api/column/my_col/slice/my_slice")
.addQueryParam("name", "my_name")
.addQueryParam("sizeMillis", "5")
.putHeader("response-content-type", "application/tbinary-b64")
.send()
```

This will produce data that looks like below.
The `data` key holds base64 encoded string of thrift binary protocol bytes.

```json
{"data":"CAABAAAAZAgAAgAAAAAA","contentType":"application/tbinary-b64"}
```

Not every language has TBinaryProtocol support. But if our py cli wants to use it to
request large graphs for lineage & planning, this should shrink the payload by a good percentage.

Binary file added service_commons/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;

/**
* Responsible for loading the relevant concrete Chronon Api implementation and providing that
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package ai.chronon.service;

import ai.chronon.api.thrift.*;
import ai.chronon.api.thrift.protocol.TBinaryProtocol;
import ai.chronon.api.thrift.transport.TTransportException;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper class for creating Route handlers that map parameters to an Input object and transform it to Output
* The wrapped handler produces a JSON response.
* TODO: Add support for Thrift BinaryProtocol based serialization based on a special request query param.
*/
public class RouteHandlerWrapper {

public static String RESPONSE_CONTENT_TYPE_HEADER = "response-content-type";
public static String TBINARY_B64_TYPE_VALUE = "application/tbinary-b64";
public static String JSON_TYPE_VALUE = "application/json";

private static final Logger LOGGER = LoggerFactory.getLogger(RouteHandlerWrapper.class.getName());

private static final ThreadLocal<TSerializer> binarySerializer = ThreadLocal.withInitial(() -> {
try {
return new TSerializer(new TBinaryProtocol.Factory());
} catch (TTransportException e) {
throw new RuntimeException(e);
}
});

private static final ThreadLocal<TDeserializer> binaryDeSerializer = ThreadLocal.withInitial(() -> {
try {
return new TDeserializer(new TBinaryProtocol.Factory());
} catch (TTransportException e) {
throw new RuntimeException(e);
}
});

private static final ThreadLocal<Base64.Encoder> base64Encoder = ThreadLocal.withInitial(Base64::getEncoder);
private static final ThreadLocal<Base64.Decoder> base64Decoder = ThreadLocal.withInitial(Base64::getDecoder);


public static <T extends TBase> T deserializeTBinaryBase64(String base64Data, Class<? extends TBase> clazz) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
byte[] binaryData = base64Decoder.get().decode(base64Data);
T tb = (T) clazz.getDeclaredConstructor().newInstance();
binaryDeSerializer.get().deserialize(tb, binaryData);
return tb;
}

private static final Map<Class<?>, Map<String, Method>> SETTER_CACHE = new ConcurrentHashMap<>();

/**
* Creates a RoutingContext handler that maps parameters to an Input object and transforms it to Output
*
* @param transformer Function to convert from Input to Output
* @param inputClass Class object for the Input type
* @param <I> Input type with setter methods
* @param <O> Output type
* @return Handler for RoutingContext that produces Output
* TODO: To use consistent helper wrappers for the response.
*/
public static <I, O> Handler<RoutingContext> createHandler(Function<I, O> transformer, Class<I> inputClass) {

return ctx -> {
try {
// Create map with path parameters
Map<String, String> params = new HashMap<>(ctx.pathParams());

// Add query parameters
for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
params.put(entry.getKey(), entry.getValue());
}

I input = createInputFromParams(params, inputClass);
O output = transformer.apply(input);

String responseFormat = ctx.request().getHeader(RESPONSE_CONTENT_TYPE_HEADER);
if (responseFormat == null || responseFormat.equals("application/json")) {
// Send json response
ctx.response().setStatusCode(200).putHeader("content-type", JSON_TYPE_VALUE).end(JsonObject.mapFrom(output).encode());
} else {
if (!responseFormat.equals(TBINARY_B64_TYPE_VALUE)) {
throw new IllegalArgumentException(String.format("Unsupported response-content-type: %s. Supported values are: %s and %s", responseFormat, JSON_TYPE_VALUE, TBINARY_B64_TYPE_VALUE));
}

// Verify output is a Thrift object before casting
if (!(output instanceof TBase)) {
throw new IllegalArgumentException("Output must be a Thrift object for binary serialization");
}
TBase<?, TFieldIdEnum> tb = (TBase<?, TFieldIdEnum>) output;
// Serialize output to Thrift BinaryProtocol
byte[] serializedOutput = binarySerializer.get().serialize(tb);
String responseBase64 = base64Encoder.get().encodeToString(serializedOutput);

ctx.response().setStatusCode(200).putHeader("content-type", TBINARY_B64_TYPE_VALUE).end(responseBase64);
}
} catch (IllegalArgumentException ex) {
LOGGER.error("Incorrect arguments passed for handler creation", ex);
ctx.response().setStatusCode(400).putHeader("content-type", "application/json").end(new JsonObject().put("error", ex.getMessage()).encode());
} catch (Exception ex) {
LOGGER.error("Internal error occurred during handler creation", ex);
ctx.response().setStatusCode(500).putHeader("content-type", "application/json").end(new JsonObject().put("error", ex.getMessage()).encode());
}
};
}

private static <I> I createInputFromParams(Map<String, String> params, Class<I> inputClass) throws Exception {
// Create new instance using no-args constructor
I input = inputClass.getDeclaredConstructor().newInstance();


Map<String, Method> setters = SETTER_CACHE.computeIfAbsent(inputClass, cls ->
Arrays.stream(cls.getMethods())
.filter(RouteHandlerWrapper::isSetter)
.collect(Collectors.toMap(RouteHandlerWrapper::getFieldNameFromSetter, method -> method))
);

// Find and invoke setters for matching parameters
for (Map.Entry<String, String> param : params.entrySet()) {
Method setter = setters.get(param.getKey());
if (setter != null) {
String paramValue = param.getValue();

if (paramValue != null) {
Type paramType = setter.getGenericParameterTypes()[0];
Object convertedValue = convertValue(paramValue, paramType);
setter.invoke(input, convertedValue);
}
}
}

return input;
}

private static boolean isSetter(Method method) {
return method.getName().startsWith("set") && !method.getName().endsWith("IsSet") && method.getParameterCount() == 1 && (method.getReturnType() == void.class || method.getReturnType() == method.getDeclaringClass());
}

private static String getFieldNameFromSetter(Method method) {
String methodName = method.getName();
String fieldName = methodName.substring(3); // Remove "set"
return fieldName.substring(0, 1).toLowerCase() + fieldName.substring(1);
}

private static Object convertValue(String value, Type targetType) { // Changed parameter to Type
// Handle Class types
if (targetType instanceof Class<?> targetClass) {

if (targetClass == String.class) {
return value;
} else if (targetClass == Integer.class || targetClass == int.class) {
return Integer.parseInt(value);
} else if (targetClass == Long.class || targetClass == long.class) {
return Long.parseLong(value);
} else if (targetClass == Double.class || targetClass == double.class) {
return Double.parseDouble(value);
} else if (targetClass == Boolean.class || targetClass == boolean.class) {
return Boolean.parseBoolean(value);
} else if (targetClass == Float.class || targetClass == float.class) {
return Float.parseFloat(value);
} else if (targetClass.isEnum()) {
try {
// Try custom fromString method first
Method fromString = targetClass.getMethod("fromString", String.class);
Object result = fromString.invoke(null, value);
if (value != null && result == null) {
throw new IllegalArgumentException(String.format("Invalid enum value %s for type %s", value, targetClass.getSimpleName()));
}
return result;
} catch (NoSuchMethodException e) {
// Fall back to standard enum valueOf
return Enum.valueOf(targetClass.asSubclass(Enum.class), value.toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Error converting %s to enum type %s : %s", value, targetClass.getSimpleName(), e.getMessage()));
}
}
}

// Handle parameterized types (List, Map)
if (targetType instanceof ParameterizedType parameterizedType) {
Class<?> rawType = (Class<?>) parameterizedType.getRawType();

// Handle List types
if (List.class.isAssignableFrom(rawType)) {
Type elementType = parameterizedType.getActualTypeArguments()[0];
return Arrays
.stream(value.split(","))
.map(v -> convertValue(v.trim(), elementType))
.collect(Collectors.toList());
}

// Handle Map types
if (Map.class.isAssignableFrom(rawType)) {
Type keyType = parameterizedType.getActualTypeArguments()[0];
Type valueType = parameterizedType.getActualTypeArguments()[1];
return Arrays
.stream(value.split(","))
.map(entry -> entry.split(":"))
.filter(kv -> {
if (kv.length != 2) {
throw new IllegalArgumentException("Invalid map entry format. Expected 'key:value' but got: " + String.join(":", kv));
}
return true;
})
.collect(Collectors.toMap(
kv -> convertValue(kv[0].trim(), keyType),
kv -> convertValue(kv[1].trim(), valueType)
));
}
}

throw new IllegalArgumentException("Unsupported type: " + targetType.getTypeName());
}
}

Loading
Loading