middleware-pipeline-pattern
Middleware Pipeline Pattern
Implemented: 2025-11-02
System: Oculus API
Pattern: Composable data transformations at fence execution boundary
The Architecture
Middleware runs at L3.5 in the AST pipeline - after fence execution, before caching:
L1: Parse markdown → AST
L2: Resolve includes
L2.5: Substitute variables (${node.key})
L3: Execute fences → raw data
L3.5: Apply middleware transformations ← NEW!
L4: Cache results
L5: Render to markdown/JSON
Why L3.5 Matters
Running middleware after execution but before cache means:
- Raw data never hits the cache (datetime objects are already serialized to ISO strings by then)
- Transformations are cached (don't re-extract tables on every render)
- Frontend gets clean data without knowing about middleware
- Multiple views possible (raw vs table) from same execution
The Implementation
Base Pattern
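from fnmatch import fnmatch
from typing import Any, Dict

class Middleware:
    fence_pattern = '*'  # assumed default: match every fence type

    def should_run(self, context: Dict[str, Any]) -> bool:
        # Pattern matching on fence type
        return fnmatch(context.get('fence_type', ''), self.fence_pattern)

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        # Transform: data_in → data_out (subclasses override this)
        transformed_data = data  # the base class is a pass-through
        return transformed_data
Pipeline Composition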
pipeline = MiddlewarePipeline()
pipeline.register(DatetimeNormalizer()) # Fix serialization
pipeline.register(AWSResponseCleaner()) # Strip metadata
pipeline.register(TableExtractor()) # Extract tables
result['data'] = pipeline.process(result['data'], context)
The Transformations
DatetimeNormalizer
Recursively converts Python datetime objects to ISO strings:
datetime(2025, 10, 30, 9, 17, 42) → "2025-10-30T09:17:42+00:00Z"
AWSResponseCleaner
Strips the AWS ResponseMetadata block from responses; runs only for fences matching the pattern aws:*
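A minimal sketch of these two transformations as plain functions, assuming a boto3-style response dict. The function names are hypothetical, and the UTC timezone on the sample datetime is an assumption. Note that `datetime.isoformat()` on a timezone-aware value yields an offset such as `+00:00` with no trailing `Z`, so the exact string differs from the example shown above; the formatting step the real normalizer uses is not shown in this note.

```python
from datetime import datetime, timezone
from typing import Any


def normalize_datetimes(data: Any) -> Any:
    """Recursively replace datetime objects with ISO 8601 strings."""
    if isinstance(data, datetime):
        return data.isoformat()
    if isinstance(data, dict):
        return {key: normalize_datetimes(value) for key, value in data.items()}
    if isinstance(data, (list, tuple)):
        return [normalize_datetimes(item) for item in data]
    return data  # leave every other value untouched


def strip_response_metadata(data: Any) -> Any:
    """Drop the top-level ResponseMetadata key, if the data is a dict."""
    if isinstance(data, dict):
        return {k: v for k, v in data.items() if k != "ResponseMetadata"}
    return data


# Illustrative sample only, not real AWS output.
response = {
    "Reservations": [
        {"Instances": [
            {"LaunchTime": datetime(2025, 10, 30, 9, 17, 42, tzinfo=timezone.utc)}
        ]}
    ],
    "ResponseMetadata": {"RequestId": "example"},
}
cleaned = strip_response_metadata(normalize_datetimes(response))
```

Both functions return new containers rather than mutating the input, which keeps the raw execution result available if a second view of it is needed.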
TableExtractor
JMESPath-style array projection:
TableConfig:
  array_path: Reservations[].Instances[]
  columns:
    InstanceId: InstanceId
    State: State.Name
    Type: InstanceType
The Breakthrough: Recursive Array Projection
The key insight was implementing proper recursive array projection for paths like Reservations[].Instances[]:
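def _extract_path(self, data: Any, path: str) -> Any:
    # Tokenize so '[]' is its own part (assumed; the original setup is not shown)
    parts = [p for p in path.replace('[]', '.[]').split('.') if p]
    current = data
    for i, part in enumerate(parts):
        # When hitting [], apply remaining path to each element
        if part == '[]':
            if not isinstance(current, list):
                return []
            if i + 1 < len(parts):
                remaining_path = '.'.join(parts[i+1:])
                results = []
                for item in current:
                    result = self._extract_path(item, remaining_path)
                    if isinstance(result, list):
                        results.extend(result)  # Flatten
                    else:
                        results.append(result)
                return results
            return current  # trailing []: return the list itself
        # Plain key: descend into the dict (None if the key is missing)
        current = current.get(part) if isinstance(current, dict) else None
    return current
This allows the extractor to: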
- Traverse to array
- Project remaining path over each element
- Flatten nested results
- Return clean list
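To make these four steps concrete, here is a self-contained sketch that pairs a path resolver with the column mapping from the TableConfig. `extract_path` and `extract_table` are hypothetical free functions, the tokenization of `[]` is an assumption, and the sample data is invented for illustration.

```python
from typing import Any, Dict, List


def extract_path(data: Any, path: str) -> Any:
    """Resolve a dotted path; '[]' projects the rest of the path over a list."""
    parts = [p for p in path.replace("[]", ".[]").split(".") if p]
    current = data
    for i, part in enumerate(parts):
        if part == "[]":
            if not isinstance(current, list):
                return []
            remaining = ".".join(parts[i + 1:])
            if not remaining:
                return current              # trailing []: the list itself
            results: List[Any] = []
            for item in current:
                value = extract_path(item, remaining)
                if isinstance(value, list):
                    results.extend(value)   # flatten nested projections
                else:
                    results.append(value)
            return results
        current = current.get(part) if isinstance(current, dict) else None
    return current


def extract_table(data: Any, array_path: str,
                  columns: Dict[str, str]) -> List[Dict[str, Any]]:
    """Build one row per projected element, one key per configured column."""
    rows = extract_path(data, array_path) or []
    return [{name: extract_path(row, col_path) for name, col_path in columns.items()}
            for row in rows]


sample = {
    "Reservations": [
        {"Instances": [{"InstanceId": "i-aaa", "State": {"Name": "running"},
                        "InstanceType": "t3.micro"}]},
        {"Instances": [{"InstanceId": "i-bbb", "State": {"Name": "stopped"},
                        "InstanceType": "t3.small"}]},
    ]
}
table = extract_table(
    sample,
    "Reservations[].Instances[]",
    {"InstanceId": "InstanceId", "State": "State.Name", "Type": "InstanceType"},
)
```

Here the two reservations each contribute one instance, and the flattening step merges them into a single two-row list before the columns are picked out.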
The Result
From (1000+ lines of nested AWS JSON with 50+ fields per instance):
{
  "Reservations": [{
    "Instances": [{ ...50 fields... }]
  }],
  "ResponseMetadata": {...}
}
To (clean 5-column table):
[
  {
    "InstanceId": "i-03cb619646738ce3f",
    "State": "running",
    "Type": "m6g.2xlarge",
    "IP": "10.241.61.200",
    "LaunchTime": "2025-10-30T09:17:42+00:00Z"
  }
]
Design Principles
- Composable - Chain transformations in sequence
- Conditional - Pattern matching determines execution
- Safe - Errors don't break pipeline, return original data
- Extensible - Add new middleware without modifying core
- Cached - Transformations cached, not re-executed
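The Composable, Conditional, and Safe principles can be sketched in one small pipeline class. This is an illustration under assumptions, not the actual MiddlewarePipeline: in particular, "return original data" is interpreted here as "keep the data as it was before the failing step", and the `Broken`, `AwsOnly`, and `Upper` classes and the `text:plain` fence type are hypothetical.

```python
import logging
from fnmatch import fnmatch
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class Middleware:
    fence_pattern = "*"  # assumed default: match every fence type

    def should_run(self, context: Dict[str, Any]) -> bool:
        return fnmatch(context.get("fence_type", ""), self.fence_pattern)

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        return data


class MiddlewarePipeline:
    def __init__(self) -> None:
        self._middlewares: List[Middleware] = []

    def register(self, middleware: Middleware) -> None:
        self._middlewares.append(middleware)

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        for middleware in self._middlewares:
            if not middleware.should_run(context):   # Conditional
                continue
            try:
                data = middleware.process(data, context)   # Composable
            except Exception as exc:
                # Safe: keep the data from before this step and carry on
                logger.warning("middleware %s failed: %s",
                               type(middleware).__name__, exc)
        return data


class Broken(Middleware):
    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        raise ValueError("boom")


class AwsOnly(Middleware):
    fence_pattern = "aws:*"

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        return "never reached for non-AWS fences"


class Upper(Middleware):
    fence_pattern = "text:*"

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        return data.upper()


pipeline = MiddlewarePipeline()
for step in (Broken(), AwsOnly(), Upper()):
    pipeline.register(step)
result = pipeline.process("hello", {"fence_type": "text:plain"})
```

In this run `Broken` raises and is skipped over, `AwsOnly` does not match the fence type, and `Upper` still runs, so `result` is `"HELLO"`.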
Future: Graph Nodes as Middleware
The interface was designed so middleware can later be:
- Python classes (now)
- Graph nodes (future)
- Any callable with signature:
fn(data, context) → data
This enables dynamic middleware defined in Oculus nodes rather than Python code.
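One way the callable form could plug into the same interface is a thin adapter, sketched below. `CallableMiddleware`, `drop_nulls`, and the `aws:ec2` and `sql:query` fence types are hypothetical names chosen for illustration; how graph nodes would actually supply the callable is not specified in this note.

```python
from fnmatch import fnmatch
from typing import Any, Callable, Dict

MiddlewareFn = Callable[[Any, Dict[str, Any]], Any]


class CallableMiddleware:
    """Adapt any fn(data, context) -> data to the should_run/process interface."""

    def __init__(self, fn: MiddlewareFn, fence_pattern: str = "*") -> None:
        self.fn = fn
        self.fence_pattern = fence_pattern

    def should_run(self, context: Dict[str, Any]) -> bool:
        return fnmatch(context.get("fence_type", ""), self.fence_pattern)

    def process(self, data: Any, context: Dict[str, Any]) -> Any:
        return self.fn(data, context)


def drop_nulls(data: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Example callable: remove keys whose value is None."""
    return {k: v for k, v in data.items() if v is not None}


mw = CallableMiddleware(drop_nulls, fence_pattern="aws:*")
context = {"fence_type": "aws:ec2"}
output = mw.process({"a": 1, "b": None}, context) if mw.should_run(context) else None
```

Because the adapter exposes the same two methods as a class-based middleware, a pipeline that only calls `should_run` and `process` would not need to distinguish between the two.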
The Pattern Name
Tabula Rasa - Creating a blank slate from complex data by progressively removing noise until only essential information remains.
Achievement: Complex nested AWS responses → Clean tabular data
Method: Composable transformations at execution boundary
Impact: Infrastructure that makes everyone using it more effective
Slots
North
slots: []
South
slots: []
East
slots: []
West
slots:
- pattern-graphnode-anatomy