Skip to content

Commit

Permalink
Add perspective tools
Browse files Browse the repository at this point in the history
  • Loading branch information
aandres committed Jul 30, 2024
1 parent 5de6169 commit 464e49a
Show file tree
Hide file tree
Showing 18 changed files with 1,403 additions and 331 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

<!-- insertion marker -->
## [v0.9.0](https://github.com/tradewelltech/beavers/releases/tag/v0.9.0) - 2024-07-30

<small>[Compare with v0.8.0](https://github.com/tradewelltech/beavers/compare/v0.8.0...v0.9.0)</small>

### Added

- Add perspective tools ([07878be](https://github.com/tradewelltech/beavers/commit/07878bec527d6e2523345ca437e6a64b77c47182) by aandres).

## [v0.8.0](https://github.com/tradewelltech/beavers/releases/tag/v0.8.0) - 2024-07-01

<small>[Compare with v0.7.0](https://github.com/tradewelltech/beavers/compare/v0.7.0...v0.8.0)</small>
Expand Down
Binary file added beavers/assets/favicon.ico
Binary file not shown.
13 changes: 12 additions & 1 deletion beavers/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
from beavers.pandas_wrapper import PandasWrapper
except ImportError:
PandasWrapper = None

try:
from beavers.perspective_wrapper import PerspectiveDagWrapper
except ImportError:
PerspectiveDagWrapper = None

import pandas as pd

Expand Down Expand Up @@ -671,6 +674,14 @@ def pd(self) -> "PandasWrapper":

return PandasWrapper(self)

@cached_property
def psp(self) -> "PerspectiveDagWrapper":
"""Returns the PerspectiveDagWrapper."""
# Import dynamically because of circular dependency
from beavers.perspective_wrapper import PerspectiveDagWrapper

return PerspectiveDagWrapper(self)

def _add_stream(
self,
function: Callable[[...], T],
Expand Down
245 changes: 245 additions & 0 deletions beavers/perspective_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import dataclasses
import pathlib
import threading
from typing import Any, Literal, Optional, Sequence

import perspective
import pyarrow as pa
import tornado
from perspective import PerspectiveManager, PerspectiveTornadoHandler

from beavers import Dag, Node
from beavers.kafka import KafkaDriver

COMPARATORS = (
"==",
"!=",
">",
">=",
"<",
"<=",
"begins with",
"contains",
"ends with",
"in",
"not in",
"is not null",
"is null",
)

_SOURCE_DIRECTORY = pathlib.Path(__file__).parent
TABLE_PATH = str(_SOURCE_DIRECTORY / "table.html")
ASSETS_DIRECTORY = str(_SOURCE_DIRECTORY / "assets")


@dataclasses.dataclass(frozen=True)
class PerspectiveTableDefinition:
"""
API table definition
"""

name: str
index_column: str
remove_column: Optional[str] = None
sort: list[tuple[str, Literal["asc", "desc"]]] = dataclasses.field(
default_factory=list
)
filters: list[tuple[str, str, Any]] = dataclasses.field(default_factory=list)
hidden_columns: Sequence[str] = tuple()
limit: Optional[int] = None

def validate(self, schema: pa.Schema):
assert self.index_column in schema.names, self.index_column
if self.remove_column is not None:
assert isinstance(self.remove_column, str)
assert self.remove_column in schema.names, self.remove_column

assert isinstance(self.sort, list)
for column, order in self.sort:
assert isinstance(column, str)
assert column in schema.names
assert order in ("asc", "desc")
for column in self.hidden_columns:
assert isinstance(column, str)
assert column in schema.names
for each_filter in self.filters:
assert len(each_filter) in (2, 3)
assert isinstance(each_filter[0], str), each_filter
assert each_filter[1] in COMPARATORS


@dataclasses.dataclass(frozen=True)
class _TableConfig:
"""
Internal perspective table config, which is passed to the html template
"""

name: str
index: str
columns: list[str]
sort: Sequence[tuple[str, Literal["asc", "desc"]]]
filters: Sequence[tuple[str, str, Any]]

@staticmethod
def from_definition(definition: PerspectiveTableDefinition, schema: pa.Schema):
return _TableConfig(
name=definition.name,
index=definition.index_column,
columns=[f for f in schema.names if f not in definition.hidden_columns],
sort=[] if definition.sort is None else definition.sort,
filters=definition.filters,
)


class TableRequestHandler(tornado.web.RequestHandler):
"""Renders the table.html template, using the provided configurations"""

_tables: Optional[dict[str, _TableConfig]] = None
_default_table: Optional[str] = None

def initialize(self, table_configs: list[_TableConfig]) -> None:
self._tables = {
table_config.name: table_config for table_config in table_configs
}
self._default_table = table_configs[0].name

async def get(self, path: str) -> None:
table_name = path or self._default_table
table_config = self._tables[table_name]

await self.render(
TABLE_PATH,
table_config=table_config,
perspective_version=perspective.__version__,
)


def _table_to_bytes(table: pa.Table) -> bytes:
"""Serialize a table as bytes, to pass it to a perspective table"""
with pa.BufferOutputStream() as sink:
with pa.ipc.new_stream(sink, table.schema) as writer:
for batch in table.to_batches():
writer.write_batch(batch)
return sink.getvalue().to_pybytes()


@dataclasses.dataclass(frozen=True)
class _UpdateRunner:
kafka_driver: KafkaDriver

def __call__(self):
self.kafka_driver.run_cycle(0.0)


@dataclasses.dataclass(frozen=True)
class _PerspectiveNode:
table_definition: PerspectiveTableDefinition
schema: pa.Schema
table: perspective.Table = None

def __call__(self, table: pa.Table) -> None:
"""Pass the arrow data to perspective"""
self.table.update(_table_to_bytes(table))

def get_table_config(self) -> _TableConfig:
return _TableConfig.from_definition(self.table_definition, self.schema)


@dataclasses.dataclass(frozen=True)
class PerspectiveDagWrapper:
"""Helper for adding perspective Nodes to a Dag."""

_dag: Dag

def to_perspective(
self,
node: Node,
table_definition: PerspectiveTableDefinition,
schema: Optional[pa.Schema] = None,
) -> None:
"""Add a source stream of type `pa.Table`."""
if schema is None:
assert node._is_stream(), "Must provide a schema for state nodes"
empty = node._empty_factory()
assert isinstance(empty, pa.Table), "Only pyarrow.Table nodes supported"
schema = empty.schema
table_definition.validate(schema)
self._dag.state(
_PerspectiveNode(
table_definition,
schema,
table=perspective.Table(
_table_to_bytes(schema.empty_table()),
limit=table_definition.limit,
index=table_definition.index_column,
),
)
).map(node)


def perspective_thread(
manager: perspective.PerspectiveManager,
kafka_driver: KafkaDriver,
nodes: list[_PerspectiveNode],
):
psp_loop = tornado.ioloop.IOLoop()

manager.set_loop_callback(psp_loop.add_callback)
for node in nodes:
manager.host_table(node.table_definition.name, node.table)

callback = tornado.ioloop.PeriodicCallback(
callback=_UpdateRunner(kafka_driver), callback_time=1_000
)
callback.start()
psp_loop.start()


def create_web_application(
kafka_driver: KafkaDriver,
assets_directory: str = ASSETS_DIRECTORY,
) -> tornado.web.Application:
manager = PerspectiveManager()

nodes: list[_PerspectiveNode] = []
for node in kafka_driver._dag._nodes:
if isinstance(node._function, _PerspectiveNode):
nodes.append(node._function)
assert len(nodes) > 0, "No perspective table nodes"
assert len({n.table_definition.name for n in nodes}) == len(
nodes
), "Duplicate table name"

thread = threading.Thread(
target=perspective_thread,
args=(manager, kafka_driver, nodes),
)
thread.daemon = True
thread.start()

return tornado.web.Application(
[
(
r"/websocket",
PerspectiveTornadoHandler,
{"manager": manager, "check_origin": True},
),
(
r"/assets/(.*)",
tornado.web.StaticFileHandler,
{"path": assets_directory, "default_filename": None},
),
(
r"/([a-z0-9_]*)",
TableRequestHandler,
{"table_configs": [node.get_table_config() for node in nodes]},
),
],
serve_traceback=True,
)


def run_web_application(web_app: tornado.web.Application, port: int):
web_app.listen(port)
loop = tornado.ioloop.IOLoop.current()
loop.start()
2 changes: 1 addition & 1 deletion beavers/pyarrow_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __call__(self, stream: pa.Table):

@dataclasses.dataclass(frozen=True)
class ArrowDagWrapper:
"""Helper call for adding pyarrow Nodes to a Dag."""
"""Helper for adding pyarrow Nodes to a Dag."""

_dag: Dag

Expand Down
63 changes: 63 additions & 0 deletions beavers/table.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<!DOCTYPE html>
<html>
<head>
<title>{{table_config.name}} Beavers</title>
<link rel="icon" type="image/x-icon" href="/assets/favicon.ico" />
<link rel="shortcut icon" type="image/x-icon" href="/assets/favicon.ico" />
<meta
name="viewport"
content="width=device-width, initial-scale=1, maximum-scale=1, minimum-scale=1, user-scalable=no"
/>
<script type="module" src="https://cdn.jsdelivr.net/npm/@finos/perspective@{{perspective_version}}/dist/cdn/perspective.js"></script>
<script type="module" src="https://cdn.jsdelivr.net/npm/@finos/perspective-viewer@{{perspective_version}}/dist/cdn/perspective-viewer.js"></script>
<script type="module" src="https://cdn.jsdelivr.net/npm/@finos/perspective-viewer-datagrid@{{perspective_version}}/dist/cdn/perspective-viewer-datagrid.js"></script>
<script type="module" src="https://cdn.jsdelivr.net/npm/@finos/perspective-viewer-d3fc@{{perspective_version}}/dist/cdn/perspective-viewer-d3fc.js" ></script>

<link
rel="stylesheet"
crossorigin="anonymous"
href="https://cdn.jsdelivr.net/npm/@finos/perspective-viewer@{{perspective_version}}/dist/css/pro.css"
/>

<style>
body {
height: 100vh;
width: 100vw;
font-family: 'Roboto Mono';
overflow: scroll;
}
.perspective-table-class {
position: relative;
height: 100%;
}
</style>
</head>

<body>
<perspective-viewer id="viewer" class="perspective-table-class"> </perspective-viewer>

<script type="module">
import perspective from "https://cdn.jsdelivr.net/npm/@finos/perspective@{{perspective_version}}/dist/cdn/perspective.js";

window.addEventListener("DOMContentLoaded", async function () {
const viewer = document.getElementById("viewer");
const websocket_uri = "ws://" + window.location.host + "/websocket";
const websocket = perspective.websocket(websocket_uri);
websocket._ws.onclose = function () {
viewer.style.display = "none"
}
const worker = perspective.worker();
const server_table = await websocket.open_table("{{table_config.name}}");
const server_view = await server_table.view();
await viewer.load(await worker.table(server_view, { index: "{{table_config.index}}" }));
let config = await viewer.save();

config.columns = {% raw json_encode(table_config.columns) %}
config.sort = {% raw json_encode(table_config.sort) %}
config.filter = {% raw json_encode(table_config.filters) %}

await viewer.restore(config);
});
</script>
</body>
</html>
2 changes: 1 addition & 1 deletion docs/concepts/kafka.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Live with Kafka

This section explains how to a beavers application in real time using kafka.
This section explains how to run a beavers application in real time using kafka.

## Count Word Example

Expand Down
Loading

0 comments on commit 464e49a

Please sign in to comment.