Execution Module

Executor abstraction for running sandboxed commands locally or remotely.

Overview

The execution module provides an abstract base class and concrete implementations for executing sandboxed commands. Executors allow Shannot to work across different platforms and execution environments while maintaining a consistent interface.

Key Components:

  • SandboxExecutor - Abstract base class for all execution strategies
  • LocalExecutor - Execute commands locally using Bubblewrap (Linux only)
  • SSHExecutor - Execute commands on remote Linux systems via SSH

Executor Architecture

Executors separate the "how" of command execution from the "what". This allows:

  • Cross-platform support: Use SSH executor on macOS/Windows to execute on remote Linux
  • Remote diagnostics: Check production systems from development machines
  • Unified interface: Same code works locally and remotely
  • Pluggable backends: Easy to add new execution strategies
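
The separation described above can be illustrated with a minimal, self-contained sketch. Everything in it is a stand-in so it runs without Shannot installed: FakeResult, FakeExecutor, CannedLocal, CannedRemote and disk_report are hypothetical names, not part of the library. The point is only that the caller (disk_report) stays the same whichever backend it receives.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeResult:
    """Stand-in for ProcessResult; only the fields used below."""
    stdout: str
    returncode: int = 0


class FakeExecutor(ABC):
    """Stand-in for SandboxExecutor so the sketch runs without Shannot."""

    @abstractmethod
    async def run_command(self, profile, command, timeout=30):
        ...


class CannedLocal(FakeExecutor):
    """Pretends to run the command on this machine."""

    async def run_command(self, profile, command, timeout=30):
        return FakeResult(stdout=f"local: {' '.join(command)}")


class CannedRemote(FakeExecutor):
    """Pretends to run the command on a remote host."""

    def __init__(self, host):
        self.host = host

    async def run_command(self, profile, command, timeout=30):
        return FakeResult(stdout=f"{self.host}: {' '.join(command)}")


async def disk_report(executor, profile):
    """Caller code: knows what to run, not how it is run."""
    result = await executor.run_command(profile, ["df", "-h"])
    return result.stdout
```

Swapping CannedLocal() for CannedRemote("prod.example.com") changes where the command "runs" without touching disk_report, which is the property the real executors are documented to provide.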

Common Usage Patterns

Local Execution

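import asyncio

from shannot import load_profile_from_path
from shannot.executors import LocalExecutor


async def main():
    # Create local executor (Linux only)
    executor = LocalExecutor()

    # Load profile
    profile = load_profile_from_path("~/.config/shannot/diagnostics.json")

    # Execute command in the sandbox and print its output
    result = await executor.run_command(profile, ["df", "-h"])
    print(result.stdout)


# run_command is a coroutine, so it must be awaited inside an event loop.
# The shorter snippets below assume they run inside a coroutine like main().
asyncio.run(main())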

Remote Execution via SSH

from shannot.executors import SSHExecutor

# Create SSH executor
executor = SSHExecutor(
    host="prod.example.com",
    username="readonly",
    key_filename="/path/to/ssh/key"
)

# Execute on remote system
result = await executor.run_command(profile, ["uptime"])
print(result.stdout)

# Clean up connection when done
await executor.cleanup()

Reading Files

# Read file using executor
content = await executor.read_file(profile, "/etc/os-release")
print(content)

# Works with both local and remote executors

With Context Manager

async with SSHExecutor(host="server.example.com") as executor:
    result = await executor.run_command(profile, ["ls", "/"])
    print(result.stdout)
# Connection automatically cleaned up

Timeout Handling

try:
    result = await executor.run_command(
        profile,
        ["sleep", "60"],
        timeout=5  # seconds
    )
except TimeoutError:
    print("Command timed out")

Integration with Tools

The tools module accepts any executor through SandboxDeps:

from shannot.tools import SandboxDeps
from shannot.executors import SSHExecutor

# Create executor
executor = SSHExecutor(host="prod.example.com")

# Use with tools
deps = SandboxDeps(
    profile_name="diagnostics",
    executor=executor
)

# Tools automatically use the executor
result = await run_sandbox_command(deps, command=["df", "-h"], args=[])

Platform Compatibility

Platform   LocalExecutor   SSHExecutor
Linux      ✅ Yes          ✅ Yes
macOS      ❌ No           ✅ Yes
Windows    ❌ No           ✅ Yes

Note: LocalExecutor requires Bubblewrap, which is Linux-only. Use SSHExecutor on non-Linux platforms to execute commands on remote Linux systems.
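
The compatibility table can be expressed as a small selection helper. This is a sketch under assumptions: choose_executor_kind is a hypothetical function, it returns a label rather than constructing a real executor, and it treats any sys.platform value starting with "linux" as capable of local execution (it does not check that Bubblewrap is installed).

```python
import sys


def choose_executor_kind(platform=None, remote_host=None):
    """Return "ssh" or "local" following the compatibility table.

    Hypothetical helper: returns a label instead of building an executor.
    """
    platform = sys.platform if platform is None else platform
    if remote_host is not None:
        # SSH execution works from every platform in the table.
        return "ssh"
    if platform.startswith("linux"):
        # Local execution is only available on Linux.
        return "local"
    raise RuntimeError(
        f"Local execution is not available on {platform!r}; "
        "provide a remote Linux host instead"
    )
```

A caller would then construct LocalExecutor() for "local" or SSHExecutor(host=...) for "ssh".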

API Reference

execution

Executor abstraction for sandbox command execution.

This module provides the abstract base class for all execution strategies. Executors are responsible for running sandboxed commands, either locally or on remote systems. All executors implement the same interface so that tools and MCP code work unchanged.

Example:

Local execution on Linux:
    >>> executor = LocalExecutor()
    >>> result = await executor.run_command(profile, ["ls", "/"])

Remote execution via SSH:
    >>> executor = SSHExecutor(host="prod.example.com")
    >>> result = await executor.run_command(profile, ["ls", "/"])

Classes

SandboxExecutor

Bases: ABC

Abstract base class for all execution strategies.

Executors are responsible for running sandboxed commands, either locally or on remote systems. All executors must implement the same interface to ensure tools/MCP code works unchanged.

The executor abstraction allows Shannot to work on any platform:

  • Linux: Use LocalExecutor for native bubblewrap execution
  • macOS/Windows: Use SSHExecutor to execute on remote Linux systems

Subclasses must implement:

  • run_command(): Execute command in sandbox

Subclasses may override:

  • read_file(): Read file from filesystem (default uses cat)
  • cleanup(): Clean up resources like connections
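
To make the subclass contract concrete, here is a self-contained sketch of a test double. BaseExecutor is a stand-in that mirrors the documented interface so the example runs without Shannot; Result and InMemoryExecutor are hypothetical names. The subclass implements only run_command and overrides cleanup, while read_file is inherited and goes through cat as documented.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    """Stand-in for ProcessResult with the fields used here."""
    stdout: str = ""
    stderr: str = ""
    returncode: int = 0


class BaseExecutor(ABC):
    """Stand-in mirroring the documented SandboxExecutor interface."""

    @abstractmethod
    async def run_command(self, profile, command, timeout=30):
        ...

    async def read_file(self, profile, path):
        # Default behaviour as documented: delegate to `cat`.
        result = await self.run_command(profile, ["cat", path])
        if result.returncode != 0:
            raise FileNotFoundError(f"Cannot read {path}: {result.stderr}")
        return result.stdout

    async def cleanup(self):
        return None


class InMemoryExecutor(BaseExecutor):
    """Hypothetical test double: serves `cat` from a dict of fake files."""

    def __init__(self, files):
        self.files = dict(files)
        self.closed = False

    async def run_command(self, profile, command, timeout=30):
        if command[:1] == ["cat"] and len(command) == 2:
            path = command[1]
            if path in self.files:
                return Result(stdout=self.files[path])
            return Result(stderr="No such file", returncode=1)
        return Result(stderr="unsupported command", returncode=127)

    async def cleanup(self):
        # Record that cleanup ran; a real executor would release resources.
        self.closed = True
```

Because read_file is not overridden, a profile used with such an executor must allow cat, exactly as the read_file documentation states.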

Source code in shannot/execution.py
class SandboxExecutor(ABC):
    """Abstract base class for all execution strategies.

    Executors are responsible for running sandboxed commands, either
    locally or on remote systems. All executors must implement the
    same interface to ensure tools/MCP code works unchanged.

    The executor abstraction allows Shannot to work on any platform:
    - Linux: Use LocalExecutor for native bubblewrap execution
    - macOS/Windows: Use SSHExecutor to execute on remote Linux systems

    Subclasses must implement:
        - run_command(): Execute command in sandbox

    Subclasses may override:
        - read_file(): Read file from filesystem (default uses cat)
        - cleanup(): Clean up resources like connections
    """

    @abstractmethod
    async def run_command(
        self, profile: SandboxProfile, command: list[str], timeout: int = 30
    ) -> ProcessResult:
        """Execute command in sandbox.

        This is the core method all executors must implement. It takes
        a sandbox profile (which defines allowed commands, mounts, etc.)
        and executes the command in that sandbox environment.

        Args:
            profile: Sandbox profile configuration
            command: Command to execute as list of strings
                    Example: ["ls", "-la", "/tmp"]
            timeout: Timeout in seconds (default: 30)

        Returns:
            ProcessResult with:
                - command: The command that was executed (tuple)
                - stdout: Standard output as string
                - stderr: Standard error as string
                - returncode: Exit code (0 = success)
                - duration: Execution time in seconds

        Raises:
            TimeoutError: Command exceeded timeout
            RuntimeError: Execution error (SSH connection failure, etc.)

        Example:
            >>> result = await executor.run_command(
            ...     profile,
            ...     ["echo", "hello"],
            ...     timeout=10
            ... )
            >>> assert result.returncode == 0
            >>> assert "hello" in result.stdout
        """
        raise NotImplementedError("Subclasses must implement run_command")

    async def read_file(self, profile: SandboxProfile, path: str) -> str:
        """Read file from filesystem.

        Default implementation uses 'cat' command via run_command.
        Subclasses can override for more efficient implementations
        (e.g., SSH executor could use SFTP).

        Args:
            profile: Sandbox profile (must allow 'cat' command)
            path: Absolute path to file to read

        Returns:
            File contents as string

        Raises:
            FileNotFoundError: File doesn't exist or can't be read

        Example:
            >>> content = await executor.read_file(
            ...     profile,
            ...     "/etc/os-release"
            ... )
            >>> assert "Linux" in content
        """
        result = await self.run_command(profile, ["cat", path])
        if result.returncode != 0:
            raise FileNotFoundError(f"Cannot read {path}: {result.stderr}")
        return result.stdout

    async def cleanup(self):
        """Cleanup resources (connections, temp files, etc.).

        Called when executor is no longer needed. Subclasses should
        override to clean up resources like:
        - SSH connection pools
        - Temporary files
        - Background processes

        Default implementation does nothing.

        Example:
            >>> executor = SSHExecutor(host="example.com")
            >>> try:
            ...     result = await executor.run_command(...)
            ... finally:
            ...     await executor.cleanup()  # Close SSH connections
        """
        # Default implementation: no cleanup needed
        return None

Functions

run_command(profile, command, timeout=30) abstractmethod async

Execute command in sandbox.

This is the core method all executors must implement. It takes a sandbox profile (which defines allowed commands, mounts, etc.) and executes the command in that sandbox environment.

Args:

  • profile: Sandbox profile configuration
  • command: Command to execute as list of strings. Example: ["ls", "-la", "/tmp"]
  • timeout: Timeout in seconds (default: 30)

Returns: ProcessResult with:

  • command: The command that was executed (tuple)
  • stdout: Standard output as string
  • stderr: Standard error as string
  • returncode: Exit code (0 = success)
  • duration: Execution time in seconds

Raises:

  • TimeoutError: Command exceeded timeout
  • RuntimeError: Execution error (SSH connection failure, etc.)

Example:

>>> result = await executor.run_command(
...     profile,
...     ["echo", "hello"],
...     timeout=10
... )
>>> assert result.returncode == 0
>>> assert "hello" in result.stdout
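
Note that, as documented, a non-zero exit code is reported through returncode rather than through an exception, so callers usually check it themselves. The sketch below shows one way to do that. Result is a stand-in dataclass carrying the five documented ProcessResult fields, and require_success is a hypothetical helper, not part of Shannot.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    """Stand-in with the fields documented for ProcessResult."""
    command: tuple
    stdout: str
    stderr: str
    returncode: int
    duration: float


def require_success(result):
    """Return stdout, or raise RuntimeError if the command failed."""
    if result.returncode != 0:
        cmd = " ".join(result.command)
        raise RuntimeError(
            f"{cmd!r} exited with {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout
```

With a real executor this would be used as require_success(await executor.run_command(profile, ["df", "-h"])).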

read_file(profile, path) async

Read file from filesystem.

Default implementation uses 'cat' command via run_command. Subclasses can override for more efficient implementations (e.g., SSH executor could use SFTP).

Args:

  • profile: Sandbox profile (must allow 'cat' command)
  • path: Absolute path to file to read

Returns: File contents as string

Raises:

  • FileNotFoundError: File doesn't exist or can't be read

Example:

>>> content = await executor.read_file(
...     profile,
...     "/etc/os-release"
... )
>>> assert "Linux" in content

cleanup() async

Cleanup resources (connections, temp files, etc.).

Called when executor is no longer needed. Subclasses should override to clean up resources like:

  • SSH connection pools
  • Temporary files
  • Background processes

Default implementation does nothing.

Example:

>>> executor = SSHExecutor(host="example.com")
>>> try:
...     result = await executor.run_command(...)
... finally:
...     await executor.cleanup()  # Close SSH connections
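
The try/finally pattern above can be packaged once and reused. The following sketch wraps any object that has an async cleanup() method in an async context manager. The names managed and TrackingExecutor are hypothetical, and the sketch makes no claim about how Shannot's own executors implement their context-manager support.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed(executor):
    """Yield the executor and always call cleanup() afterwards."""
    try:
        yield executor
    finally:
        # Runs on normal exit and when the body raises.
        await executor.cleanup()


class TrackingExecutor:
    """Hypothetical stand-in that records whether cleanup ran."""

    def __init__(self):
        self.cleaned_up = False

    async def cleanup(self):
        self.cleaned_up = True
```

Inside a coroutine this reads as: async with managed(TrackingExecutor()) as executor: ... and cleanup runs even if the body raises.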
