# Backend Architecture
The backend is a Python service that handles all data processing. It loads files, executes tool transformations, and returns results to the frontend. The frontend never touches actual data—it works with schemas and previews.
## Technology Stack
| Layer | Technology | Why |
|---|---|---|
| Framework | FastAPI | Async, fast, great DX, OpenAPI docs |
| Data | Polars | Lazy evaluation, blazing performance |
| Validation | Pydantic | Type-safe request/response models |
| Packaging | PyInstaller | Single-file executable distribution |
## Directory Structure
```
backend/app/
├── main.py                  # FastAPI entrypoint
├── config.py                # Settings and configuration
│
├── api/                     # HTTP layer
│   ├── router.py            # Route aggregation
│   ├── routes/              # Endpoint handlers
│   │   ├── health.py        # Health check
│   │   ├── schema.py        # Schema inference
│   │   ├── preview.py       # Data preview
│   │   └── execute.py       # Workflow execution
│   └── models/              # Pydantic schemas
│       ├── workflow.py      # Workflow, Tool, Wire
│       └── common.py        # DataSchema, DataPreview
│
├── domain/                  # Business logic
│   ├── tools/               # Tool implementations
│   │   ├── base.py          # BaseTool ABC
│   │   ├── registry.py      # Tool registry + decorator
│   │   ├── register.py      # Imports all tools
│   │   └── implementations/
│   │       ├── input.py
│   │       ├── filter.py
│   │       ├── select.py
│   │       └── ...
│   ├── execution/           # Workflow execution
│   │   ├── executor.py      # Main executor
│   │   └── graph.py         # Dependency resolution
│   ├── workflow/            # Workflow utilities
│   │   └── archive.py       # Audit archiving
│   └── datasources/         # File type handlers
│       ├── factory.py       # Datasource factory
│       ├── csv.py           # CSV loader
│       └── parquet.py       # Parquet loader
│
├── middleware/              # Request middleware
│   └── audit.py             # Audit logging
│
└── utils/                   # Shared utilities
    ├── errors.py            # Custom exceptions
    └── type_mapping.py      # Polars ↔ JSON type conversion
```
## Tool System
The heart of the backend. Every transformation (Filter, Select, Join, etc.) is a Tool.
### BaseTool Interface
All tools inherit from `BaseTool`:
```python
from abc import ABC, abstractmethod
from typing import Any

import polars as pl

from app.api.models.common import DataSchema


class BaseTool(ABC):
    @abstractmethod
    async def execute(
        self,
        config: dict[str, Any],
        inputs: dict[str, pl.LazyFrame | list[pl.LazyFrame]],
    ) -> dict[str, pl.LazyFrame]:
        """Execute tool logic. Returns LazyFrames."""
        pass

    @abstractmethod
    async def get_output_schema(
        self,
        config: dict[str, Any],
        input_schemas: dict[str, DataSchema | list[DataSchema]],
    ) -> dict[str, DataSchema]:
        """Return schema without execution (for UI dropdowns)."""
        pass

    async def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Validate configuration. Return error messages."""
        return []
```
Key design decisions:
- Async by default: All methods are async, even if they don't need it. Consistency matters.
- LazyFrame in, LazyFrame out: Keep execution lazy. Never call `.collect()` unless you must.
- Socket-based I/O: Tools can have multiple inputs/outputs. The dict key is the socket ID.
- Schema without execution: `get_output_schema` lets the UI know what columns will exist without running the pipeline (see the sketch below).
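To make "schema without execution" concrete, here is a minimal sketch of a column-selection tool implementing both methods. It is illustrative, not the actual `select.py`; in particular, the `DataSchema` shape (a `columns` list with `.name` attributes) and the `model_copy` call are assumptions about the Pydantic model.

```python
from typing import Any

import polars as pl

from app.api.models.common import DataSchema
from app.domain.tools.base import BaseTool


class SelectTool(BaseTool):
    async def execute(
        self,
        config: dict[str, Any],
        inputs: dict[str, pl.LazyFrame | list[pl.LazyFrame]],
    ) -> dict[str, pl.LazyFrame]:
        # Stays lazy: select() only extends the query plan.
        lf = inputs["input"]
        return {"output": lf.select(config.get("columns", []))}

    async def get_output_schema(
        self,
        config: dict[str, Any],
        input_schemas: dict[str, DataSchema | list[DataSchema]],
    ) -> dict[str, DataSchema]:
        # Answers "which columns come out?" without reading any data.
        schema = input_schemas["input"]
        keep = set(config.get("columns", []))
        kept = [c for c in schema.columns if c.name in keep]  # assumed DataSchema shape
        return {"output": schema.model_copy(update={"columns": kept})}
```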
### Tool Registry
Tools self-register using a decorator:
```python
# backend/app/domain/tools/implementations/filter.py
import polars as pl

from app.domain.tools.base import BaseTool
from app.domain.tools.registry import register_tool


@register_tool("Filter")  # This string must match the frontend tool type
class FilterTool(BaseTool):
    async def execute(self, config, inputs):
        lf = inputs["input"]
        expression = config.get("expression", "")
        # ... filter logic producing the true_result / false_result LazyFrames
        return {"output-true": true_result, "output-false": false_result}
```
The registry is a simple class-level dict:
```python
from typing import Type

from app.domain.tools.base import BaseTool


class ToolRegistry:
    _tools: dict[str, Type[BaseTool]] = {}

    @classmethod
    def register(cls, tool_type: str, tool_class: Type[BaseTool]) -> None:
        cls._tools[tool_type] = tool_class

    @classmethod
    def get(cls, tool_type: str) -> BaseTool:
        return cls._tools[tool_type]()
```
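The `register_tool` decorator used above is then a thin wrapper over `ToolRegistry.register`; a plausible implementation, living alongside the registry:

```python
# backend/app/domain/tools/registry.py (continued)
from typing import Callable, Type


def register_tool(tool_type: str) -> Callable[[Type[BaseTool]], Type[BaseTool]]:
    """Class decorator: register the class under `tool_type`, return it unchanged."""
    def decorator(tool_class: Type[BaseTool]) -> Type[BaseTool]:
        ToolRegistry.register(tool_type, tool_class)
        return tool_class
    return decorator
```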
All tool implementations are imported in `register.py`, which triggers the decorators:
```python
# backend/app/domain/tools/register.py
from app.domain.tools.implementations import (
    input,
    output,
    filter,
    select,
    sort,
    formula,
    join,
    union,
    summarize,
)
```
And `register.py` is imported in `main.py`:
```python
# backend/app/main.py
import app.domain.tools.register  # noqa: F401
```
This ensures all tools are registered before any API request.
## Execution Engine
The `WorkflowExecutor` orchestrates tool execution.
### Execution Graph
Before executing, we build a dependency graph:
```python
class ExecutionGraph:
    def __init__(self, workflow: Workflow):
        self.workflow = workflow
        self.dependencies = self._build_dependencies()

    def get_execution_order(self, target_tool_id: str | None = None) -> list[str]:
        """Topological sort of dependencies."""
        # Returns tool IDs in the order they should execute
        ...
```
This handles:
- Determining which tools need to run for a given target
- Detecting cycles (the frontend should prevent them, but we verify; see the sketch below)
- Computing dependency depth for parallel execution (future)
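A compact way to implement `get_execution_order` is Kahn's algorithm, which produces a valid order and detects cycles as a side effect. A sketch, assuming `dependencies` maps each tool ID to the set of IDs it depends on:

```python
from collections import deque

from app.utils.errors import ExecutionError


def topological_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm over `dependencies[tool_id] = ids it depends on`."""
    pending = {tool: set(deps) for tool, deps in dependencies.items()}
    ready = deque(sorted(t for t, deps in pending.items() if not deps))
    order: list[str] = []
    while ready:
        tool = ready.popleft()
        order.append(tool)
        for other, deps in pending.items():
            if tool in deps:
                deps.discard(tool)
                if not deps:  # all dependencies satisfied, safe to run
                    ready.append(other)
    if len(order) != len(pending):
        # Every remaining tool still has an unmet dependency: a cycle.
        raise ExecutionError("Workflow contains a cycle")
    return order
```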
### Execution Flow
1. Build the execution graph from the workflow
2. Determine execution order (topological sort)
3. For each tool in order:
   - Get inputs from previously executed tools (or datasources)
   - Execute the tool, get LazyFrame outputs
   - Store outputs for downstream tools
4. Collect results for the target tool (preview or output)
```python
# Simplified: a method on WorkflowExecutor
async def execute_workflow(
    self,
    workflow: Workflow,
    target_tool_id: str | None = None,
    preview_limit: int | None = None,
) -> dict[str, Any]:
    graph = ExecutionGraph(workflow)
    execution_order = graph.get_execution_order(target_tool_id)

    for tool_id in execution_order:
        tool = self._get_tool(tool_id)
        inputs = self._gather_inputs(tool_id)
        outputs = await tool.execute(tool.config, inputs)
        self.results[tool_id] = outputs

    # Collect and return target results
    return self._collect_results(target_tool_id, preview_limit)
```
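The `_gather_inputs` step is where wires and sockets meet: each wire ending at the current tool maps an upstream tool's stored output onto one of this tool's input sockets. A hypothetical sketch, assuming the executor keeps the current `Workflow` on `self.workflow`, that `Wire` exposes `source_tool`/`source_socket`/`target_tool`/`target_socket` fields, and ignoring multi-wire (list) sockets:

```python
# Hypothetical method on WorkflowExecutor; Wire field names are assumed.
def _gather_inputs(self, tool_id: str) -> dict[str, pl.LazyFrame]:
    """Map each wire ending at `tool_id` to {target_socket: upstream LazyFrame}."""
    inputs: dict[str, pl.LazyFrame] = {}
    for wire in self.workflow.wires:
        if wire.target_tool == tool_id:
            upstream_outputs = self.results[wire.source_tool]
            inputs[wire.target_socket] = upstream_outputs[wire.source_socket]
    return inputs
```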
## API Endpoints
### Health Check
```
GET /api/health/
```
Returns service status. Used by frontend to verify backend is running.
### Schema
```
POST /api/workflow/tool/schema

{
  "tool_id": "abc123",
  "workflow": { ... }
}
```
Returns the output schema for a tool without executing it. Used to populate column dropdowns in tool configuration.
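Internally, this endpoint only needs the registry plus `get_output_schema`. A simplified sketch of the handler; the request model name, the `Workflow` field access (`tools`, `.id`, `.type`, `.config`), and the `resolve_upstream_schemas` helper are all illustrative assumptions:

```python
# backend/app/api/routes/schema.py (simplified sketch)
from fastapi import APIRouter
from pydantic import BaseModel

from app.api.models.workflow import Workflow
from app.domain.tools.registry import ToolRegistry

router = APIRouter()


class ToolSchemaRequest(BaseModel):  # illustrative request model
    tool_id: str
    workflow: Workflow


@router.post("/tool/schema")
async def tool_schema(req: ToolSchemaRequest):
    node = next(t for t in req.workflow.tools if t.id == req.tool_id)  # assumed fields
    tool = ToolRegistry.get(node.type)
    # resolve_upstream_schemas is a hypothetical helper that walks upstream
    # tools in dependency order, calling get_output_schema on each.
    input_schemas = resolve_upstream_schemas(req.workflow, req.tool_id)
    return await tool.get_output_schema(node.config, input_schemas)
```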
### Preview / Execute Tool
```
POST /api/workflow/tool/execute

{
  "tool_id": "abc123",
  "workflow": { ... },
  "preview_limit": 100
}
```
Executes the workflow up to the target tool and returns sample rows. This is where `.collect()` happens.
### Execute Workflow
```
POST /api/workflow/execute

{
  "workflow": { ... }
}
```
Runs the full workflow: output tools write their files, and execution metrics are returned.
## Polars Lazy Evaluation
This is why Sigilweaver can handle large datasets efficiently.
Bad (eager evaluation):
```python
import polars as pl

df = pl.read_csv("huge.csv")         # Loads the entire file into memory
df = df.filter(pl.col("age") > 30)   # Creates a new DataFrame
df = df.select(["name", "age"])      # Creates another DataFrame
```
Good (lazy evaluation):
```python
lf = pl.scan_csv("huge.csv")         # Returns immediately, nothing loaded
lf = lf.filter(pl.col("age") > 30)   # Builds a query plan
lf = lf.select(["name", "age"])      # Adds to the query plan
result = lf.head(100).collect()      # NOW it reads the file, only what's needed
```
Tools operate on LazyFrames. We only call `.collect()` when:
- Returning preview data
- Writing output files
- Schema inference requires it (some cases)
## Datasources
The `datasources/` module handles file loading:
```python
from pathlib import Path


class DatasourceFactory:
    @staticmethod
    def create(path: str) -> Datasource:
        ext = Path(path).suffix.lower()
        if ext == ".csv":
            return CSVDatasource(path)
        elif ext in (".parquet", ".pq"):
            return ParquetDatasource(path)
        raise ValueError(f"Unsupported file type: {ext}")
```
Each datasource implements (sketched below):
- `scan()` → LazyFrame (lazy load)
- `infer_schema()` → DataSchema (column names and types)
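The shared interface is small; a minimal sketch of the base class (the actual `Datasource` may differ, e.g. in whether these methods are async):

```python
from abc import ABC, abstractmethod

import polars as pl

from app.api.models.common import DataSchema


class Datasource(ABC):
    def __init__(self, path: str):
        self.path = path

    @abstractmethod
    def scan(self) -> pl.LazyFrame:
        """Open the file lazily; no rows are read until .collect()."""

    @abstractmethod
    def infer_schema(self) -> DataSchema:
        """Return the file's column names and types."""
```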
## Error Handling
Custom exceptions live in `utils/errors.py`:
```python
class ToolError(Exception):
    """Error in tool execution."""


class ExecutionError(Exception):
    """Error in workflow execution."""


class ValidationError(Exception):
    """Invalid configuration or workflow."""
```
These bubble up through the API layer and get converted to appropriate HTTP responses.
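The conversion can live in FastAPI exception handlers registered on the app; a sketch, with illustrative status-code choices:

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from app.utils.errors import ToolError, ValidationError

app = FastAPI()


@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
    # An invalid config or workflow is a client error.
    return JSONResponse(status_code=422, content={"detail": str(exc)})


@app.exception_handler(ToolError)
async def tool_error_handler(request: Request, exc: ToolError):
    return JSONResponse(status_code=400, content={"detail": str(exc)})
```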
## Testing
Backend tests focus on:
- Tool execution: Each tool has tests verifying transformations (example below)
- Schema propagation: Verify output schemas are computed correctly
- Execution order: Graph building and topological sort
- Edge cases: Missing inputs, invalid config, type mismatches
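A typical tool test, sketched with pytest and pytest-asyncio (assumptions about the test stack; the expression syntax follows the Filter example above and is illustrative):

```python
import polars as pl
import pytest

from app.domain.tools.implementations.filter import FilterTool


@pytest.mark.asyncio  # requires pytest-asyncio
async def test_filter_routes_rows_to_true_and_false_outputs():
    lf = pl.LazyFrame({"name": ["a", "b"], "age": [25, 40]})
    tool = FilterTool()
    outputs = await tool.execute({"expression": "age > 30"}, {"input": lf})
    # One row matches, one does not; both outputs stay lazy until collected.
    assert outputs["output-true"].collect().height == 1
    assert outputs["output-false"].collect().height == 1
```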
See Backend Testing for details.
Next: Architectural Decisions or Adding Tools.