Backend Architecture

The backend is a Python service that handles all data processing. It loads files, executes tool transformations, and returns results to the frontend. The frontend never touches actual data—it works with schemas and previews.

Technology Stack

Layer        Technology    Why
Framework    FastAPI       Async, fast, great DX, OpenAPI docs
Data         Polars        Lazy evaluation, blazing performance
Validation   Pydantic      Type-safe request/response models
Packaging    PyInstaller   Single-file executable distribution

Directory Structure

backend/app/
├── main.py                      # FastAPI entrypoint
├── config.py                    # Settings and configuration
├── api/                         # HTTP layer
│   ├── router.py                # Route aggregation
│   ├── routes/                  # Endpoint handlers
│   │   ├── health.py            # Health check
│   │   ├── schema.py            # Schema inference
│   │   ├── preview.py           # Data preview
│   │   └── execute.py           # Workflow execution
│   └── models/                  # Pydantic schemas
│       ├── workflow.py          # Workflow, Tool, Wire
│       └── common.py            # DataSchema, DataPreview
├── domain/                      # Business logic
│   ├── tools/                   # Tool implementations
│   │   ├── base.py              # BaseTool ABC
│   │   ├── registry.py          # Tool registry + decorator
│   │   ├── register.py          # Imports all tools
│   │   └── implementations/
│   │       ├── input.py
│   │       ├── filter.py
│   │       ├── select.py
│   │       └── ...
│   ├── execution/               # Workflow execution
│   │   ├── executor.py          # Main executor
│   │   └── graph.py             # Dependency resolution
│   ├── workflow/                # Workflow utilities
│   │   └── archive.py           # Audit archiving
│   └── datasources/             # File type handlers
│       ├── factory.py           # Datasource factory
│       ├── csv.py               # CSV loader
│       └── parquet.py           # Parquet loader
├── middleware/                  # Request middleware
│   └── audit.py                 # Audit logging
└── utils/                       # Shared utilities
    ├── errors.py                # Custom exceptions
    └── type_mapping.py          # Polars ↔ JSON type conversion

Tool System

The heart of the backend. Every transformation (Filter, Select, Join, etc.) is a Tool.

BaseTool Interface

All tools inherit from BaseTool:

from abc import ABC, abstractmethod
from typing import Any

import polars as pl

from app.api.models.common import DataSchema


class BaseTool(ABC):
    @abstractmethod
    async def execute(
        self,
        config: dict[str, Any],
        inputs: dict[str, pl.LazyFrame | list[pl.LazyFrame]],
    ) -> dict[str, pl.LazyFrame]:
        """Execute tool logic. Returns LazyFrames."""

    @abstractmethod
    async def get_output_schema(
        self,
        config: dict[str, Any],
        input_schemas: dict[str, DataSchema | list[DataSchema]],
    ) -> dict[str, DataSchema]:
        """Return schema without execution (for UI dropdowns)."""

    async def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Validate configuration. Return error messages."""
        return []

Key design decisions:

  1. Async by default: All methods are async, even if they don't need it. Consistency matters.
  2. LazyFrame in, LazyFrame out: Keep execution lazy. Never call .collect() unless you must.
  3. Socket-based I/O: Tools can have multiple inputs/outputs. The dict key is the socket ID (see the sketch after this list).
  4. Schema without execution: get_output_schema lets the UI know what columns will exist without running the pipeline.
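
To make the socket model concrete, here is a hedged sketch of a two-input tool. The socket names "left" and "right" and the config keys are illustrative assumptions, not taken from the real Join implementation:

class JoinTool(BaseTool):
    async def execute(self, config, inputs):
        # Each dict key is a socket ID; a join reads two input sockets
        left: pl.LazyFrame = inputs["left"]     # assumed socket name
        right: pl.LazyFrame = inputs["right"]   # assumed socket name
        joined = left.join(right, on=config["on"], how=config.get("how", "inner"))
        return {"output": joined}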

Tool Registry

Tools self-register using a decorator:

# backend/app/domain/tools/implementations/filter.py
from app.domain.tools.registry import register_tool

@register_tool("Filter")  # This string must match the frontend tool type
class FilterTool(BaseTool):
    async def execute(self, config, inputs):
        lf = inputs["input"]
        expression = config.get("expression", "")
        # ... filter logic
        return {"output-true": true_result, "output-false": false_result}

The registry is a simple class-level dict:

from typing import Type

class ToolRegistry:
    _tools: dict[str, Type[BaseTool]] = {}

    @classmethod
    def register(cls, tool_type: str, tool_class: Type[BaseTool]) -> None:
        cls._tools[tool_type] = tool_class

    @classmethod
    def get(cls, tool_type: str) -> BaseTool:
        return cls._tools[tool_type]()
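
The register_tool decorator itself isn't shown above; a minimal sketch, assuming it simply forwards to ToolRegistry.register and returns the class unchanged:

def register_tool(tool_type: str):
    """Class decorator that registers a tool under its type string."""
    def decorator(cls: Type[BaseTool]) -> Type[BaseTool]:
        ToolRegistry.register(tool_type, cls)
        return cls  # registration is the only side effect
    return decorator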

All tool implementations are imported in register.py, which triggers the decorators:

# backend/app/domain/tools/register.py
from app.domain.tools.implementations import (
    input,
    output,
    filter,
    select,
    sort,
    formula,
    join,
    union,
    summarize,
)

And register.py is imported in main.py:

# backend/app/main.py
import app.domain.tools.register # noqa: F401

This ensures all tools are registered before any API request.

Execution Engine

The WorkflowExecutor orchestrates tool execution.

Execution Graph

Before executing, we build a dependency graph:

class ExecutionGraph:
    def __init__(self, workflow: Workflow):
        self.workflow = workflow
        self.dependencies = self._build_dependencies()

    def get_execution_order(self, target_tool_id: str | None = None) -> list[str]:
        """Topological sort of dependencies.

        Returns tool IDs in the order they should execute.
        """

This handles:

  • Determining which tools need to run for a given target
  • Detecting cycles (which should be caught by the frontend, but we verify; see the sketch below)
  • Computing dependency depth for parallel execution (future)
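
A hedged sketch of what the sort inside get_execution_order might look like, assuming dependencies maps each tool ID to the set of tool IDs it consumes from (Kahn's algorithm, including the cycle check mentioned above):

from collections import deque

def topological_sort(dependencies: dict[str, set[str]]) -> list[str]:
    # in-degree: how many upstream tools each tool is still waiting on
    in_degree = {tool_id: len(deps) for tool_id, deps in dependencies.items()}
    # reverse edges: upstream tool -> tools that consume its output
    consumers: dict[str, list[str]] = {tool_id: [] for tool_id in dependencies}
    for tool_id, deps in dependencies.items():
        for dep in deps:
            consumers[dep].append(tool_id)

    ready = deque(t for t, deg in in_degree.items() if deg == 0)
    order: list[str] = []
    while ready:
        tool_id = ready.popleft()
        order.append(tool_id)
        for consumer in consumers[tool_id]:
            in_degree[consumer] -= 1
            if in_degree[consumer] == 0:
                ready.append(consumer)

    if len(order) != len(dependencies):
        raise ValueError("Workflow contains a cycle")
    return order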

Execution Flow

  1. Build execution graph from workflow
  2. Determine execution order (topological sort)
  3. For each tool in order:
    • Get inputs from previously executed tools (or datasources)
    • Execute tool, get LazyFrame outputs
    • Store outputs for downstream tools
  4. Collect results for target tool (preview or output)

async def execute_workflow(
    self,
    workflow: Workflow,
    target_tool_id: str | None = None,
    preview_limit: int | None = None,
) -> dict[str, Any]:
    graph = ExecutionGraph(workflow)
    execution_order = graph.get_execution_order(target_tool_id)

    for tool_id in execution_order:
        tool = self._get_tool(tool_id)
        inputs = self._gather_inputs(tool_id)
        outputs = await tool.execute(tool.config, inputs)
        self.results[tool_id] = outputs

    # Collect and return target results
    return self._collect_results(target_tool_id, preview_limit)

API Endpoints

Health Check

GET /api/health/

Returns service status. Used by the frontend to verify the backend is running.

Schema

POST /api/workflow/tool/schema
{
  "tool_id": "abc123",
  "workflow": { ... }
}

Returns output schema for a tool without executing. Used to populate column dropdowns in tool configuration.

Preview / Execute Tool

POST /api/workflow/tool/execute
{
  "tool_id": "abc123",
  "workflow": { ... },
  "preview_limit": 100
}

Executes the workflow up to the target tool and returns sample rows. This is where .collect() happens.

Execute Workflow

POST /api/workflow/execute
{
  "workflow": { ... }
}

Full workflow execution. Output tools write their files, and execution metrics are returned.

Polars Lazy Evaluation

This is why Sigilweaver can handle large datasets efficiently.

Bad (eager evaluation):

df = pl.read_csv("huge.csv")  # Loads entire file into memory
df = df.filter(pl.col("age") > 30) # Creates new DataFrame
df = df.select(["name", "age"]) # Creates another DataFrame

Good (lazy evaluation):

lf = pl.scan_csv("huge.csv")  # Returns immediately, nothing loaded
lf = lf.filter(pl.col("age") > 30) # Builds query plan
lf = lf.select(["name", "age"]) # Adds to query plan
result = lf.head(100).collect() # NOW it reads file, only what's needed

Tools operate on LazyFrames. We only .collect() when:

  • Returning preview data
  • Writing output files
  • Inferring schemas (in the cases that require materialization)

Datasources

The datasources/ module handles file loading:

from pathlib import Path

class DatasourceFactory:
    @staticmethod
    def create(path: str) -> Datasource:
        ext = Path(path).suffix.lower()
        if ext == ".csv":
            return CSVDatasource(path)
        elif ext in [".parquet", ".pq"]:
            return ParquetDatasource(path)
        raise ValueError(f"Unsupported file type: {ext}")

Each datasource implements:

  • scan() → LazyFrame (lazy load)
  • infer_schema() → DataSchema (column names and types)
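
A hedged sketch of the CSV side, assuming a small Datasource ABC; the real infer_schema wraps Polars dtypes into DataSchema via utils/type_mapping.py, which is elided here:

from abc import ABC, abstractmethod

import polars as pl

class Datasource(ABC):
    def __init__(self, path: str):
        self.path = path

    @abstractmethod
    def scan(self) -> pl.LazyFrame:
        """Lazily open the file; no rows are read yet."""

    @abstractmethod
    def infer_schema(self) -> dict[str, pl.DataType]:
        """Column names and Polars dtypes (mapped into DataSchema in the real code)."""

class CSVDatasource(Datasource):
    def scan(self) -> pl.LazyFrame:
        return pl.scan_csv(self.path)

    def infer_schema(self) -> dict[str, pl.DataType]:
        # collect_schema() resolves the plan's schema without materializing data
        return dict(self.scan().collect_schema())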

Error Handling

Custom exceptions in utils/errors.py:

class ToolError(Exception):
    """Error in tool execution."""

class ExecutionError(Exception):
    """Error in workflow execution."""

class ValidationError(Exception):
    """Invalid configuration or workflow."""

These bubble up through the API layer and get converted to appropriate HTTP responses.
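
A hedged sketch of what that conversion could look like with FastAPI exception handlers; the status codes and handler wiring are assumptions, only the exception names come from errors.py:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from app.utils.errors import ToolError, ValidationError

app = FastAPI()

@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
    # Invalid workflow/config: reject before execution
    return JSONResponse(status_code=400, content={"detail": str(exc)})

@app.exception_handler(ToolError)
async def tool_error_handler(request: Request, exc: ToolError):
    # The workflow ran, but a tool failed partway through
    return JSONResponse(status_code=422, content={"detail": str(exc)})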

Testing

Backend tests focus on:

  1. Tool execution: Each tool has tests verifying its transformations (one is sketched below)
  2. Schema propagation: Verify output schemas are computed correctly
  3. Execution order: Graph building and topological sort
  4. Edge cases: Missing inputs, invalid config, type mismatches
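
For flavor, a hedged sketch of one tool-execution test; it assumes pytest-asyncio and the Filter socket names shown earlier, and the assertions are illustrative rather than taken from the real suite:

import polars as pl
import pytest

import app.domain.tools.register  # noqa: F401  (triggers tool registration)
from app.domain.tools.registry import ToolRegistry

@pytest.mark.asyncio
async def test_filter_tool_returns_both_branches():
    tool = ToolRegistry.get("Filter")
    lf = pl.LazyFrame({"age": [25, 40]})
    outputs = await tool.execute({"expression": "age > 30"}, {"input": lf})
    # Filter always exposes both branches, and outputs stay lazy
    assert set(outputs) == {"output-true", "output-false"}
    assert all(isinstance(out, pl.LazyFrame) for out in outputs.values())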

See Backend Testing for details.


Next: Architectural Decisions or Adding Tools.