Merged
8 changes: 7 additions & 1 deletion Makefile
@@ -24,6 +24,9 @@ config.yml:
install: .venv/bin/python .venv/bin/pip-licenses .venv/bin/elastic-ingest
	.venv/bin/pip-licenses --format=plain-vertical --with-license-file --no-license-path > NOTICE.txt

install-agent: install
	.venv/bin/pip install -r requirements/agent.txt

.venv/bin/elastic-ingest: .venv/bin/python
	.venv/bin/pip install -r requirements/$(ARCH).txt
	.venv/bin/python setup.py develop
@@ -64,7 +67,10 @@ autoformat: .venv/bin/python .venv/bin/ruff .venv/bin/elastic-ingest
	.venv/bin/ruff format setup.py

test: .venv/bin/pytest .venv/bin/elastic-ingest
-	.venv/bin/pytest --cov-report term-missing --cov-fail-under 92 --cov-report html --cov=connectors --fail-slow=$(SLOW_TEST_THRESHOLD) -sv tests
+	.venv/bin/pytest --cov-report term-missing --cov-fail-under 92 --cov-report html --cov=connectors --fail-slow=$(SLOW_TEST_THRESHOLD) -sv tests --ignore tests/agent
Member Author

Ignoring tests/agent here because the repo that we build the package from is private, so these tests would fail in CI. We could set CI up to pull that repo too, but an occasional user of main could still run into problems, and I'd rather keep main clean and functioning.

Contributor

I'd challenge that we don't care about 2xx, in operational review discussions we've discussed that bumps in traffic

++

Member Author

Sorry what's that quote? :D


test-agent: .venv/bin/pytest .venv/bin/elastic-ingest install-agent
Member

I think we should add this to CI now, to avoid forgetting it or letting it fall too far behind in coverage. I think that ElasticMachine should be able to clone the private repo. Hit me up if it's not working.

	.venv/bin/pytest --cov-report term-missing --cov-fail-under 92 --cov-report html --cov=connectors/agent --fail-slow=$(SLOW_TEST_THRESHOLD) -sv tests/agent

release: install
	.venv/bin/python setup.py sdist
Empty file added connectors/agent/README.md
Empty file.
42 changes: 42 additions & 0 deletions connectors/agent/cli.py
@@ -0,0 +1,42 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import asyncio
import functools
import signal

from elastic_agent_client.util.async_tools import (
    sleeps_for_retryable,
)
from elastic_agent_client.util.logger import logger

from connectors.agent.component import ConnectorsAgentComponent


def main(args=None):
    """Script entry point into running Connectors Service on Agent.

    It initialises an event loop, creates a component and runs the component.
    Additionally, signals are handled for graceful termination of the component.
    """
    loop = asyncio.get_event_loop()
    logger.info("Running agent")
    component = ConnectorsAgentComponent()

    def _shutdown(signal_name):
        sleeps_for_retryable.cancel(signal_name)
        component.stop(signal_name)

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, functools.partial(_shutdown, sig.name))

    return loop.run_until_complete(component.run())


if __name__ == "__main__":
    try:
        main()
    finally:
        logger.info("Bye")
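The signal wiring in cli.py can be tried in isolation. The following is a minimal sketch, not the code from this PR: `DemoComponent` and `run_component` are hypothetical stand-ins, it uses `asyncio.run` rather than `get_event_loop`, and it assumes a Unix main thread (where `loop.add_signal_handler` is available). The `self_signal_after` parameter exists only so the demo can deliver SIGTERM to itself.

```python
import asyncio
import functools
import os
import signal


class DemoComponent:
    """Hypothetical stand-in for ConnectorsAgentComponent."""

    def __init__(self):
        self._stopped = asyncio.Event()
        self.stopped_by = None

    async def run(self):
        # Wait until stop() is called, then report which signal stopped us.
        await self._stopped.wait()
        return self.stopped_by

    def stop(self, signal_name):
        self.stopped_by = signal_name
        self._stopped.set()


async def run_component(self_signal_after=None):
    loop = asyncio.get_running_loop()
    component = DemoComponent()  # created inside the running loop
    handled = (signal.SIGINT, signal.SIGTERM)
    for sig in handled:
        # The handler runs in the event loop, so it may touch loop state safely.
        loop.add_signal_handler(sig, functools.partial(component.stop, sig.name))
    if self_signal_after is not None:
        # Demo only: send SIGTERM to this process after a short delay.
        loop.call_later(self_signal_after, os.kill, os.getpid(), signal.SIGTERM)
    try:
        return await component.run()
    finally:
        for sig in handled:
            loop.remove_signal_handler(sig)


if __name__ == "__main__":
    print(asyncio.run(run_component(self_signal_after=0.1)))
```

Registering the handler through the loop (instead of `signal.signal`) means the callback is scheduled like any other loop callback, which is why it can safely set an `asyncio.Event`.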
74 changes: 74 additions & 0 deletions connectors/agent/component.py
@@ -0,0 +1,74 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import sys

from elastic_agent_client.client import V2Options, VersionInfo
from elastic_agent_client.reader import new_v2_from_reader
from elastic_agent_client.service.actions import ActionsService
from elastic_agent_client.service.checkin import CheckinV2Service

from connectors.agent.config import ConnectorsAgentConfigurationWrapper
from connectors.agent.protocol import ConnectorActionHandler, ConnectorCheckinHandler
from connectors.agent.service_manager import ConnectorServiceManager
from connectors.services.base import MultiService
Member

This is using the MultiService from Connectors instead of the one from elastic_agent_client. I think that's OK (and probably for the best), but I wanted to flag it in case it wasn't intentional. I hadn't tested using ActionsService or CheckinV2Service with the Connectors MultiService.

Member Author

It's technically the same code. We need to clean classes like this out of the python-agent repo, or move them to an example namespace so that they aren't distributed.


CONNECTOR_SERVICE = "connector-service"


class ConnectorsAgentComponent:
    """Entry point into running connectors service in Agent.

    This class provides a simple abstraction over Agent components and Connectors Service manager.

    It instantiates everything needed to read from Agent protocol, creates a wrapper around Connectors Service
    and provides applied interface to be able to run it in 2 simple methods: run and stop.
    """

    def __init__(self):
        """Inits the class.

        Init should be safe to call without expectations of side effects (connections to Agent, blocking or anything).
        """
        self.ver = VersionInfo(
            name=CONNECTOR_SERVICE, meta={"input": CONNECTOR_SERVICE}
        )
        self.opts = V2Options()
        self.buffer = sys.stdin.buffer
        self.config_wrapper = ConnectorsAgentConfigurationWrapper()

    async def run(self):
        """Start reading from Agent protocol and run Connectors Service with settings reported by agent.

        This method can block if it's not running from Agent - it expects the client to be able to read messages
        on initialisation. These messages are a handshake and it's a sync handshake.
        If no messages are sent, this method can and will hang.

        However, if run under Agent, this method will read configuration from Agent and attempt to start an
        instance of Connectors Service with this configuration.

        Additionally services for handling Check-in and Actions will be started to implement the protocol correctly.
        """
        client = new_v2_from_reader(self.buffer, self.ver, self.opts)
        action_handler = ConnectorActionHandler()
        self.connector_service_manager = ConnectorServiceManager(self.config_wrapper)
        checkin_handler = ConnectorCheckinHandler(
            client, self.config_wrapper, self.connector_service_manager
        )

        self.multi_service = MultiService(
            CheckinV2Service(client, checkin_handler),
            ActionsService(client, action_handler),
            self.connector_service_manager,
        )

        await self.multi_service.run()

    def stop(self, sig):
        """Shutdown everything running in the component.

        Attempts to gracefully shutdown the services that are running under the component.
        """
        self.multi_service.shutdown(sig)
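The run/stop shape of the component (several services run together, one shutdown call stops them all) can be illustrated with a small sketch. This is not the `MultiService` from Connectors or from elastic_agent_client, whose internals are not shown in this diff; `SleepingService` and `TinyMultiService` are hypothetical names, and the "stop everything once any service finishes" policy is an assumption made for the example.

```python
import asyncio


class SleepingService:
    """Hypothetical service that runs until stop() is called."""

    def __init__(self, name):
        self.name = name
        self.running = False
        self._stop = None

    async def run(self):
        self._stop = asyncio.Event()
        self.running = True
        try:
            await self._stop.wait()
        finally:
            self.running = False

    def stop(self):
        # A stop() issued before run() has started is ignored in this sketch.
        if self._stop is not None:
            self._stop.set()


class TinyMultiService:
    """Hypothetical runner: runs services concurrently, stops them together."""

    def __init__(self, *services):
        self._services = services

    async def run(self):
        tasks = [asyncio.ensure_future(service.run()) for service in self._services]
        # As soon as one service finishes (or fails), ask the others to stop.
        _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for service in self._services:
            service.stop()
        if pending:
            await asyncio.wait(pending)
        for task in tasks:
            task.result()  # re-raise the first stored exception, if any

    def shutdown(self, sig):
        for service in self._services:
            service.stop()


async def demo():
    first, second = SleepingService("first"), SleepingService("second")
    multi = TinyMultiService(first, second)
    # Simulate a signal arriving shortly after start-up.
    asyncio.get_running_loop().call_later(0.01, multi.shutdown, "SIGTERM")
    await multi.run()
    return first.running, second.running
```

Calling `asyncio.run(demo())` starts both services, triggers the shutdown, and returns once both have stopped.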
110 changes: 110 additions & 0 deletions connectors/agent/config.py
@@ -0,0 +1,110 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from connectors.config import add_defaults


class ConnectorsAgentConfigurationWrapper:
    """A wrapper that facilitates passing configuration from Agent to Connectors Service.

    This class is responsible for:
    - Storing in-memory configuration of Connectors Service running on Agent
    - Transforming configuration reported by Agent to valid Connectors Service configuration
    - Indicating that configuration has changed so that the user of the class can trigger the restart
    """

    def __init__(self):
        """Inits the class.

        There's default config that allows us to run connectors natively (see _force_allow_native flag),
        when final configuration is reported these defaults will be merged with defaults from Connectors
        Service config and specific config coming from Agent.
        """
        self._default_config = {
            "_force_allow_native": True,
            "native_service_types": [
                "azure_blob_storage",
                "box",
                "confluence",
                "dropbox",
                "github",
                "gmail",
                "google_cloud_storage",
                "google_drive",
                "jira",
                "mongodb",
                "mssql",
                "mysql",
                "notion",
                "onedrive",
                "oracle",
                "outlook",
                "network_drive",
                "postgresql",
                "s3",
                "salesforce",
                "servicenow",
                "sharepoint_online",
                "slack",
                "microsoft_teams",
                "zoom",
            ],
        }

        self.specific_config = {}

    def try_update(self, source):
        """Try to update the configuration and see if it changed.

        This method takes the check-in event coming from Agent and checks if config needs an update.

        If update is needed, configuration is updated and method returns True. If no update is needed
        the method returns False.
        """

        # TODO: find a good link to what this object is.
Contributor

Trying to be helpful, I dug into the proto definition: https://github.com/elastic/elastic-agent-client/blob/main/elastic-agent-client.proto#L168

But it seems like source is an arbitrary struct, hehe, so we have to look somewhere else for the actual definition and take a leap of faith for now.

Member Author

Yeah judging by the protobuf files it's arbitrary:

    // Source is the original configuration of this unit configuration object in the agent policy.
    // Only standard fields are defined as explicit types, additional fields can be parsed from source.
    //
    // This source field will almost always contain arbitrary unit configuration fields beyond those
    // explicitly defined in this message type.
    google.protobuf.Struct source = 1;

Member

The way I interrogated this during my POC was to just add logging. I'd get a checkin event, deserialize it, re-serialize it as json, then dump it in the logs.
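A minimal sketch of that logging approach follows. It assumes the check-in `source` has already been converted to a plain dict (for a protobuf message, `google.protobuf.json_format.MessageToDict` is one way to do that); the logger name, `SENSITIVE_KEYS`, and the redaction step are additions for this example and are not part of the PR or the reviewer's POC.

```python
import json
import logging

logger = logging.getLogger("agent-debug")  # hypothetical logger name

# Assumption: keys that should never reach the logs in clear text.
SENSITIVE_KEYS = {"api_key", "password"}


def redact(value):
    """Return a copy of value with sensitive dict entries masked."""
    if isinstance(value, dict):
        return {
            key: "***" if key in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


def dump_source(source):
    """Serialize the (already deserialized) source as JSON and log it."""
    rendered = json.dumps(redact(source), indent=2, sort_keys=True)
    logger.debug("Check-in unit source:\n%s", rendered)
    return rendered
```

Since the unit source carries Elasticsearch credentials, masking them before dumping keeps the debugging aid from leaking secrets into log files.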

        has_hosts = source.fields.get("hosts")
        has_api_key = source.fields.get("api_key")
        has_basic_auth = source.fields.get("username") and source.fields.get("password")
        if has_hosts and (has_api_key or has_basic_auth):
            es_creds = {
                "host": source["hosts"][0],
            }

            if source.fields.get("api_key"):
                es_creds["api_key"] = source["api_key"]
Contributor

@artem-shelkovnikov commenting so that it doesn't get lost: by default we would get a Fleet-specific API key format. This should do the trick to turn it into our version:

    if source.fields.get("api_key"):
        api_key = source["api_key"]
        # if beats_logstash_format
        if ":" in api_key:
            api_key = base64.b64encode(api_key.encode()).decode()

Contributor

Verified that this works

Member Author

Thanks for the investigation! I'm gonna add this code as well :)
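The conversion suggested in this thread can be wrapped in a small helper. This is a sketch based only on the snippet above: `normalize_api_key` is a hypothetical name, and the rule "a colon means the key is still in `id:secret` form" is the same heuristic the reviewer proposed, not a documented Fleet contract.

```python
import base64


def normalize_api_key(api_key: str) -> str:
    """Base64-encode an "id:secret" style key; leave other keys unchanged."""
    # The standard Base64 alphabet has no ":", so an already-encoded key
    # passes through untouched and the function is safe to call twice.
    if ":" in api_key:
        return base64.b64encode(api_key.encode()).decode()
    return api_key
```

Usage would look like `es_creds["api_key"] = normalize_api_key(source["api_key"])` at the point where the API key is copied from the check-in source.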

            elif source.fields.get("username") and source.fields.get("password"):
                es_creds["username"] = source["username"]
                es_creds["password"] = source["password"]
            else:
                msg = "Invalid Elasticsearch credentials"
                raise ValueError(msg)

            new_config = {
                "elasticsearch": es_creds,
            }
            self.specific_config = new_config
            return True

        return False
Member

Should probably add an error log here or something to leave a breadcrumb if we can't find basic auth or an api key
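One way to leave that breadcrumb is sketched below. It is an illustration, not the change that landed: `extract_es_creds` and the logger name are hypothetical, and `source` is assumed to be a plain dict rather than the protobuf Struct used in the diff.

```python
import logging

logger = logging.getLogger("connectors-agent-config")  # hypothetical logger name


def extract_es_creds(source):
    """Return Elasticsearch credentials from source, or None if incomplete.

    Every "not enough information" path logs a warning so that a missing
    host or missing credentials can be diagnosed from the logs.
    """
    hosts = source.get("hosts")
    if not hosts:
        logger.warning("Agent config has no Elasticsearch hosts; keeping current config")
        return None

    creds = {"host": hosts[0]}
    if source.get("api_key"):
        creds["api_key"] = source["api_key"]
    elif source.get("username") and source.get("password"):
        creds["username"] = source["username"]
        creds["password"] = source["password"]
    else:
        logger.warning(
            "Agent config has hosts but neither api_key nor username/password; "
            "keeping current config"
        )
        return None
    return creds
```

Returning `None` keeps the "no update" path distinct from an update, while the warnings record why the update was skipped.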


    def get(self):
        """Get current Connectors Service configuration.

        This method combines three configs with higher ones taking precedence:
        - Config reported from Agent
        - Default config stored in this class
        - Default config of Connectors Service

        Resulting config should be sufficient to run Connectors Service with.
        """
        # First take "default config"
        config = self._default_config.copy()
        # Then override with what we get from Agent
        config.update(self.specific_config)
        # Then merge with default connectors config
        configuration = dict(add_defaults(config))

        return configuration
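The precedence described in the docstring can be sketched with plain dicts. `layered_config` is a hypothetical helper and all values below are made up for illustration; in particular, it uses a shallow `dict.update` for every layer, whereas how `add_defaults` treats nested keys is not shown in this diff.

```python
def layered_config(service_defaults, wrapper_defaults, agent_config):
    """Merge three layers; later arguments take precedence over earlier ones."""
    config = dict(service_defaults)   # lowest precedence
    config.update(wrapper_defaults)   # defaults stored in the wrapper
    config.update(agent_config)       # highest precedence: reported by Agent
    return config


merged = layered_config(
    {"elasticsearch": {"host": "http://localhost:9200", "request_timeout": 120}},
    {"_force_allow_native": True},
    {"elasticsearch": {"host": "https://es.example:443"}},
)
```

With a shallow merge, the Agent-provided `elasticsearch` dict replaces the default one wholesale, so `request_timeout` from the lowest layer is dropped. Whether that is acceptable depends on whether the final merge step is deep or shallow, which is worth checking in `add_defaults`.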
77 changes: 77 additions & 0 deletions connectors/agent/protocol.py
@@ -0,0 +1,77 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from elastic_agent_client.generated import elastic_agent_client_pb2 as proto
from elastic_agent_client.handler.action import BaseActionHandler
from elastic_agent_client.handler.checkin import BaseCheckinHandler
from elastic_agent_client.util.logger import logger


class ConnectorActionHandler(BaseActionHandler):
Member

Super nit/opinion - I don't like having multiple classes in one file like this, since I can never remember file names. I'd rather see each of these in their own file.

    """Class handling Agent actions.

    As there are no actions that we can respond to, we don't actually do anything here.
    """

    async def handle_action(self, action: proto.ActionRequest):
        """Implementation of BaseActionHandler.handle_action

        Right now does nothing as Connectors Service has no actions to respond to.
        """
        msg = (
            f"This connector component can't handle action requests. Received: {action}"
        )
Contributor

Should we include an error log? I don't see any other "wrapper" logic that would surface this to the user / logs

Member Author

It should log the error when called, right?

cc @seanstory will it terminate the process?


        raise NotImplementedError(msg)


class ConnectorCheckinHandler(BaseCheckinHandler):
    """Class handling Agent check-in events.

    Agent sends check-in events from time to time that might contain
    information that's needed to run Connectors Service.

    This class reads the events, sees if there's a reported change to connector-specific settings,
    tries to update the configuration and, if the configuration is updated, restarts the Connectors Service.
    """

    def __init__(self, client, agent_connectors_config_wrapper, service_manager):
        """Inits the class.

        Initing this class should not produce side-effects.
        """
        super().__init__(client)
        self.agent_connectors_config_wrapper = agent_connectors_config_wrapper
        self.service_manager = service_manager

    async def apply_from_client(self):
Member

This looks a lot like the "Fake example", but I'm worried we'll need to implement more here. This seems to only look at the first unit's config, but doesn't consider log_level, features, or anything else from the checkin service's sync_component or sync_units method.

        """Implementation of BaseCheckinHandler.apply_from_client

        This method is called by the Agent Protocol handlers when there's a check-in event
        coming from Agent. This class reads the event and runs business logic based on the
        content of the event.

        If this class blocks for too long, the component will mark the agent as failed:
        agent expects the components to respond within 30 seconds.
        See comment in https://github.com/elastic/elastic-agent-client/blob/main/elastic-agent-client.proto#L29
        """
        logger.info("There's new information for the components/units!")
        if self.client.units:
            logger.debug("Client reported units")
            outputs = [
                unit
                for unit in self.client.units
                if unit.unit_type == proto.UnitType.OUTPUT
            ]

            if len(outputs) > 0 and outputs[0].config:
                logger.debug("Outputs were found")
                source = outputs[0].config.source

                changed = self.agent_connectors_config_wrapper.try_update(source)
                if changed:
                    logger.info("Updating connector service manager config")
                    self.service_manager.restart()
                else:
                    logger.debug("No changes to connectors config")
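The check-in flow above (pick the first output unit, try to update the config, restart on change) can be exercised without Agent. The sketch below uses hypothetical stand-ins throughout: `UnitType`, `Unit`, `ConfigHolder`, `RecordingManager`, and `apply_checkin` are not the elastic_agent_client or Connectors classes. One deliberate difference, stated as an assumption: this `try_update` compares the incoming source with the stored one, so an identical check-in does not trigger a restart.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class UnitType(Enum):
    INPUT = 0
    OUTPUT = 1


@dataclass
class Unit:
    """Hypothetical, simplified unit: a type plus an optional config source."""

    unit_type: UnitType
    source: Optional[dict] = None


class ConfigHolder:
    """Stand-in for the config wrapper; reports whether the source changed."""

    def __init__(self):
        self.current = {}

    def try_update(self, source):
        if source == self.current:
            return False
        self.current = dict(source)
        return True


class RecordingManager:
    """Stand-in for the service manager; counts restarts."""

    def __init__(self):
        self.restarts = 0

    def restart(self):
        self.restarts += 1


def apply_checkin(units, config, manager):
    """Apply the first configured output unit; restart only if config changed."""
    outputs = [u for u in units if u.unit_type is UnitType.OUTPUT and u.source]
    if not outputs:
        return False
    changed = config.try_update(outputs[0].source)
    if changed:
        manager.restart()
    return changed
```

As the review comment notes, only the first output unit is considered here; log level, features, and additional units would need their own handling.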