Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test_scala_no_spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ jobs:
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 hub/test"

- name: Run router tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 router/test"
23 changes: 21 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ inThisBuild(
lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

lazy val root = (project in file("."))
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub)
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub, router)
.settings(name := "chronon")

val spark_sql = Seq(
Expand Down Expand Up @@ -260,7 +260,7 @@ lazy val hub = (project in file("hub"))
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
"org.scala-lang.modules" %% "scala-xml" % "2.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2",
),
libraryDependencies ++= circe,
libraryDependencySchemes ++= Seq(
Expand All @@ -283,6 +283,25 @@ lazy val hub = (project in file("hub"))
)
)

lazy val router = project
.dependsOn(api.%("compile->compile;test->test"))
.settings(
libraryDependencies ++= Seq(
"io.vertx" % "vertx-core" % "4.5.9",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled out the vertx deps into a collection in my PR to migrate play to vertx, so post rebase you could pull that collection in

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"io.vertx" % "vertx-web" % "4.5.9",
"io.vertx" % "vertx-web-client" % "4.5.9",
"io.vertx" % "vertx-junit5" % "4.5.9",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2", // pinned from elsewhere
"org.junit.jupiter" % "junit-jupiter-api" % "5.10.5" % Test
),
libraryDependencies ++= {
if (System.getProperty("os.name").toLowerCase.contains("mac"))
Seq("io.netty" % "netty-resolver-dns-native-macos" % "4.1.115.Final" classifier "osx-aarch_64")
else
Seq.empty
}
)

ThisBuild / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import ai.chronon.api._
import ai.chronon.api.thrift.TDeserializer
import ai.chronon.api.thrift.TSerializer
import ai.chronon.api.thrift.protocol.TBinaryProtocol
import ai.chronon.api.thrift.protocol.TProtocolFactory
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.MetadataStore
import ai.chronon.online.stats.DriftStore.binaryDeserializer
import ai.chronon.online.stats.DriftStore.binarySerializer

import java.io.Serializable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
Expand Down Expand Up @@ -196,10 +194,6 @@ class DriftStore(kvStore: KVStore,
}

object DriftStore {
class SerializableSerializer(factory: TProtocolFactory) extends TSerializer(factory) with Serializable

// crazy bug in compact protocol - do not change to compact

@transient
lazy val binarySerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
override def initialValue(): TSerializer = new TSerializer(new TBinaryProtocol.Factory())
Expand Down
Empty file added router/README.md
Empty file.
178 changes: 178 additions & 0 deletions router/src/main/java/ai/chronon/router/RouteHandlerWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package ai.chronon.router;

import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import io.vertx.core.json.JsonObject;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.lang.reflect.ParameterizedType;
import java.io.StringWriter;
import java.io.PrintWriter;
import java.util.stream.Collectors;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RouteHandlerWrapper {

/**
* Creates a RoutingContext handler that maps parameters to an Input object and transforms it to Output
*
* @param transformer Function to convert from Input to Output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one thing to maybe call out explicitly is the transformer is responsible for wrapping the work in a vert.x future if its time consuming. Else we'll quickly exhaust event threads

* @param inputClass Class object for the Input type
* @param <I> Input type with setter methods
* @param <O> Output type
* @return Handler for RoutingContext that produces Output
*/
public static <I, O> Handler<RoutingContext> createHandler(
Function<I, O> transformer,
Class<I> inputClass) {

return ctx -> {
try {
// Create parameter map
Map<String, String> params = new HashMap<>();

// Add path parameters
params.putAll(ctx.pathParams());

// Add query parameters
for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
params.put(entry.getKey(), entry.getValue());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation for request parameters

The current implementation accepts all parameters without validation. Consider:

  • Adding size limits for parameter values
  • Validating parameter names against expected patterns
  • Implementing rate limiting for requests

Here's a suggested improvement:

 Map<String, String> params = new HashMap<>();
+// Add validation for parameter count
+if (ctx.queryParams().size() > MAX_PARAMS) {
+    throw new IllegalArgumentException("Too many parameters");
+}
+
+// Validate parameter names and values
+for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
+    if (!isValidParamName(entry.getKey())) {
+        throw new IllegalArgumentException("Invalid parameter name: " + entry.getKey());
+    }
+    if (entry.getValue().length() > MAX_PARAM_LENGTH) {
+        throw new IllegalArgumentException("Parameter value too long: " + entry.getKey());
+    }
     params.put(entry.getKey(), entry.getValue());
 }

Committable suggestion skipped: line range outside the PR's diff.


I input = createInputFromParams(params, inputClass);
O output = transformer.apply(input);

// Send response
ctx.response()
.putHeader("content-type", "application/json")
.end(JsonObject.mapFrom(output).encode());

} catch (Exception e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets log here instead of printstack trace. Our logging configs will take care of dumping the st

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String stackTrace = sw.toString();

ctx.response()
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(new JsonObject()
.put("error", stackTrace)
.encode());
}
};
}

private static <I> I createInputFromParams(Map<String, String> params, Class<I> inputClass)
throws Exception {
// Create new instance using no-args constructor
I input = inputClass.getDeclaredConstructor().newInstance();

// Get all methods
Method[] methods = inputClass.getMethods();

// Find and invoke setters for matching parameters
for (Method method : methods) {
if (isSetter(method)) {
String fieldName = getFieldNameFromSetter(method);
String paramValue = params.get(fieldName);

if (paramValue != null) {
Type paramType = method.getGenericParameterTypes()[0];
Object convertedValue = convertValue(paramValue, paramType);
method.invoke(input, convertedValue);
}
}
}

return input;
}

private static boolean isSetter(Method method) {
return method.getName().startsWith("set") &&
!method.getName().endsWith("IsSet") &&
method.getParameterCount() == 1 &&
(method.getReturnType() == void.class || method.getReturnType() == method.getDeclaringClass());
}

private static String getFieldNameFromSetter(Method method) {
String methodName = method.getName();
String fieldName = methodName.substring(3); // Remove "set"
return fieldName.substring(0, 1).toLowerCase() + fieldName.substring(1);
}

private static Object convertValue(String value, Type targetType) { // Changed parameter to Type
// Handle Class types
if (targetType instanceof Class<?>) {
Class<?> targetClass = (Class<?>) targetType;

if (targetClass == String.class) {
return value;
} else if (targetClass == Integer.class || targetClass == int.class) {
return Integer.parseInt(value);
} else if (targetClass == Long.class || targetClass == long.class) {
return Long.parseLong(value);
} else if (targetClass == Double.class || targetClass == double.class) {
return Double.parseDouble(value);
} else if (targetClass == Boolean.class || targetClass == boolean.class) {
return Boolean.parseBoolean(value);
} else if (targetClass == Float.class || targetClass == float.class) {
return Float.parseFloat(value);
} else if (targetClass.isEnum()) {
try {
// Try custom fromString method first
Method fromString = targetClass.getMethod("fromString", String.class);
Object result = fromString.invoke(null, value);
if (value != null && result == null) {
throw new IllegalArgumentException(String.format(
"Invalid enum value %s for type %s", value, targetClass.getSimpleName()));
}
return result;
} catch (NoSuchMethodException e) {
// Fall back to standard enum valueOf
return Enum.valueOf(targetClass.asSubclass(Enum.class), value.toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Error converting %s to enum type %s : %s",
value, targetClass.getSimpleName(), e.getMessage()));
}
}
}

// Handle parameterized types (List, Map)
if (targetType instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) targetType;
Class<?> rawType = (Class<?>) parameterizedType.getRawType();

// Handle List types
if (List.class.isAssignableFrom(rawType)) {
Type elementType = parameterizedType.getActualTypeArguments()[0];
return Arrays.stream(value.split(","))
.map(v -> convertValue(v.trim(), elementType))
.collect(Collectors.toList());
}

// Handle Map types
if (Map.class.isAssignableFrom(rawType)) {
Type keyType = parameterizedType.getActualTypeArguments()[0];
Type valueType = parameterizedType.getActualTypeArguments()[1];
return Arrays.stream(value.split(","))
.map(entry -> entry.split(":"))
.collect(Collectors.toMap(
kv -> convertValue(kv[0].trim(), keyType),
kv -> convertValue(kv[1].trim(), valueType)
));
}
}

throw new IllegalArgumentException("Unsupported type: " + targetType.getTypeName());
}
}

16 changes: 16 additions & 0 deletions router/src/test/java/ai/chronon/router/test/OrderStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ai.chronon.router.test;

public enum OrderStatus {
PENDING,
PROCESSING,
COMPLETED,
CANCELLED;

public static OrderStatus fromString(String value) {
try {
return valueOf(value.toUpperCase());
} catch (Exception e) {
return null;
}
}
}
Loading
Loading