Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions api/py/ai/chronon/repo/interactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,37 @@ Once we bulk it out - we can use this to power
2. in your `~/.bashrc` or `~/.zshrc` add `SPARK_HOME` & `CHRONON_ROOT` env vars
```
export SPARK_HOME=/Users/nikhilsimha/spark-3.5.4-bin-hadoop3 # choose dir where you unpacked spark
export CHRONON_ROOT=/Users/nikhilsimha/repos/chronon/api/py/test/sample # choose your conf dir
export CHRONON_ROOT=/Users/nikhilsimha/repos/etsy/zipline # choose your conf dir
```

### Running gateway
3. Copy the chronon jar from your blob store and set it to `CHRONON_SPARK_JAR`.
```
# for EXAMPLE, in your ~/.zshrc
export CHRONON_SPARK_JAR=/Users/nikhilsimha/Downloads/chronon_spark_assembly.jar
```

1. Load parquet files into `$CHRONON_ROOT/local_warehouse/<namespace>/<table>`

### Running gateway

1. Load parquet files into `$CHRONON_ROOT/local_warehouse/<namespace>/<table>`

2. Start gateway service - from chronon root
```
> sbt spark/assembly && ./scripts/interactive/gateway.sh
> ./scripts/interactive/gateway.sh
```

### Using interactively

1. You can interact with your conf objects in a VS Code notebook like so:

```py

# import source — I do this by running a cell from the py file, using "#%%" markers in VS Code.
from group_bys.etsy_search.visit_beacon import source
# import runner
from ai.chronon.repo.interactive import eval

df = eval(source)
df.show()
eval(source)
```

```
Expand Down
14 changes: 10 additions & 4 deletions api/py/ai/chronon/repo/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ai.chronon.api.ttypes as thrift


def eval(obj):
def eval(obj, limit=5):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for limit parameter

 def eval(obj, limit=5):
+    if not isinstance(limit, int) or limit <= 0:
+        raise ValueError("limit must be a positive integer")

Also applies to: 25-25

"""
utility function to run conf's in an interactive environment.

Expand All @@ -22,7 +22,7 @@ def eval(obj):
evaluator = gateway.entry_point

if isinstance(obj, str):
evaluator.evalQuery(obj) # TODO
return _to_df(evaluator.evalQuery(obj, limit))

func = None

Expand All @@ -41,9 +41,12 @@ def eval(obj):
elif isinstance(obj, thrift.Model):
func = evaluator.evalModel # TODO

else:
raise Exception(f"Unsupported object type for: {obj}")

thrift_str = ser.thrift_simple_json(obj)

eval_result = func(thrift_str, 5)
eval_result = func(thrift_str, limit)

return _to_df(eval_result)

Expand All @@ -57,4 +60,7 @@ def _to_df(eval_result):
raise Exception(error)

df = eval_result.df()
return DataFrame(df, df.sqlContext())

py_df = DataFrame(df, df.sparkSession())

return py_df
14 changes: 14 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/interactive/Evaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ class Evaluator(localWarehouse: LocalWarehouse) {
}
}

/** Evaluates an ad-hoc SQL string against the local warehouse.
  *
  * @param query SQL text to run via `localWarehouse.runSql`
  * @param limit maximum number of rows to retain from the result (default 5)
  * @return an [[EvaluationResult]] carrying the limited DataFrame on success,
  *         or the analysis error message when Spark rejects the query
  *         (null is used for the absent side, matching the surrounding convention)
  */
def evalQuery(query: String, limit: Int = 5): EvaluationResult = {
  try {
    // Cap the result set so interactive callers never pull a huge table.
    val limited = localWarehouse.runSql(query).limit(limit)
    EvaluationResult(null, limited)
  } catch {
    // AnalysisException covers parse/resolution failures; surface only the message.
    case ae: AnalysisException => EvaluationResult(ae.getMessage, null)
  }
}

private def runSourceBundle(sourceSqlBundle: SourceSqlBundle): DataFrame = {

val missingTables = sourceSqlBundle.tables -- localWarehouse.existingTables
Expand Down
Loading