lantern

mw-forecast

Middleware: Forecast Generator

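Generates statistical forecasts with a configurable method: exponential smoothing (the default), a moving average over up to the last 4 weeks, or, for any other value, the last observed quantity. Takes trend-adjusted weekly data from the previous pipeline step and projects it forward horizon_weeks weeks for each item-location pair.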

Config [mw-forecast-config]

# Input: expects 'data' array from previous pipeline step (trend-calculated)
# Each record should have: item_id, location_id, week, adjusted_quantity, trend_pct, category
method: exponential_smoothing
alpha: 0.3
horizon_weeks: 4
confidence_level: 0.80
ttl_seconds: 1800

Fetch [mw-forecast-fetch]

import json
import math
import re
from datetime import datetime
from collections import defaultdict

# Config is unified: node config + pipeline params + session context
# PipelineProvider loads and merges these automatically
method = config.get('method', 'exponential_smoothing')
alpha = config.get('alpha', 0.3)
horizon = config.get('horizon_weeks', 4)
confidence = config.get('confidence_level', 0.80)

# Input data comes from previous pipeline step (accumulated context)
input_data = config.get('data', [])
if not input_data:
    raise ValueError("No input data - mw-forecast requires 'data' from previous pipeline step")

data = input_data

# Group by item-location
grouped = defaultdict(list)
for record in data:
    key = record.get('item_location_key', f"{record.get('item_id', '')}@{record.get('location_id', '')}")
    grouped[key].append(record)

forecasts = []
for key, records in grouped.items():
    sorted_records = sorted(records, key=lambda x: x.get('week', ''))
    if not sorted_records:
        continue
    
    quantities = [r.get('adjusted_quantity', r.get('quantity', 0)) for r in sorted_records]
    last_week = sorted_records[-1].get('week', '2026-W04')
    trend_pct = sorted_records[-1].get('trend_pct', 0)
    seasonality_factor = sorted_records[-1].get('seasonality_factor', 1.0)
    
    # Calculate base forecast
    if method == 'exponential_smoothing':
        smoothed = quantities[0]
        for q in quantities[1:]:
            smoothed = alpha * q + (1 - alpha) * smoothed
        base_forecast = smoothed
    elif method == 'moving_average':
        ma_period = min(4, len(quantities))
        base_forecast = sum(quantities[-ma_period:]) / ma_period
    else:
        base_forecast = quantities[-1] if quantities else 0
    
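    # Interprets trend_pct as an annual percentage: dividing by 52 gives a
    # per-week growth factor, which is compounded per horizon step below
    trend_multiplier = 1 + (trend_pct / 100) / 52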
    
    if len(quantities) >= 2:
        errors = [abs(quantities[i] - quantities[i-1]) for i in range(1, len(quantities))]
        std_error = math.sqrt(sum(e**2 for e in errors) / len(errors)) if errors else base_forecast * 0.1
    else:
        std_error = base_forecast * 0.15
    
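    # Two-sided z-scores for the supported confidence levels; any other
    # level silently falls back to 1.28 (the 80% value)
    z_scores = {0.80: 1.28, 0.90: 1.645, 0.95: 1.96}
    z = z_scores.get(confidence, 1.28)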
    
    last_record = sorted_records[-1]
    item_id = last_record.get('item_id', '')
    location_id = last_record.get('location_id', '')
    
    week_match = re.search(r'(\d+)-W(\d+)', last_week)
    year = int(week_match.group(1)) if week_match else 2026
    week_num = int(week_match.group(2)) if week_match else 4
    
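    # Anchor on the Monday of the last observed ISO week so that stepping a
    # whole number of weeks rolls over correctly in 53-week years (e.g. 2026)
    last_monday_ordinal = datetime.fromisocalendar(year, week_num, 1).toordinal()

    for h in range(1, horizon + 1):
        forecast_year, forecast_week, _ = datetime.fromordinal(last_monday_ordinal + 7 * h).isocalendar()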
        
        projected = base_forecast * (trend_multiplier ** h)
        forecast_qty = round(projected * seasonality_factor)
        lower = round(max(0, forecast_qty - z * std_error * math.sqrt(h)))
        upper = round(forecast_qty + z * std_error * math.sqrt(h))
        
        forecasts.append({
            'item_id': item_id,
            'location_id': location_id,
            'item_location_key': key,
            'week': f"{forecast_year}-W{str(forecast_week).zfill(2)}",
            'forecast_quantity': forecast_qty,
            'lower_bound': lower,
            'upper_bound': upper,
            'confidence': confidence,
            'method': method,
            'trend_pct': trend_pct,
            'seasonality_factor': seasonality_factor,
            'horizon_period': h,
            'category': last_record.get('category', 'unknown')
        })

result = {
    'data': forecasts,
    'meta': {
        'generated_at': datetime.utcnow().isoformat(),
        'stage': 'forecast',
        'method': method,
        'horizon_weeks': horizon,
        'input_count': len(data),
        'forecast_count': len(forecasts),
        'unique_item_locations': len(grouped),
        'source': 'mw-forecast'
    }
}

print(json.dumps(result, indent=2))

❌ Fence Execution Error: No input data - mw-forecast requires 'data' from previous pipeline step
Traceback (most recent call last):
  File "/app/oculus/providers/python_provider.py", line 468, in execute
    exec(fence_content, exec_globals, exec_locals)
  File "<string>", line 17, in <module>
ValueError: No input data - mw-forecast requires 'data' from previous pipeline step
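The error above is what the fence raises when it runs on its own, because config then carries no 'data' key. As a minimal sketch of the input it expects, the block below builds a hypothetical payload in the shape described in the Config comments (all item IDs, weeks, and quantities are made up for illustration) and applies the same exponential smoothing recurrence to it. The names sample_config and exponential_smoothing are assumptions introduced here, not part of the pipeline.

```python
# Hypothetical input in the shape the Config comments describe.
# Values are invented purely for illustration.
sample_config = {
    "method": "exponential_smoothing",
    "alpha": 0.3,
    "data": [
        {"item_id": "A1", "location_id": "S1", "week": "2026-W01",
         "adjusted_quantity": 100, "trend_pct": 0, "category": "demo"},
        {"item_id": "A1", "location_id": "S1", "week": "2026-W02",
         "adjusted_quantity": 120, "trend_pct": 0, "category": "demo"},
        {"item_id": "A1", "location_id": "S1", "week": "2026-W03",
         "adjusted_quantity": 110, "trend_pct": 0, "category": "demo"},
    ],
}


def exponential_smoothing(quantities, alpha):
    """Seed with the first value, then blend each later value in with weight alpha."""
    smoothed = quantities[0]
    for q in quantities[1:]:
        smoothed = alpha * q + (1 - alpha) * smoothed
    return smoothed


# Sort by week label, as the fence does, then smooth the quantities.
records = sorted(sample_config["data"], key=lambda r: r["week"])
quantities = [r["adjusted_quantity"] for r in records]
base_forecast = exponential_smoothing(quantities, sample_config["alpha"])

print(round(base_forecast, 2))  # 107.2
```

With alpha at 0.3 the result stays closer to the earlier weeks than to the latest one; a larger alpha would weight recent weeks more heavily.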

Output Contract

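Emits one new forecast record per item-location pair and horizon week. Key fields:

  • forecast_quantity: Point forecast, rounded to a whole number after trend and seasonality are applied
  • lower_bound / upper_bound: Confidence interval around forecast_quantity; lower_bound is floored at 0, and the interval widens with the square root of horizon_period
  • confidence: Confidence level used (from confidence_level)
  • method: Forecast method
  • horizon_period: Weeks ahead (1, 2, ..., horizon_weeks)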
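To make the interval fields concrete, here is a small sketch of how lower_bound and upper_bound relate to the point forecast. It follows the same formula as the fence (z times the standard error times the square root of the horizon step), with one deliberate difference: instead of the fence's fixed lookup table, it derives z from the standard library's NormalDist, so any confidence level between 0 and 1 works. The function name interval is an assumption introduced for illustration.

```python
import math
from statistics import NormalDist


def interval(forecast_qty, std_error, h, confidence=0.80):
    """Return (lower_bound, upper_bound) for horizon step h.

    Illustrative helper, not part of the pipeline. z is the two-sided
    normal quantile for the requested confidence level.
    """
    z = NormalDist().inv_cdf((1 + confidence) / 2)
    half_width = z * std_error * math.sqrt(h)
    lower = round(max(0, forecast_qty - half_width))  # never below zero
    upper = round(forecast_qty + half_width)
    return lower, upper


print(interval(100, 10, 1))  # (87, 113)
print(interval(100, 10, 4))  # (74, 126)
print(interval(5, 10, 4))    # (0, 31)
```

The second call shows the interval doubling in half-width at four weeks ahead, and the third shows the lower bound being clamped at zero for small forecasts.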

North

slots:
- slug: mw-trend
  context:
  - Input from trend calculator
  - 'Pipeline flow: trend feeds into forecast generation'

South

slots:
- slug: mw-scenario
  context:
  - Next step - scenario modifiers
  - 'Pipeline flow: forecast feeds into scenario modifier'

Provenance

Document

  • Status: 🔴 Unverified

Changelog

  • 2026-01-25 19:09: Node created by mcp - Creating forecast generator middleware - step 3 in pipeline