Add a Chronon service module to add support for serving features #873

Open. Wants to merge 19 commits into base: main
56 changes: 55 additions & 1 deletion build.sbt
@@ -80,7 +80,7 @@ enablePlugins(GitVersioning, GitBranchPrompt)
lazy val supportedVersions = List(scala211, scala212, scala213)

lazy val root = (project in file("."))
- .aggregate(api, aggregator, online, spark_uber, spark_embedded)
+ .aggregate(api, aggregator, online, spark_uber, spark_embedded, service)
.settings(
publish / skip := true,
crossScalaVersions := Nil,
@@ -391,6 +391,60 @@ lazy val flink = (project in file("flink"))
"flink")
)

lazy val service = (project in file("service"))
.dependsOn(online)
.settings(
assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
assembly / artifact := {
val art = (assembly / artifact).value
art.withClassifier(Some("assembly"))
},
addArtifact(assembly / artifact, assembly),
publishSettings,
libraryDependencies ++= Seq(
"io.vertx" % "vertx-core" % "4.5.10",
"io.vertx" % "vertx-web" % "4.5.10",
"io.vertx" % "vertx-config" % "4.5.10",
"ch.qos.logback" % "logback-classic" % "1.2.11",
"org.slf4j" % "slf4j-api" % "1.7.36",
Comment on lines +408 to +409

Collaborator:
For my own learning/curiosity, any comment on why logback instead of log4j2?

Contributor Author:
I think I might have hit some dependency issues on log4j2. Could give that a shot again if you'd prefer log4j2. It seems to be the newer framework (though there seem to have been a few sec issues found from time to time). The way we've configured logging currently though is to write async so perf wise we should see pretty low logging overhead.

"com.typesafe" % "config" % "1.4.3",
// force netty versions -> without this we conflict with the versions pulled in from
// our online module's spark deps which causes the web-app to not serve up content
"io.netty" % "netty-all" % "4.1.111.Final",
// wire up metrics using Micrometer and statsd
"io.vertx" % "vertx-micrometer-metrics" % "4.5.10",
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
"junit" % "junit" % "4.13.2" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test,
// use mockito 4.x as Chronon builds on Java8
"org.mockito" % "mockito-core" % "4.11.0" % Test,
"io.vertx" % "vertx-unit" % "4.5.10" % Test,
// add codegen dep to help with mockito errors
"io.vertx" % "vertx-codegen" % "4.5.10" % Test,
),
// Assembly settings
assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Main class configuration
// We use a custom launcher to help us wire up our statsd metrics
Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),

// Merge strategy for assembly
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
}
)

// Build Sphinx documentation
lazy val sphinx = taskKey[Unit]("Build Sphinx Documentation")
sphinx := {
5 changes: 4 additions & 1 deletion online/src/main/scala/ai/chronon/online/Metrics.scala
@@ -125,14 +125,17 @@ object Metrics {
)
}

// Host can also be a Unix socket like: unix:///opt/datadog-agent/run/dogstatsd.sock
// In the unix socket case port is configured to be 0
val statsHost: String = System.getProperty("ai.chronon.metrics.host", "localhost")
val statsPort: Int = System.getProperty("ai.chronon.metrics.port", "8125").toInt
val tagCache: TTLCache[Context, String] = new TTLCache[Context, String](
{ ctx => ctx.toTags.reverse.mkString(",") },
{ ctx => ctx },
ttlMillis = 5 * 24 * 60 * 60 * 1000 // 5 days
)

- val statsClient: NonBlockingStatsDClient = new NonBlockingStatsDClient("ai.zipline", "localhost", statsPort)
+ val statsClient: NonBlockingStatsDClient = new NonBlockingStatsDClient("ai.zipline", statsHost, statsPort)
}

case class Context(environment: Environment,
76 changes: 76 additions & 0 deletions service/README.md
@@ -0,0 +1,76 @@
# Chronon Feature Service
Collaborator:
Should we explicitly document the endpoints available here?

Contributor Author:
I thought about this but I expect the docs to get outdated soon. A better approach might be something like wiring up OpenApi / Swagger annotations. Could tackle this in a follow up.

Collaborator:
Do you think it's worth documenting the existence of this service in the chronon website?

Contributor Author:
Yeah def worth it. I was thinking of letting it bake / clearing out some nits etc and putting up a doc update PR in a follow up.

Collaborator:
Plus one to this.


The feature service module contains the code to bring up a service that is a thin shim around the Fetcher code. It is
meant to help Chronon adopters who need a quick way to get a feature serving layer up and running, or who work in a
non-JVM based organization and need a way to retrieve features.

## Core Technology

The Chronon Feature Service is built on top of the [Vert.x](https://vertx.io/) JVM framework. Vert.x is a high-performance
web framework which supports HTTP and gRPC based services.

## Running locally

To build the service sub-module:
```bash
~/workspace/chronon $ sbt "project service" clean assembly
```

To test out the service, you also need to build a concrete instantiation of the [Api](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L187).
We can leverage the [quickstart Mongo API](https://github.com/airbnb/chronon/tree/main/quickstart/mongo-online-impl) for this:
```bash
~/workspace/chronon $ cd quickstart/mongo-online-impl
~/workspace/chronon/quickstart/mongo-online-impl $ sbt assembly
...
[success] Total time: 1 s, completed Nov 6, 2024, 2:35:26 PM
```
This command writes the assembly jar to the `target/scala-2.12` sub-directory.

We can now use this to start up the feature service:
```bash
~/workspace/chronon $ java -jar service/target/scala-2.12/service-vertx_service-*.jar run ai.chronon.service.WebServiceVerticle \
-Dserver.port=9000 -conf service/src/main/resources/example_config.json
...
14:39:26.626 [vert.x-eventloop-thread-1] INFO a.chronon.service.WebServiceVerticle - HTTP server started on port 9000
14:39:26.627 [vert.x-eventloop-thread-0] INFO i.v.c.i.l.c.VertxIsolatedDeployer - Succeeded in deploying verticle
```

A few things to call out so you can customize:
- Choose your port (this is where you'll hit your webservice with traffic)
- Update the example_config.json (specifically confirm the path to the mongo-online-impl assembly jar matches your setup)

If you'd like some real data to query from the feature service, make sure to run through the relevant steps of the
[Quickstart - Online Flows](https://chronon.ai/getting_started/Tutorial.html#online-flows) tutorial.

Some examples to curl the webservice:
```bash
$ curl 'http://localhost:9000/ping'
$ curl 'http://localhost:9000/config'
$ curl -X POST 'http://localhost:9000/v1/features/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]'
```
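
Note that the join name in the last example is percent-encoded (`quickstart/training_set.v2` becomes
`quickstart%2Ftraining_set.v2`) so that it stays within a single path segment. For callers that build the URL
programmatically, a minimal Java sketch is shown here. The class and method names are hypothetical and not part of
this module, and `URLEncoder` applies form encoding, so the sketch assumes join names made up of letters, digits,
`.`, `_`, `-` and `/` only:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helper, for illustration only.
class JoinRequestSketch {

    // Builds the join endpoint URL; '/' in the join name is encoded as %2F.
    static String joinUrl(String baseUrl, String joinName) {
        try {
            return baseUrl + "/v1/features/join/" + URLEncoder.encode(joinName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required to be available on every JVM.
            throw new IllegalStateException(e);
        }
    }

    // Sends the JSON body with Content-Type: application/json and returns the raw response body.
    static String postJson(String url, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL; call postJson(...) once the service is running locally.
        System.out.println(joinUrl("http://localhost:9000", "quickstart/training_set.v2"));
    }
}
```

With the service running, `postJson(joinUrl("http://localhost:9000", "quickstart/training_set.v2"), "[{\"user_id\": \"5\"}]")`
would issue the same request as the curl command.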

## Metrics

The Vert.x feature service relies on the same statsd host / port coordinates as the rest of the Chronon project -
[Metrics](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Metrics.scala#L135). When configured correctly,
the service will emit metrics captured by [Vert.x](https://vertx.io/docs/vertx-micrometer-metrics/java/#_http_client), JVM metrics as well as metrics
captured by existing Chronon Fetcher code.

To view these metrics for your locally running feature service:
- Install the [statsd-logger](https://github.com/jimf/statsd-logger) npm module (`npm install -g statsd-logger`)
- Run the command - `statsd-logger`

Now you should see metrics of the format:
```bash
$ statsd-logger
Server listening on 0.0.0.0:8125
StatsD Metric: jvm.buffer.memory.used 12605920|g|#statistic:value,id:direct
StatsD Metric: jvm.threads.states 0|g|#statistic:value,state:blocked
StatsD Metric: jvm.memory.used 8234008|g|#statistic:value,area:nonheap,id:Compressed Class Space
StatsD Metric: jvm.threads.states 19|g|#statistic:value,state:runnable
StatsD Metric: system.load.average.1m 1.504883|g|#statistic:value
StatsD Metric: vertx.http.server.active.requests 0|g|#statistic:value,method:GET,path:/ping
StatsD Metric: ai.zipline.join.fetch.join_request.count 1|c|#null,null,null,null,environment:join.fetch,owner:quickstart,team:quickstart,production:false,join:quickstart_training_set_v2
StatsD Metric: ai.zipline.join.fetch.group_by_request.count 1|c|#null,null,accuracy:SNAPSHOT,environment:join.fetch,owner:quickstart,team:quickstart,production:false,group_by:quickstart_purchases_v1,join:quickstart_training_set_v2
...
```
61 changes: 61 additions & 0 deletions service/src/main/java/ai/chronon/service/ApiProvider.java
@@ -0,0 +1,61 @@
package ai.chronon.service;

import ai.chronon.online.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.util.ScalaVersionSpecificCollectionsConverter;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;

/**
* Responsible for loading the relevant concrete Chronon Api implementation and providing that
* for use in the Web service code. We follow similar semantics as the Driver to configure this:
* online.jar - Jar that contains the implementation of the Api
* online.class - Name of the Api class
* online.api.props - Structure that contains fields that are loaded and passed to the Api implementation
* during instantiation to configure it (e.g. connection params)
*/
Comment on lines +14 to +21

Collaborator:
I really appreciate the comments before the classes!

public class ApiProvider {
private static final Logger logger = LoggerFactory.getLogger(ApiProvider.class);

public static Api buildApi(ConfigStore configStore) throws Exception {
Optional<String> maybeJarPath = configStore.getOnlineJar();
Optional<String> maybeClass = configStore.getOnlineClass();
if (!(maybeJarPath.isPresent() && maybeClass.isPresent())) {
throw new IllegalArgumentException("Both 'online.jar' and 'online.class' configs must be set.");
}

String jarPath = maybeJarPath.get();
String className = maybeClass.get();
File jarFile = new File(jarPath);
if (!jarFile.exists()) {
throw new IllegalArgumentException("JAR file does not exist: " + jarPath);
}

logger.info("Loading API implementation from JAR: {}, class: {}", jarPath, className);

// Create class loader for the API JAR
URL jarUrl = jarFile.toURI().toURL();
URLClassLoader apiClassLoader = new URLClassLoader(
new URL[]{jarUrl},
ApiProvider.class.getClassLoader()
);

// Load and instantiate the API implementation
Class<?> apiClass = Class.forName(className, true, apiClassLoader);
if (!Api.class.isAssignableFrom(apiClass)) {
throw new IllegalArgumentException(
"Class " + className + " does not extend the Api abstract class"
);
}

Map<String, String> propsMap = configStore.getOnlineApiProps();
scala.collection.immutable.Map<String, String> scalaPropsMap = ScalaVersionSpecificCollectionsConverter.convertJavaMapToScala(propsMap);

return (Api) apiClass.getConstructors()[0].newInstance(scalaPropsMap);
}
}
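
The loading pattern in `buildApi` (a child `URLClassLoader`, a type check, then reflective construction) can be tried in isolation. The sketch below is not the PR's code: `Plugin` and `EchoPlugin` are hypothetical stand-ins for `Api` and a concrete implementation, and a `java.util.Map` replaces the Scala map. It also looks the constructor up by parameter type instead of taking `getConstructors()[0]`. This assumes every implementation exposes a public single-map constructor, which the PR does not state.

```java
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;

// Illustration only: Plugin and EchoPlugin are hypothetical stand-ins, not Chronon classes.
class PluginLoaderSketch {

    // Stand-in for the abstract Api class.
    public abstract static class Plugin {
        public abstract String describe();
    }

    // Stand-in for an implementation that would normally live in a separate jar.
    public static class EchoPlugin extends Plugin {
        private final Map<String, String> props;

        public EchoPlugin(Map<String, String> props) {
            this.props = props;
        }

        @Override
        public String describe() {
            return "echo:" + props.get("host");
        }
    }

    static Plugin load(URL[] jars, String className, Map<String, String> props) {
        try {
            // Child-first lookup is not attempted: the loader delegates to its parent before the jars.
            ClassLoader loader = new URLClassLoader(jars, PluginLoaderSketch.class.getClassLoader());
            Class<?> cls = Class.forName(className, true, loader);
            if (!Plugin.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException("Class " + className + " does not extend Plugin");
            }
            // Selecting by parameter type fails with NoSuchMethodException if the contract is not met,
            // rather than silently picking whichever constructor happens to be listed first.
            Constructor<?> ctor = cls.getConstructor(Map.class);
            return (Plugin) ctor.newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to instantiate " + className, e);
        }
    }
}
```

Calling `load(new URL[0], PluginLoaderSketch.EchoPlugin.class.getName(), props)` resolves the class through the parent loader, which is enough to exercise the type check and the constructor lookup without building a jar.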
61 changes: 61 additions & 0 deletions service/src/main/java/ai/chronon/service/ChrononServiceLauncher.java
@@ -0,0 +1,61 @@
package ai.chronon.service;

import ai.chronon.online.Metrics;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
import io.vertx.core.Launcher;
import io.vertx.core.VertxOptions;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MicrometerMetricsFactory;
import io.vertx.micrometer.MicrometerMetricsOptions;

import java.util.HashMap;
import java.util.Map;

/**
* Custom launcher to help configure the Chronon vertx feature service
* to handle things like setting up a statsd metrics registry.
* We use statsd here to be consistent with the rest of our project (e.g. fetcher code).
* This allows us to send Vertx webservice metrics along with fetcher related metrics to allow users
* to debug performance issues and set alerts etc.
*/
public class ChrononServiceLauncher extends Launcher {

@Override
public void beforeStartingVertx(VertxOptions options) {

StatsdConfig config = new StatsdConfig() {
private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());

final Map<String, String> statsProps = new HashMap<String, String>() {{
put(prefix() + "." + "port", statsdPort);
put(prefix() + "." + "host", statsdHost);
put(prefix() + "." + "protocol", Integer.parseInt(statsdPort) == 0 ? "UDS_DATAGRAM" : "UDP");
}};

@Override
public String get(String key) {
return statsProps.get(key);
}
};

MeterRegistry registry = new StatsdMeterRegistry(config, Clock.SYSTEM);
MicrometerMetricsFactory metricsFactory = new MicrometerMetricsFactory(registry);

// Configure metrics via statsd
MicrometerMetricsOptions metricsOptions = new MicrometerMetricsOptions()
.setEnabled(true)
.setJvmMetricsEnabled(true)
.setFactory(metricsFactory)
.addLabels(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH);

options.setMetricsOptions(metricsOptions);
}

public static void main(String[] args) {
new ChrononServiceLauncher().dispatch(args);
}
}
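
The property map inside the anonymous `StatsdConfig` follows the convention noted in `Metrics.scala`: a port of 0 means the host is a Unix socket path, so the protocol switches from UDP to a Unix datagram socket. A dependency-free sketch of that selection logic is shown below. The class and method names are hypothetical, and `"statsd"` is passed explicitly as the prefix on the assumption that it matches what `prefix()` returns in the launcher.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: mirrors the key/value layout built in the launcher, without Micrometer on the classpath.
class StatsdPropsSketch {

    static Map<String, String> statsdProps(String prefix, String host, int port) {
        Map<String, String> props = new HashMap<>();
        props.put(prefix + ".host", host);
        props.put(prefix + ".port", String.valueOf(port));
        // Port 0 signals that `host` is a Unix socket path rather than a hostname.
        props.put(prefix + ".protocol", port == 0 ? "UDS_DATAGRAM" : "UDP");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(statsdProps("statsd", "localhost", 8125));
        System.out.println(statsdProps("statsd", "/opt/datadog-agent/run/dogstatsd.sock", 0));
    }
}
```

Building the map in a plain method also avoids the double-brace initializer, which creates an extra anonymous `HashMap` subclass.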
66 changes: 66 additions & 0 deletions service/src/main/java/ai/chronon/service/ConfigStore.java
@@ -0,0 +1,66 @@
package ai.chronon.service;

import io.vertx.config.ConfigRetriever;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Helps keep track of the various Chronon fetcher service configs.
* We currently read configs once at startup - this makes sense for configs
* such as the server port and we can revisit / extend things in the future to
* be able to hot-refresh configs like Vertx supports under the hood.
*/
public class ConfigStore {

private static final int DEFAULT_PORT = 8080;

private static final String SERVER_PORT = "server.port";
private static final String ONLINE_JAR = "online.jar";
private static final String ONLINE_CLASS = "online.class";
private static final String ONLINE_API_PROPS = "online.api.props";

private JsonObject jsonConfig;

public ConfigStore(Vertx vertx) {
ConfigRetriever configRetriever = ConfigRetriever.create(vertx);
configRetriever.getConfig().onComplete(ar -> {
if (ar.failed()) {
throw new IllegalStateException("Unable to load service config", ar.cause());
}
jsonConfig = ar.result();
});
}

public int getServerPort() {
return jsonConfig.getInteger(SERVER_PORT, DEFAULT_PORT);
}

public Optional<String> getOnlineJar() {
return Optional.ofNullable(jsonConfig.getString(ONLINE_JAR));
}

public Optional<String> getOnlineClass() {
return Optional.ofNullable(jsonConfig.getString(ONLINE_CLASS));
}

public Map<String, String> getOnlineApiProps() {
JsonObject apiProps = jsonConfig.getJsonObject(ONLINE_API_PROPS);
if (apiProps == null) {
return new HashMap<String, String>();
}

return apiProps.stream().collect(Collectors.toMap(
Map.Entry::getKey,
e -> String.valueOf(e.getValue())
));
}

public String encodeConfig() {
return jsonConfig.encodePrettily();
}
}
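
One thing that may be worth checking in review: the constructor assigns `jsonConfig` inside an asynchronous callback, so a getter called right after construction could still see `null`, and an exception thrown inside the callback would probably not reach the code that called the constructor. A possible alternative is to create the store only once the load has completed. The sketch below shows that shape with `java.util.concurrent` alone. The class name is hypothetical, and a plain `Map` stands in for `JsonObject`. How the Vert.x future would be bridged to this is left open.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustration only: a config holder that cannot be observed before its data has loaded.
class LoadedConfigSketch {

    private static final int DEFAULT_PORT = 8080;
    private static final String SERVER_PORT = "server.port";

    private final Map<String, Object> config;

    private LoadedConfigSketch(Map<String, Object> config) {
        // Defensive copy so later changes to the source map do not leak in.
        this.config = Collections.unmodifiableMap(new HashMap<>(config));
    }

    // The store is produced by the pending load; a failed load fails the returned future instead.
    static CompletableFuture<LoadedConfigSketch> create(CompletableFuture<Map<String, Object>> pendingConfig) {
        return pendingConfig.thenApply(LoadedConfigSketch::new);
    }

    int getServerPort() {
        Object port = config.get(SERVER_PORT);
        return port instanceof Number ? ((Number) port).intValue() : DEFAULT_PORT;
    }
}
```

Callers then chain on the returned future (or fail startup when it completes exceptionally) rather than reading from a partially initialized object.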