Elasticsearch integration #464

Merged
9 commits merged on Feb 24, 2023
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -42,13 +42,16 @@ jobs:
- name: Checkout
uses: actions/checkout@v3

- name: Build DB container
run: docker build -t piker:elastic dockering/elastic

- name: Setup python
uses: actions/setup-python@v3
with:
python-version: '3.10'

- name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements.txt --upgrade-strategy eager
run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager

- name: Test suite
run: pytest tests -rs
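The install step above now pulls the optional es extra. The packaging change itself is not part of this diff, but a minimal sketch of how such an extra might be declared in setup.py (the dependency list here is an assumption, for illustration only):

from setuptools import setup, find_packages

# hypothetical sketch: an extras_require entry that `pip install -U .[es]`
# would resolve; the real contents are not shown in this PR
setup(
    name='piker',
    packages=find_packages(),
    extras_require={
        'es': ['elasticsearch'],  # assumed search-client dependency
    },
)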
11 changes: 11 additions & 0 deletions dockering/elastic/Dockerfile
@@ -0,0 +1,11 @@
FROM elasticsearch:7.17.4

ENV ES_JAVA_OPTS "-Xms2g -Xmx2g"
ENV ELASTIC_USERNAME "elastic"
ENV ELASTIC_PASSWORD "password"

COPY elasticsearch.yml /usr/share/elasticsearch/config/

RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password"

EXPOSE 19200
5 changes: 5 additions & 0 deletions dockering/elastic/elasticsearch.yml
@@ -0,0 +1,5 @@
network.host: 0.0.0.0

http.port: 19200

discovery.type: single-node
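Taken together, the Dockerfile and config above give a single-node cluster listening on port 19200 with the baked-in elastic/password credentials. A minimal readiness probe against that container, using only the standard library (host, port and credentials come from the files above; the rest is illustrative):

import base64
import json
import urllib.request

# poll the cluster health endpoint of the container built from dockering/elastic;
# assumes it is already running and published on localhost:19200
req = urllib.request.Request('http://localhost:19200/_cluster/health')
creds = base64.b64encode(b'elastic:password').decode()
req.add_header('Authorization', f'Basic {creds}')

with urllib.request.urlopen(req, timeout=5) as resp:
    health = json.loads(resp.read())

# a single-node cluster typically reports 'yellow' or 'green' once ready
print(health['status'])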
54 changes: 54 additions & 0 deletions piker/_daemon.py
@@ -41,6 +41,9 @@
)
from .brokers import get_brokermod

from pprint import pformat
from functools import partial


log = get_logger(__name__)

@@ -313,13 +316,18 @@ async def open_piker_runtime(

@acm
async def open_pikerd(

loglevel: str | None = None,

# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,

# db init flags
tsdb: bool = False,
es: bool = False,

) -> Services:
'''
Start a root piker daemon whose lifetime extends indefinitely until
@@ -349,12 +357,54 @@ async def open_pikerd(
):
assert root_actor.accept_addr == reg_addr

if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore

log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
start_ahab,
'marketstored',
start_marketstore,

)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)

if es:
from piker.data._ahab import start_ahab
from piker.data.elastic import start_elasticsearch

log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=240.0  # long timeout since CI runners are slow
)
)

log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)

# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode


try:
yield Services

finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
@@ -390,6 +440,8 @@ async def maybe_open_runtime(
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
tsdb: bool = False,
es: bool = False,

**kwargs,

@@ -439,6 +491,8 @@ async def maybe_open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
tsdb=tsdb,
es=es,

) as service_manager:
# in the case where we're starting up the
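With container supervision folded into open_pikerd, enabling either backend is just a keyword flag on the context manager. A minimal usage sketch (the log level and the surrounding runner are arbitrary, not taken from the PR):

import trio
from piker._daemon import open_pikerd

async def main():
    # spawn the root daemon plus the marketstore and elasticsearch supervisors
    async with open_pikerd(
        loglevel='info',
        tsdb=True,
        es=True,
    ) as services:
        # both containers stay up for the lifetime of this block
        await trio.sleep_forever()

trio.run(main)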
29 changes: 11 additions & 18 deletions piker/cli/__init__.py
@@ -20,6 +20,7 @@
'''
import os
from pprint import pformat
from functools import partial

import click
import trio
@@ -48,18 +49,25 @@
is_flag=True,
help='Enable local ``marketstore`` instance'
)
@click.option(
'--es',
is_flag=True,
help='Enable local ``elasticsearch`` instance'
)
def pikerd(
loglevel: str,
host: str,
port: int,
tl: bool,
pdb: bool,
tsdb: bool,
es: bool,
):
'''
Spawn the piker broker-daemon.

'''

from .._daemon import open_pikerd
log = get_console_log(loglevel)

@@ -80,33 +88,17 @@ def pikerd(
)

async def main():

async with (
open_pikerd(
tsdb=tsdb,
es=es,
loglevel=loglevel,
debug_mode=pdb,
registry_addr=reg_addr,

), # normally delivers a ``Services`` handle
trio.open_nursery() as n,
):
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore

log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await n.start(
start_ahab,
'marketstored',
start_marketstore,

)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)

await trio.sleep_forever()

@@ -213,6 +205,7 @@ async def list_services():

def _load_clis() -> None:
from ..data import marketstore # noqa
from ..data import elastic  # noqa
from ..data import cli # noqa
from ..brokers import cli # noqa
from ..ui import cli # noqa
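From the command line the same setup is reachable through the new --es flag alongside the existing --tsdb. A hedged sketch of shelling out to the daemon from Python; the console-script name used here (pikerd) is an assumption since the entry points are not shown in this diff:

import subprocess

# illustrative only: spawn the broker-daemon with both db containers enabled,
# assuming a `pikerd` console script is installed (e.g. via `pip install .[es]`)
proc = subprocess.Popen(['pikerd', '--tsdb', '--es'])
try:
    proc.wait(timeout=60)
except subprocess.TimeoutExpired:
    # the daemon runs until cancelled; tear it down once done
    proc.terminate()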
62 changes: 42 additions & 20 deletions piker/data/_ahab.py
@@ -124,7 +124,9 @@ def __init__(

async def process_logs_until(
self,
patt: str,
# this is a predicate func for matching log msgs emitted by the
# underlying containerized app
patt_matcher: Callable[[str], bool],
Contributor review comment: I think the typing here is technically an Awaitable now, but it's not a big deal and I don't want this to delay the patch.

bp_on_msg: bool = False,
) -> bool:
'''
@@ -135,39 +137,53 @@ async def process_logs_until(
seen_so_far = self.seen_so_far

while True:
logs = self.cntr.logs()
try:
logs = self.cntr.logs()
except (
docker.errors.NotFound,
docker.errors.APIError
):
return False

entries = logs.decode().split('\n')
for entry in entries:

# ignore null lines
if not entry:
continue

entry = entry.strip()
try:
record = json.loads(entry.strip())
record = json.loads(entry)

if 'msg' in record:
msg = record['msg']
elif 'message' in record:
msg = record['message']
else:
raise KeyError(f'Unexpected log format\n{record}')

level = record['level']

except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = entry
level = 'error'

msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()

getattr(log, level, log.error)(f'{msg}')
getattr(log, level.lower(), log.error)(f'{msg}')

# print(f'level: {level}')
if level in ('error', 'fatal'):
if level == 'fatal':
raise ApplicationLogError(msg)

if patt in msg:
if await patt_matcher(msg):
return True

# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
await trio.sleep(0.1)

return False

@@ -285,6 +301,7 @@ async def cancel(
async def open_ahabd(
ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type
start_timeout: float = 1.0,

**kwargs,

@@ -300,17 +317,20 @@ async def open_ahabd(
(
dcntr,
cntr_config,
start_msg,
stop_msg,
start_lambda,
stop_lambda,
) = ep_func(client)
cntr = Container(dcntr)

with trio.move_on_after(1):
found = await cntr.process_logs_until(start_msg)
with trio.move_on_after(start_timeout):
found = await cntr.process_logs_until(start_lambda)

if not found and dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)

if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
f'Failed to start {dcntr.id}, check logs for details'
)

await ctx.started((
@@ -326,12 +346,13 @@ async def start_ahab(
await trio.sleep_forever()

finally:
await cntr.cancel(stop_msg)
await cntr.cancel(stop_lambda)


async def start_ahab(
service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer],
start_timeout: float = 1.0,
task_status: TaskStatus[
tuple[
trio.Event,
@@ -379,6 +400,7 @@ async def start_ahab(
async with portal.open_context(
open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)),
start_timeout=start_timeout
) as (ctx, first):

cid, pid, cntr_config = first
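For reference, open_ahabd above unpacks each endpoint's return value as (container, config, start matcher, stop matcher). A hedged sketch of an endpoint following that contract, with the image name and port drawn from the CI and docker files above and the log phrases purely illustrative:

import docker

def start_elastic_sketch(client: docker.DockerClient):
    # run the image built in CI as `piker:elastic`, publishing the port
    # configured in dockering/elastic/elasticsearch.yml
    dcntr = client.containers.run(
        'piker:elastic',
        detach=True,
        ports={'19200/tcp': 19200},
    )

    async def started(msg: str) -> bool:
        return 'started' in msg  # assumed readiness phrase

    async def stopped(msg: str) -> bool:
        return 'stopped' in msg  # assumed shutdown phrase

    config = {'port': 19200}
    return dcntr, config, started, stopped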