From 4d51de59d1e18a2477e58b43b60e38fd931a2663 Mon Sep 17 00:00:00 2001
From: nikhil-zlai
Date: Tue, 26 Nov 2024 23:03:30 -0800
Subject: [PATCH 1/2] netty server + streamlit app for debugging visually

---
 docker-init/demo/Dockerfile                   |   2 +
 docker-init/demo/build.sh                     |   0
 docker-init/demo/run.sh                       |   1 +
 docker-init/demo/viz.py                       |  94 ++++++++++++
 .../ai/chronon/spark/scripts/DataServer.scala | 141 ++++++++++++++++++
 .../spark/scripts/ObservabilityDemo.scala     |  75 +++++-----
 6 files changed, 277 insertions(+), 36 deletions(-)
 mode change 100644 => 100755 docker-init/demo/build.sh
 create mode 100644 docker-init/demo/viz.py
 create mode 100644 spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala

diff --git a/docker-init/demo/Dockerfile b/docker-init/demo/Dockerfile
index 72bd835415..5b305f7628 100644
--- a/docker-init/demo/Dockerfile
+++ b/docker-init/demo/Dockerfile
@@ -32,4 +32,6 @@ ENV SPARK_DIST_CLASSPATH="/opt/spark/jars/*"
 ENV SPARK_EXTRA_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*"
 ENV HADOOP_CLASSPATH="/opt/spark/jars/*"
 
+EXPOSE 8181
+
 CMD ["tail", "-f", "/dev/null"]
\ No newline at end of file
diff --git a/docker-init/demo/build.sh b/docker-init/demo/build.sh
old mode 100644
new mode 100755
diff --git a/docker-init/demo/run.sh b/docker-init/demo/run.sh
index 712a6a6cf5..8b3dda5775 100755
--- a/docker-init/demo/run.sh
+++ b/docker-init/demo/run.sh
@@ -16,6 +16,7 @@ fi
 
 # Run new container
 docker run -d \
+  -p 8181:8181 \
   --name spark-app \
   -v "$SPARK_JAR_PATH":/opt/chronon/jars \
   -v "$LOG_CONFIG_FILE":/opt/chronon/log4j2.properties \
diff --git a/docker-init/demo/viz.py b/docker-init/demo/viz.py
new file mode 100644
index 0000000000..fc154e847b
--- /dev/null
+++ b/docker-init/demo/viz.py
@@ -0,0 +1,94 @@
+import streamlit as st
+import requests
+from datetime import datetime
+import pandas as pd
+from collections import defaultdict
+
+# Configure the page to use wide mode
+st.set_page_config(layout="wide")
+
+# Add custom CSS to make charts wider
+st.markdown("""
+
+""", unsafe_allow_html=True)
+
+def format_timestamp(ts_ms):
+    """Format millisecond timestamp to readable date."""
+    return datetime.fromtimestamp(ts_ms/1000).strftime('%Y-%m-%d %H:%M')
+
+def load_data():
+    """Load data from the API."""
+    try:
+        response = requests.get("http://localhost:8181/api/drift-series")
+        return response.json()
+    except Exception as e:
+        st.error(f"Error loading data: {e}")
+        return None
+
+def create_series_df(timestamps, values):
+    """Create a DataFrame for a series."""
+    return pd.DataFrame({
+        'timestamp': [format_timestamp(ts) for ts in timestamps],
+        'value': values
+    })
+
+def is_valid_series(series):
+    return any(value is not None for value in series)
+
+def main():
+    st.title("Drift Board")
+
+    # Load data
+    data = load_data()
+    if not data:
+        return
+
+    # Group data by groupName
+    grouped_data = defaultdict(list)
+    for entry in data:
+        group_name = entry["key"].get("groupName", "unknown")
+        grouped_data[group_name].append(entry)
+
+    # Series types and their display names
+    series_types = {
+        "percentileDriftSeries": "Percentile Drift",
+        "histogramDriftSeries": "Histogram Drift",
+        "countChangePercentSeries": "Count Change %"
+    }
+
+    # Create tabs for each group
+    group_tabs = st.tabs(list(grouped_data.keys()))
+
+    # Fill each tab with its group's data
+    for tab, (group_name, group_entries) in zip(group_tabs, grouped_data.items()):
+        with tab:
+            for entry in group_entries:
+                # Create expander for each column
+                column_name = entry["key"].get("column", "unknown")
+                with st.expander(f"Column: {column_name}", expanded=True):
+                    # Get available series for this entry
+                    available_series = [s_type for s_type in series_types.keys()
+                                        if s_type in entry and is_valid_series(entry[s_type])]
+
+                    if available_series:
+                        # Create columns for charts with extra padding
+                        cols = st.columns([1] * len(available_series))
+
+                        # Create charts side by side
+                        for col_idx, series_type in enumerate(available_series):
+                            if is_valid_series(entry[series_type]):
+                                with cols[col_idx]:
+                                    st.subheader(series_types[series_type])
+                                    df = create_series_df(entry["timestamps"], entry[series_type])
+                                    st.line_chart(df.set_index('timestamp'), height=400)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala b/spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala
new file mode 100644
index 0000000000..fba36f4d0c
--- /dev/null
+++ b/spark/src/main/scala/ai/chronon/spark/scripts/DataServer.scala
@@ -0,0 +1,141 @@
+package ai.chronon.spark.scripts
+
+import ai.chronon.api.TileDriftSeries
+import ai.chronon.api.TileSeriesKey
+import ai.chronon.api.TileSummarySeries
+import ai.chronon.api.thrift.TBase
+import ai.chronon.online.stats.DriftStore
+import ai.chronon.online.stats.DriftStore.SerializableSerializer
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.SerializationFeature
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.Unpooled
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.http._
+import io.netty.util.CharsetUtil
+
+import java.util.Base64
+import java.util.function.Supplier
+import scala.reflect.ClassTag
+
+class DataServer(driftSeries: Seq[TileDriftSeries], summarySeries: Seq[TileSummarySeries], port: Int = 8181) {
+  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)
+  private val bossGroup = new NioEventLoopGroup(1)
+  private val workerGroup = new NioEventLoopGroup()
+  private val mapper = new ObjectMapper()
+    .registerModule(DefaultScalaModule)
+    .enable(SerializationFeature.INDENT_OUTPUT)
+
+  private class HttpServerHandler extends SimpleChannelInboundHandler[HttpObject] {
+    override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
+      ctx.flush()
+    }
+
+    private val serializer: ThreadLocal[SerializableSerializer] =
+      ThreadLocal.withInitial(new Supplier[SerializableSerializer] {
+        override def get(): SerializableSerializer = DriftStore.compactSerializer
+      })
+
+    private def convertToBytesMap[T <: TBase[_, _]: Manifest: ClassTag](
+        series: T,
+        keyF: T => TileSeriesKey): Map[String, String] = {
+      val serializerInstance = serializer.get()
+      val encoder = Base64.getEncoder
+      val keyBytes = serializerInstance.serialize(keyF(series))
+      val valueBytes = serializerInstance.serialize(series)
+      Map(
+        "keyBytes" -> encoder.encodeToString(keyBytes),
+        "valueBytes" -> encoder.encodeToString(valueBytes)
+      )
+    }
+
+    override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = {
+      msg match {
+        case request: HttpRequest =>
+          val uri = request.uri()
+
+          val start = System.currentTimeMillis()
+          val (status, content) = uri match {
+            case "/health" =>
+              (HttpResponseStatus.OK, """{"status": "healthy"}""")
+
+            case "/api/drift-series" =>
+              //val dtos = driftSeries.map(d => convertToBytesMap(d, (tds: TileDriftSeries) => tds.getKey))
+              (HttpResponseStatus.OK, mapper.writeValueAsString(driftSeries))
+
+            case "/api/summary-series" =>
+              val dtos = summarySeries.map(d => convertToBytesMap(d, (tds: TileSummarySeries) => tds.getKey))
+              (HttpResponseStatus.OK, mapper.writeValueAsString(dtos))
+
+            case "/api/metrics" =>
+              val metrics = Map(
+                "driftSeriesCount" -> driftSeries.size,
+                "summarySeriesCount" -> summarySeries.size
+              )
+              (HttpResponseStatus.OK, mapper.writeValueAsString(metrics))
+
+            case _ =>
+              (HttpResponseStatus.NOT_FOUND, """{"error": "Not Found"}""")
+          }
+          val end = System.currentTimeMillis()
+          logger.info(s"Request $uri took ${end - start}ms, status: $status, content-size: ${content.length}")
+
+          val response = new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1,
+            status,
+            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
+          )
+
+          response
+            .headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json")
+            .set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes())
+
+          if (HttpUtil.isKeepAlive(request)) {
+            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
+          }
+
+          ctx.write(response)
+        case _ =>
+      }
+    }
+
+    override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+      cause.printStackTrace()
+      ctx.close()
+    }
+  }
+
+  def start(): Unit = {
+    try {
+      val b = new ServerBootstrap()
+      b.group(bossGroup, workerGroup)
+        .channel(classOf[NioServerSocketChannel])
+        .childHandler(new ChannelInitializer[SocketChannel] {
+          override def initChannel(ch: SocketChannel): Unit = {
+            val p = ch.pipeline()
+            p.addLast(new HttpServerCodec())
+            p.addLast(new HttpObjectAggregator(65536))
+            p.addLast(new HttpServerHandler())
+          }
+        })
+        .option[Integer](ChannelOption.SO_BACKLOG, 128)
+        .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+
+      val f = b.bind(port).sync()
+      println(s"Server started at http://localhost:$port/api/metrics")
+      f.channel().closeFuture().sync()
+    } finally {
+      shutdown()
+    }
+  }
+
+  private def shutdown(): Unit = {
+    workerGroup.shutdownGracefully()
+    bossGroup.shutdownGracefully()
+  }
+}
diff --git a/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala b/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
index 065ebf6d0c..064b69700f 100644
--- a/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
+++ b/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
@@ -32,42 +32,6 @@ import scala.util.ScalaJavaConversions.IteratorOps
 
 object ObservabilityDemo {
 
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
-  def time(message: String)(block: => Unit): Unit = {
-    logger.info(s"$message..".yellow)
-    val start = System.currentTimeMillis()
-    block
-    val end = System.currentTimeMillis()
-    logger.info(s"$message took ${end - start} ms".green)
-  }
-
-  class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-    val startDs: ScallopOption[String] = opt[String](
-      name = "start-ds",
-      default = Some("2023-01-01"),
-      descr = "Start date in YYYY-MM-DD format"
-    )
-
-    val endDs: ScallopOption[String] = opt[String](
-      name = "end-ds",
-      default = Some("2023-02-30"),
-      descr = "End date in YYYY-MM-DD format"
-    )
-
-    val rowCount: ScallopOption[Int] = opt[Int](
-      name = "row-count",
-      default = Some(700000),
-      descr = "Number of rows to generate"
-    )
-
-    val namespace: ScallopOption[String] = opt[String](
-      name = "namespace",
-      default = Some("observability_demo"),
-      descr = "Namespace for the demo"
-    )
-
-    verify()
-  }
-
   def main(args: Array[String]): Unit = {
 
     val config = new Conf(args)
@@ -183,6 +147,9 @@ object ObservabilityDemo {
       }
     }
 
+    val server = new DataServer(driftSeries, summarySeries)
+    server.start()
+
     val startTs = 1673308800000L
     val endTs = 1674172800000L
     val joinName = "risk.user_transactions.txn_join"
@@ -211,4 +178,40 @@ object ObservabilityDemo {
     spark.stop()
     System.exit(0)
   }
+
+  def time(message: String)(block: => Unit): Unit = {
+    logger.info(s"$message..".yellow)
+    val start = System.currentTimeMillis()
+    block
+    val end = System.currentTimeMillis()
+    logger.info(s"$message took ${end - start} ms".green)
+  }
+
+  class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+    val startDs: ScallopOption[String] = opt[String](
+      name = "start-ds",
+      default = Some("2023-01-08"),
+      descr = "Start date in YYYY-MM-DD format"
+    )
+
+    val endDs: ScallopOption[String] = opt[String](
+      name = "end-ds",
+      default = Some("2023-02-30"),
+      descr = "End date in YYYY-MM-DD format"
+    )
+
+    val rowCount: ScallopOption[Int] = opt[Int](
+      name = "row-count",
+      default = Some(700000),
+      descr = "Number of rows to generate"
+    )
+
+    val namespace: ScallopOption[String] = opt[String](
+      name = "namespace",
+      default = Some("observability_demo"),
+      descr = "Namespace for the demo"
+    )
+
+    verify()
+  }
 }

From 4fdd244057b4e3718d84c58ad0f0cf7d9424e10f Mon Sep 17 00:00:00 2001
From: nikhil-zlai
Date: Tue, 26 Nov 2024 23:10:42 -0800
Subject: [PATCH 2/2] readme

---
 docker-init/demo/README.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docker-init/demo/README.md b/docker-init/demo/README.md
index 1d4cc951ad..c1abae2d9b 100644
--- a/docker-init/demo/README.md
+++ b/docker-init/demo/README.md
@@ -1,2 +1,5 @@
-run build.sh once, and you can repeatedly exec
-sbt spark/assembly + run.sh on iterations to the chronon code.
\ No newline at end of file
+Run build.sh once; after that you can iterate on the Chronon code and quickly visualize the results:
+
+In the first terminal: `sbt spark/assembly` to rebuild the jar
+In the second terminal: `./run.sh` to load the built jar and serve the data on localhost:8181
+In the third terminal: `streamlit run viz.py` to open the dashboard
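
Once `run.sh` has the container and the embedded DataServer up, a quick way to sanity-check the endpoints before opening the Streamlit dashboard is to curl them directly. This is a minimal sketch, assuming the default port 8181 and the routes that DataServer.scala registers (`/health`, `/api/metrics`, `/api/drift-series`, `/api/summary-series`):

```bash
# Liveness probe - should print {"status": "healthy"}
curl http://localhost:8181/health

# Counts of the drift and summary series held in memory
curl http://localhost:8181/api/metrics

# The raw drift series JSON that viz.py plots
curl http://localhost:8181/api/drift-series

# Summary series as base64-encoded key/value thrift bytes
curl http://localhost:8181/api/summary-series
```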