Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ langchain_callback:
pip install -e ".[langchain_callback]"

llama_index_callback:
pip install -e ".[llama_index_callback]"
pip install -e ".[llama_index_callback]"

instrumentation:
pip install -e ".[opentelemetry]"
5 changes: 2 additions & 3 deletions portkey_ai/api_resources/apis/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

class Logger:
def __init__(
self,
api_key: Optional[str] = None,
self, api_key: Optional[str] = None, base_url: Optional[str] = None
) -> None:
api_key = api_key or os.getenv("PORTKEY_API_KEY")
if api_key is None:
Expand All @@ -20,7 +19,7 @@ def __init__(
"x-portkey-api-key": api_key,
}

self.url = PORTKEY_BASE_URL + "/logs"
self.url = (base_url or PORTKEY_BASE_URL) + "/logs"

def log(
self,
Expand Down
4 changes: 4 additions & 0 deletions portkey_ai/api_resources/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation or False
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down Expand Up @@ -709,6 +711,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -743,6 +746,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down
32 changes: 32 additions & 0 deletions portkey_ai/api_resources/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -152,6 +154,18 @@ def __init__(
self.logs = apis.Logs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `make instrumentation` to install the dependencies"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -168,6 +182,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -203,6 +218,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down Expand Up @@ -287,6 +303,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -322,6 +339,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -376,6 +394,18 @@ def __init__(
self.logs = apis.AsyncLogs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `make instrumentation` to install the dependencies"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -392,6 +422,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -427,6 +458,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down
38 changes: 38 additions & 0 deletions portkey_ai/api_resources/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from importlib.metadata import version, PackageNotFoundError
from typing import Dict
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]

from .crewai import CrewAIInstrumentor
from .litellm import LitellmInstrumentor
from .portkey_span_exporter import PortkeySpanExporter
from .langgraph import LanggraphInstrumentor

__all__ = ["initialize_instrumentation"]

package_instrumentor_map: Dict[str, BaseInstrumentor] = {
"crewai": CrewAIInstrumentor,
"litellm": LitellmInstrumentor,
"langgraph": LanggraphInstrumentor,
}


def is_package_installed(pkg_name):
try:
version(pkg_name)
return True
except PackageNotFoundError:
return False


def initialize_instrumentation(api_key: str, base_url: str):
tracer_provider = TracerProvider()
exporter = PortkeySpanExporter(api_key=api_key, base_url=base_url)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
for package, instrumentor in package_instrumentor_map.items():
if is_package_installed(package):
instrumentor().instrument()
print(f"Portkey: {package} Instrumentation initialized")
3 changes: 3 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instrumentation import CrewAIInstrumentor

__all__ = ["CrewAIInstrumentor"]
68 changes: 68 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.crewai.patch import patch_crew


class CrewAIInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "crewai.crew",
"method": "Crew.kickoff",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_async",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each_async",
},
{
"module": "crewai.agent",
"method": "Agent.execute_task",
},
{
"module": "crewai.task",
"method": "Task.execute_sync",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.save",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.search",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.reset",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["crewai >= 0.32.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("crewai")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_crew(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument CrewAI: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
50 changes: 50 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs
from portkey_ai.api_resources.instrumentation.utils import (
set_members,
set_span_attribute,
)


def patch_crew(operation_name: str, version: str, tracer: trace.Tracer):
def traced_func(wrapped, instance, args, kwargs):
with tracer.start_as_current_span(
name=operation_name, kind=SpanKind.CLIENT
) as span:
try:
module_name = instance.__module__
class_name = instance.__class__.__name__

span.set_attribute("_source", "crewai")
span.set_attribute("_source_type", "agent framework")
span.set_attribute("framework.version", version)
span.set_attribute("module", module_name)
span.set_attribute("method", operation_name)
span.set_attribute("args", serialize_args(*args))
span.set_attribute("kwargs", serialize_kwargs(**kwargs))

result = wrapped(*args, **kwargs)
span.set_status(Status(StatusCode.OK))

try:
set_members(span, instance, module_name, class_name)
except Exception as e:
span.record_exception(e)

set_span_attribute(span, "result", result)

if class_name == "Crew":
for attr in ["tasks_output", "token_usage", "usage_metrics"]:
if hasattr(result, attr):
span.set_attribute(
f"crewai.crew.{attr}", str(getattr(result, attr))
)
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
return result

return traced_func
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from portkey_ai.api_resources.instrumentation.langgraph.instrumentation import (
LanggraphInstrumentor,
)

__all__ = ["LanggraphInstrumentor"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.langgraph.patch import patch_langgraph


class LanggraphInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_node",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_edge",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_entry_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_finish_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_conditional_edges",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["langgraph >= 0.2.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("langgraph")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_langgraph(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument Langgraph: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
Loading
Loading