Skip to content

Commit c232dc0

Browse files
authored
netty server + streamlit app for debugging visually (#101)
## Summary putting up a simple netty server (no deps added to sbt) and a streamlit app to have a fast loop for iterating / debugging. This is how it looks: <img width="1624" alt="Screenshot 2024-11-26 at 11 03 38 PM" src="https://github.com/user-attachments/assets/d11c8fac-79b7-4749-bba5-e71e09fa0a72"> Updated docs too. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [x] Integration tested - [x] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced a Streamlit application for visualizing data from an API endpoint. - Added a new HTTP server to handle requests related to drift and summary series, with endpoints for health checks and data retrieval. - **Improvements** - Exposed port 8181 for external access to the Spark application. - Updated the documentation with clearer instructions for building and running the application. - Updated the default value for the start date in configuration settings. - **Bug Fixes** - Enhanced error handling in the data loading process within the Streamlit app. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent f3fd359 commit c232dc0

File tree

2 files changed

+180
-36
lines changed

2 files changed

+180
-36
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package ai.chronon.spark.scripts

import ai.chronon.api.TileDriftSeries
import ai.chronon.api.TileSeriesKey
import ai.chronon.api.TileSummarySeries
import ai.chronon.api.thrift.TBase
import ai.chronon.online.stats.DriftStore
import ai.chronon.online.stats.DriftStore.SerializableSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.Unpooled
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http._
import io.netty.util.CharsetUtil

import java.util.Base64
import java.util.function.Supplier
import scala.reflect.ClassTag

/** Minimal Netty HTTP server that exposes in-memory drift / summary series as JSON
  * for visual debugging (consumed by the companion Streamlit app).
  *
  * Endpoints: `/health`, `/api/drift-series`, `/api/summary-series`, `/api/metrics`.
  * `start()` blocks until the server channel closes.
  *
  * @param driftSeries   series to serve on `/api/drift-series`
  * @param summarySeries series to serve on `/api/summary-series`
  * @param port          TCP port to bind (default 8181)
  */
class DataServer(driftSeries: Seq[TileDriftSeries], summarySeries: Seq[TileSummarySeries], port: Int = 8181) {
  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)
  // One boss thread to accept connections; default-sized worker pool for I/O.
  private val bossGroup = new NioEventLoopGroup(1)
  private val workerGroup = new NioEventLoopGroup()
  // Jackson mapper with the Scala module so Seq/Map serialize naturally; indented for readability.
  private val mapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .enable(SerializationFeature.INDENT_OUT PUT_PLACEHOLDER)

  private class HttpServerHandler extends SimpleChannelInboundHandler[HttpObject] {
    override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
      ctx.flush()
    }

    // Thrift serializers are stateful and not thread-safe; keep one per event-loop thread.
    private val serializer: ThreadLocal[SerializableSerializer] =
      ThreadLocal.withInitial(new Supplier[SerializableSerializer] {
        override def get(): SerializableSerializer = DriftStore.compactSerializer
      })

    /** Serializes a thrift series and its key to base64 strings for JSON transport. */
    private def convertToBytesMap[T <: TBase[_, _]: Manifest: ClassTag](
        series: T,
        keyF: T => TileSeriesKey): Map[String, String] = {
      val serializerInstance = serializer.get()
      val encoder = Base64.getEncoder
      val keyBytes = serializerInstance.serialize(keyF(series))
      val valueBytes = serializerInstance.serialize(series)
      Map(
        "keyBytes" -> encoder.encodeToString(keyBytes),
        "valueBytes" -> encoder.encodeToString(valueBytes)
      )
    }

    override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = {
      msg match {
        case request: HttpRequest =>
          val uri = request.uri()

          val start = System.currentTimeMillis()
          val (status, content) = uri match {
            case "/health" =>
              (HttpResponseStatus.OK, """{"status": "healthy"}""")

            case "/api/drift-series" =>
              // Drift series are served as plain JSON (not base64 thrift) for easy plotting.
              (HttpResponseStatus.OK, mapper.writeValueAsString(driftSeries))

            case "/api/summary-series" =>
              val dtos = summarySeries.map(d => convertToBytesMap(d, (tds: TileSummarySeries) => tds.getKey))
              (HttpResponseStatus.OK, mapper.writeValueAsString(dtos))

            case "/api/metrics" =>
              val metrics = Map(
                "driftSeriesCount" -> driftSeries.size,
                "summarySeriesCount" -> summarySeries.size
              )
              (HttpResponseStatus.OK, mapper.writeValueAsString(metrics))

            case _ =>
              (HttpResponseStatus.NOT_FOUND, """{"error": "Not Found"}""")
          }
          val end = System.currentTimeMillis()
          logger.info(s"Request $uri took ${end - start}ms, status: $status, content-size: ${content.length}")

          val response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            status,
            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
          )

          response
            .headers()
            .set(HttpHeaderNames.CONTENT_TYPE, "application/json")
            .set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes())

          if (HttpUtil.isKeepAlive(request)) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
            ctx.write(response)
          } else {
            // Fix: close non-keep-alive connections once the write completes; previously
            // the channel was left open indefinitely.
            ctx.write(response).addListener(ChannelFutureListener.CLOSE)
          }
        case _ =>
      }
    }

    override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
      // Fix: route errors through the logger instead of printStackTrace on stderr.
      logger.error("Unhandled error while serving request", cause)
      ctx.close()
    }
  }

  /** Binds the server and blocks until the channel closes; always shuts down the event loops. */
  def start(): Unit = {
    try {
      val b = new ServerBootstrap()
      b.group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            val p = ch.pipeline()
            p.addLast(new HttpServerCodec())
            // Aggregate chunked requests into FullHttpRequest, capped at 64 KiB.
            p.addLast(new HttpObjectAggregator(65536))
            p.addLast(new HttpServerHandler())
          }
        })
        .option[Integer](ChannelOption.SO_BACKLOG, 128)
        .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)

      val f = b.bind(port).sync()
      // Fix: advertise an endpoint that actually exists (/api/metrics, not /metrics),
      // and use the logger rather than println.
      logger.info(s"Server started at http://localhost:$port/api/metrics")
      f.channel().closeFuture().sync()
    } finally {
      shutdown()
    }
  }

  /** Gracefully releases both event-loop groups. */
  private def shutdown(): Unit = {
    workerGroup.shutdownGracefully()
    bossGroup.shutdownGracefully()
  }
}

spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,42 +32,6 @@ import scala.util.ScalaJavaConversions.IteratorOps
3232
object ObservabilityDemo {
3333
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
3434

35-
def time(message: String)(bloour clients: => Unit): Unit = {
36-
logger.info(s"$message..".yellow)
37-
val start = System.currentTimeMillis()
38-
bloour clients
39-
val end = System.currentTimeMillis()
40-
logger.info(s"$message took ${end - start} ms".green)
41-
}
42-
43-
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
44-
val startDs: ScallopOption[String] = opt[String](
45-
name = "start-ds",
46-
default = Some("2023-01-01"),
47-
descr = "Start date in YYYY-MM-DD format"
48-
)
49-
50-
val endDs: ScallopOption[String] = opt[String](
51-
name = "end-ds",
52-
default = Some("2023-02-30"),
53-
descr = "End date in YYYY-MM-DD format"
54-
)
55-
56-
val rowCount: ScallopOption[Int] = opt[Int](
57-
name = "row-count",
58-
default = Some(700000),
59-
descr = "Number of rows to generate"
60-
)
61-
62-
val namespace: ScallopOption[String] = opt[String](
63-
name = "namespace",
64-
default = Some("observability_demo"),
65-
descr = "Namespace for the demo"
66-
)
67-
68-
verify()
69-
}
70-
7135
def main(args: Array[String]): Unit = {
7236

7337
val config = new Conf(args)
@@ -183,6 +147,9 @@ object ObservabilityDemo {
183147
}
184148
}
185149

150+
val server = new DataServer(driftSeries, summarySeries)
151+
server.start()
152+
186153
val startTs = 1673308800000L
187154
val endTs = 1674172800000L
188155
val joinName = "risk.user_transactions.txn_join"
@@ -211,4 +178,40 @@ object ObservabilityDemo {
211178
spark.stop()
212179
System.exit(0)
213180
}
181+
182+
def time(message: String)(bloour clients: => Unit): Unit = {
183+
logger.info(s"$message..".yellow)
184+
val start = System.currentTimeMillis()
185+
bloour clients
186+
val end = System.currentTimeMillis()
187+
logger.info(s"$message took ${end - start} ms".green)
188+
}
189+
190+
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
191+
val startDs: ScallopOption[String] = opt[String](
192+
name = "start-ds",
193+
default = Some("2023-01-08"),
194+
descr = "Start date in YYYY-MM-DD format"
195+
)
196+
197+
val endDs: ScallopOption[String] = opt[String](
198+
name = "end-ds",
199+
default = Some("2023-02-30"),
200+
descr = "End date in YYYY-MM-DD format"
201+
)
202+
203+
val rowCount: ScallopOption[Int] = opt[Int](
204+
name = "row-count",
205+
default = Some(700000),
206+
descr = "Number of rows to generate"
207+
)
208+
209+
val namespace: ScallopOption[String] = opt[String](
210+
name = "namespace",
211+
default = Some("observability_demo"),
212+
descr = "Namespace for the demo"
213+
)
214+
215+
verify()
216+
}
214217
}

0 commit comments

Comments
 (0)