Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ src/ai_company/
communication/ # Message bus, dispatcher, messenger, channels, delegation, loop prevention, conflict resolution, meeting protocol
config/ # YAML company config loading and validation
core/ # Shared domain models and base classes
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task lifecycle, recovery, and shutdown
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, task lifecycle, recovery, and shutdown
memory/ # Persistent agent memory (memory layer TBD)
observability/ # Structured logging, correlation tracking, log sinks
providers/ # LLM provider abstraction (LiteLLM adapter)
Expand Down
7 changes: 7 additions & 0 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -2386,6 +2386,12 @@ ai-company/
│ │ ├── parallel_models.py # AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult, ParallelProgress
│ │ ├── resource_lock.py # ResourceLock protocol + InMemoryResourceLock
│ │ ├── shutdown.py # Graceful shutdown strategy & manager
│ │ ├── assignment/ # Task assignment subsystem
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── models.py # AssignmentRequest, AssignmentResult, AssignmentCandidate, AgentWorkload
│ │ │ ├── protocol.py # TaskAssignmentStrategy protocol
│ │ │ ├── service.py # TaskAssignmentService (orchestrates strategy + validation)
│ │ │ └── strategies.py # ManualAssignmentStrategy, RoleBasedAssignmentStrategy, LoadBalancedAssignmentStrategy
│ │ ├── decomposition/ # Task decomposition subsystem
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── classifier.py # TaskStructureClassifier (sequential/parallel/mixed)
Expand Down Expand Up @@ -2490,6 +2496,7 @@ ai-company/
│ │ │ ├── routing.py # ROUTING_* constants
│ │ │ ├── sandbox.py # SANDBOX_* constants
│ │ │ ├── task.py # TASK_* constants
│ │ │ ├── task_assignment.py # TASK_ASSIGNMENT_* constants
│ │ │ ├── task_routing.py # TASK_ROUTING_* constants
│ │ │ ├── template.py # TEMPLATE_* constants
│ │ │ └── tool.py # TOOL_* constants
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents
- **Hierarchical Delegation** - Chain-of-command task delegation with five-mechanism loop prevention
- **Conflict Resolution** - Pluggable strategies for resolving agent disagreements (authority, debate, human escalation, hybrid) with dissent audit trail
- **Task Decomposition & Routing** - DAG-based subtask decomposition, structure classification, and agent-task scoring
- **Task Assignment** - Pluggable strategies (manual, role-based, load-balanced) for matching tasks to capable agents
- **Configurable Autonomy** - From fully autonomous to human-approves-everything, with a Security Ops agent in between
- **Persistent Memory** - Agents remember past decisions, code, relationships (memory layer TBD)
- **HR System** - Hire, fire, promote agents. HR agent analyzes skill gaps and proposes candidates
Expand Down
1 change: 1 addition & 0 deletions src/ai_company/config/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ def default_config_dict() -> dict[str, Any]:
"workflow_handoffs": [],
"escalation_paths": [],
"coordination_metrics": {},
"task_assignment": {},
}
41 changes: 41 additions & 0 deletions src/ai_company/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,41 @@ class GracefulShutdownConfig(BaseModel):
)


class TaskAssignmentConfig(BaseModel):
"""Configuration for task assignment behaviour.

Attributes:
strategy: Assignment strategy name (e.g. ``"role_based"``).
min_score: Minimum capability score for agent eligibility.
max_concurrent_tasks_per_agent: Maximum tasks an agent can
handle concurrently.
"""

model_config = ConfigDict(frozen=True, allow_inf_nan=False)

strategy: NotBlankStr = Field(
default="role_based",
description="Assignment strategy name",
)
Comment on lines +373 to +376

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strategy field is not validated against known strategy names

TaskAssignmentConfig.strategy accepts any NotBlankStr with no validator checking it against the actual entries in STRATEGY_MAP ("manual", "role_based", "load_balanced"). A misconfigured value like "auction" or a typo like "role_base" passes config validation cleanly and will only fail (or be silently ignored) when the caller tries to resolve the strategy at runtime.

Add a @model_validator method to assert the value is a known key. The allowed set can be defined as a constant in engine/assignment/strategies.py to avoid circular imports between config and engine modules.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/config/schema.py
Line: 373-376

Comment:
`strategy` field is not validated against known strategy names

`TaskAssignmentConfig.strategy` accepts any `NotBlankStr` with no validator checking it against the actual entries in `STRATEGY_MAP` (`"manual"`, `"role_based"`, `"load_balanced"`). A misconfigured value like `"auction"` or a typo like `"role_base"` passes config validation cleanly and will only fail (or be silently ignored) when the caller tries to resolve the strategy at runtime.

Add a `@model_validator` method to assert the value is a known key. The allowed set can be defined as a constant in `engine/assignment/strategies.py` to avoid circular imports between `config` and `engine` modules.

How can I resolve this? If you propose a fix, please make it concise.

min_score: float = Field(
default=0.1,
ge=0.0,
le=1.0,
description="Minimum capability score for agent eligibility",
)
max_concurrent_tasks_per_agent: int = Field(
default=5,
ge=1,
le=50,
description=(
"Maximum concurrent tasks per agent. "
"Reserved for engine-layer enforcement — the engine "
"should pass this value to AssignmentRequest when "
"wiring up the assignment pipeline."
),
)


Copilot AI Mar 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_concurrent_tasks_per_agent's description says the engine “should pass this value to AssignmentRequest”, but AssignmentRequest currently has no field to carry this value and none of the strategies enforce it. Either remove/reword this guidance, or add the corresponding request field/enforcement wiring so the config is actionable.

Suggested change
"Maximum concurrent tasks per agent. "
"Reserved for engine-layer enforcement — the engine "
"should pass this value to AssignmentRequest when "
"wiring up the assignment pipeline."
),
)
"Maximum concurrent tasks an agent is intended to handle. "
"Actual enforcement must be implemented by the task "
"assignment/engine layer."
),
)

Copilot uses AI. Check for mistakes.
class RootConfig(BaseModel):
"""Root company configuration — the top-level validation target.

Expand All @@ -379,6 +414,8 @@ class RootConfig(BaseModel):
graceful_shutdown: Graceful shutdown configuration.
workflow_handoffs: Cross-department workflow handoffs.
escalation_paths: Cross-department escalation paths.
coordination_metrics: Coordination metrics configuration.
task_assignment: Task assignment configuration.
"""

model_config = ConfigDict(frozen=True)
Expand Down Expand Up @@ -442,6 +479,10 @@ class RootConfig(BaseModel):
default_factory=CoordinationMetricsConfig,
description="Coordination metrics configuration",
)
task_assignment: TaskAssignmentConfig = Field(
default_factory=TaskAssignmentConfig,
description="Task assignment configuration",
)

@model_validator(mode="after")
def _validate_unique_agent_names(self) -> Self:
Expand Down
32 changes: 32 additions & 0 deletions src/ai_company/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@
"""

from ai_company.engine.agent_engine import AgentEngine
from ai_company.engine.assignment import (
STRATEGY_MAP,
STRATEGY_NAME_LOAD_BALANCED,
STRATEGY_NAME_MANUAL,
STRATEGY_NAME_ROLE_BASED,
AgentWorkload,
AssignmentCandidate,
AssignmentRequest,
AssignmentResult,
LoadBalancedAssignmentStrategy,
ManualAssignmentStrategy,
RoleBasedAssignmentStrategy,
TaskAssignmentService,
TaskAssignmentStrategy,
)
from ai_company.engine.context import (
DEFAULT_MAX_TURNS,
AgentContext,
Expand Down Expand Up @@ -33,9 +48,11 @@
ExecutionStateError,
LoopExecutionError,
MaxTurnsExceededError,
NoEligibleAgentError,
ParallelExecutionError,
PromptBuildError,
ResourceConflictError,
TaskAssignmentError,
TaskRoutingError,
)
from ai_company.engine.loop_protocol import (
Expand Down Expand Up @@ -97,6 +114,10 @@

__all__ = [
"DEFAULT_MAX_TURNS",
"STRATEGY_MAP",
"STRATEGY_NAME_LOAD_BALANCED",
"STRATEGY_NAME_MANUAL",
"STRATEGY_NAME_ROLE_BASED",
"ZERO_TOKEN_USAGE",
"AgentAssignment",
"AgentContext",
Expand All @@ -105,6 +126,10 @@
"AgentOutcome",
"AgentRunResult",
"AgentTaskScorer",
"AgentWorkload",
"AssignmentCandidate",
"AssignmentRequest",
"AssignmentResult",
"AutoTopologyConfig",
"BudgetChecker",
"BudgetExhaustedError",
Expand All @@ -127,9 +152,12 @@
"ExecutionStateError",
"FailAndReassignStrategy",
"InMemoryResourceLock",
"LoadBalancedAssignmentStrategy",
"LoopExecutionError",
"ManualAssignmentStrategy",
"ManualDecompositionStrategy",
"MaxTurnsExceededError",
"NoEligibleAgentError",
"ParallelExecutionError",
"ParallelExecutionGroup",
"ParallelExecutionResult",
Expand All @@ -146,6 +174,7 @@
"RecoveryStrategy",
"ResourceConflictError",
"ResourceLock",
"RoleBasedAssignmentStrategy",
"RoutingCandidate",
"RoutingDecision",
"RoutingResult",
Expand All @@ -159,6 +188,9 @@
"SubtaskDefinition",
"SubtaskStatusRollup",
"SystemPrompt",
"TaskAssignmentError",
"TaskAssignmentService",
"TaskAssignmentStrategy",
"TaskCompletionMetrics",
"TaskExecution",
"TaskRoutingError",
Expand Down
39 changes: 39 additions & 0 deletions src/ai_company/engine/assignment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Task assignment engine.

Assigns tasks to agents using pluggable strategies: manual
designation, role-based scoring, or load-balanced selection.
"""

from ai_company.engine.assignment.models import (
AgentWorkload,
AssignmentCandidate,
AssignmentRequest,
AssignmentResult,
)
from ai_company.engine.assignment.protocol import TaskAssignmentStrategy
from ai_company.engine.assignment.service import TaskAssignmentService
from ai_company.engine.assignment.strategies import (
STRATEGY_MAP,
STRATEGY_NAME_LOAD_BALANCED,
STRATEGY_NAME_MANUAL,
STRATEGY_NAME_ROLE_BASED,
LoadBalancedAssignmentStrategy,
ManualAssignmentStrategy,
RoleBasedAssignmentStrategy,
)

__all__ = [
"STRATEGY_MAP",
"STRATEGY_NAME_LOAD_BALANCED",
"STRATEGY_NAME_MANUAL",
"STRATEGY_NAME_ROLE_BASED",
"AgentWorkload",
"AssignmentCandidate",
"AssignmentRequest",
"AssignmentResult",
"LoadBalancedAssignmentStrategy",
"ManualAssignmentStrategy",
"RoleBasedAssignmentStrategy",
"TaskAssignmentService",
"TaskAssignmentStrategy",
]
154 changes: 154 additions & 0 deletions src/ai_company/engine/assignment/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Task assignment domain models.

Frozen Pydantic models for assignment requests, results,
agent workloads, and assignment candidates.
"""

from typing import Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

from ai_company.core.agent import AgentIdentity # noqa: TC001
from ai_company.core.task import Task # noqa: TC001
from ai_company.core.types import NotBlankStr # noqa: TC001


class AgentWorkload(BaseModel):
"""Snapshot of an agent's current workload.

Attributes:
agent_id: Unique agent identifier.
active_task_count: Number of tasks currently in progress.
total_cost_usd: Total cost incurred by this agent in USD.
"""

model_config = ConfigDict(frozen=True, allow_inf_nan=False)

agent_id: NotBlankStr = Field(description="Agent identifier")
active_task_count: int = Field(
ge=0,
description="Number of tasks currently in progress",
)
total_cost_usd: float = Field(
default=0.0,
ge=0.0,
description="Total cost incurred by this agent in USD",
)


class AssignmentCandidate(BaseModel):
"""A candidate agent for task assignment with scoring details.

Attributes:
agent_identity: The candidate agent.
score: Match score between 0.0 and 1.0.
matched_skills: Skills that matched the assignment requirements.
reason: Human-readable explanation of the score.
"""

model_config = ConfigDict(frozen=True)

agent_identity: AgentIdentity = Field(description="Candidate agent")
score: float = Field(
ge=0.0,
le=1.0,
description="Match score (0.0-1.0)",
)

Copilot AI Mar 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssignmentCandidate uses ConfigDict(frozen=True) without allow_inf_nan=False, so score could potentially accept NaN/Inf values even though it has range constraints. Consider setting allow_inf_nan=False (as done in other numeric-heavy models) to prevent non-finite scores from entering the assignment pipeline.

Copilot uses AI. Check for mistakes.
matched_skills: tuple[NotBlankStr, ...] = Field(
default=(),
description="Skills that matched assignment requirements",
)
reason: NotBlankStr = Field(description="Explanation of score")


class AssignmentRequest(BaseModel):
"""Request for task assignment to an agent.

The ``required_skills`` and ``required_role`` fields live here
(not on Task) so that scoring strategies can evaluate agent-task
fit without modifying the Task model.

Attributes:
task: The task to assign.
available_agents: Pool of agents to consider (must be non-empty).
workloads: Current workload snapshots per agent.
min_score: Minimum score threshold for eligibility.
required_skills: Skill names needed for scoring.
required_role: Optional role name for scoring.
"""

model_config = ConfigDict(frozen=True)

task: Task = Field(description="The task to assign")
available_agents: tuple[AgentIdentity, ...] = Field(
description="Pool of agents to consider",
)
workloads: tuple[AgentWorkload, ...] = Field(
default=(),
description="Current workload snapshots per agent",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
min_score: float = Field(
default=0.1,
ge=0.0,
le=1.0,
description="Minimum score threshold for eligibility",
)

Copilot AI Mar 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssignmentRequest doesn't set allow_inf_nan=False, so min_score may accept NaN/Inf (Pydantic constraints don't reliably reject NaN unless configured). That can lead to confusing behavior (e.g., NaN comparisons filtering all candidates). Consider adding allow_inf_nan=False to the model config.

Copilot uses AI. Check for mistakes.
required_skills: tuple[NotBlankStr, ...] = Field(
default=(),
description="Skill names needed for scoring",
)
required_role: NotBlankStr | None = Field(
default=None,
description="Optional role name for scoring",
)

@model_validator(mode="after")
def _validate_non_empty_agents(self) -> Self:
"""Ensure at least one agent is available for assignment."""
if not self.available_agents:
msg = "available_agents must not be empty"
raise ValueError(msg)
return self
Comment on lines +108 to +128

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The AssignmentRequest model could be made more robust by validating the uniqueness of agent identifiers in available_agents and workloads. Currently, if there are duplicate agent IDs in these tuples, the logic in the assignment strategies might silently use the first occurrence or, in the case of workloads being converted to a dictionary, the last occurrence. This could lead to incorrect behavior or hide data integrity issues.

I suggest combining the existing _validate_non_empty_agents check with new uniqueness checks for both available_agents and workloads into a single, more comprehensive validator. This will make the system more predictable and easier to debug.

    @model_validator(mode="after")
    def _validate_request_collections(self) -> Self:
        """Ensure collections in the request are valid.

        - `available_agents` must not be empty.
        - Agent IDs in `available_agents` must be unique.
        - Agent IDs in `workloads` must be unique.
        """
        if not self.available_agents:
            msg = "available_agents must not be empty"
            raise ValueError(msg)

        agent_ids = [a.id for a in self.available_agents]
        if len(agent_ids) != len(set(agent_ids)):
            from collections import Counter

            dupes = sorted(str(i) for i, c in Counter(agent_ids).items() if c > 1)
            msg = f"Duplicate agent IDs in available_agents: {dupes}"
            raise ValueError(msg)

        if self.workloads:
            workload_agent_ids = [w.agent_id for w in self.workloads]
            if len(workload_agent_ids) != len(set(workload_agent_ids)):
                from collections import Counter

                dupes = sorted(i for i, c in Counter(workload_agent_ids).items() if c > 1)
                msg = f"Duplicate agent_id in workloads: {dupes}"
                raise ValueError(msg)

        return self



class AssignmentResult(BaseModel):
"""Result of a task assignment operation.

Attributes:
task_id: ID of the task that was assigned.
strategy_used: Name of the strategy that produced this result.
selected: The selected candidate (None if no viable agent).
alternatives: Other candidates considered, ranked by score.
reason: Human-readable explanation of the assignment decision.
"""

model_config = ConfigDict(frozen=True)

task_id: NotBlankStr = Field(description="Task identifier")
strategy_used: NotBlankStr = Field(
description="Name of the strategy used",
)
selected: AssignmentCandidate | None = Field(
default=None,
description="Selected candidate (None if no viable agent)",
)
alternatives: tuple[AssignmentCandidate, ...] = Field(
default=(),
description="Other candidates considered, ranked by score",
)
reason: NotBlankStr = Field(description="Explanation of decision")

Copilot AI Mar 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssignmentResult also lacks allow_inf_nan=False. If any strategy accidentally emits a non-finite score inside selected/alternatives, it will validate and then propagate into logs/metrics. Consider setting allow_inf_nan=False on this model as well for consistency with the rest of the codebase.

Copilot uses AI. Check for mistakes.

@model_validator(mode="after")
def _validate_selected_not_in_alternatives(self) -> Self:
"""Ensure selected candidate is not duplicated in alternatives."""
if self.selected is None:
return self
selected_id = self.selected.agent_identity.id
for alt in self.alternatives:
if alt.agent_identity.id == selected_id:
selected_name = self.selected.agent_identity.name
msg = (
f"Selected candidate {selected_name!r} also appears in alternatives"
)
raise ValueError(msg)
return self
Loading