mw-forecast
Middleware: Forecast Generator
Generates statistical forecasts using configurable methods. Takes trend-adjusted data and projects forward.
Config [mw-forecast-config]
# Input: expects 'data' array from previous pipeline step (trend-calculated)
# Each record should have: item_id, location_id, week, adjusted_quantity, trend_pct, category
method: exponential_smoothing
alpha: 0.3
horizon_weeks: 4
confidence_level: 0.80
ttl_seconds: 1800Fetch [mw-forecast-fetch]
import json
import math
import re
from datetime import datetime
from collections import defaultdict
# Config is unified: node config + pipeline params + session context
# PipelineProvider loads and merges these automatically
method = config.get('method', 'exponential_smoothing')
alpha = config.get('alpha', 0.3)
horizon = config.get('horizon_weeks', 4)
confidence = config.get('confidence_level', 0.80)
# Input data comes from previous pipeline step (accumulated context)
input_data = config.get('data', [])
if not input_data:
raise ValueError("No input data - mw-forecast requires 'data' from previous pipeline step")
data = input_data
# Group by item-location
grouped = defaultdict(list)
for record in data:
key = record.get('item_location_key', f"{record.get('item_id', '')}@{record.get('location_id', '')}")
grouped[key].append(record)
forecasts = []
for key, records in grouped.items():
sorted_records = sorted(records, key=lambda x: x.get('week', ''))
if not sorted_records:
continue
quantities = [r.get('adjusted_quantity', r.get('quantity', 0)) for r in sorted_records]
last_week = sorted_records[-1].get('week', '2026-W04')
trend_pct = sorted_records[-1].get('trend_pct', 0)
seasonality_factor = sorted_records[-1].get('seasonality_factor', 1.0)
# Calculate base forecast
if method == 'exponential_smoothing':
smoothed = quantities[0]
for q in quantities[1:]:
smoothed = alpha * q + (1 - alpha) * smoothed
base_forecast = smoothed
elif method == 'moving_average':
ma_period = min(4, len(quantities))
base_forecast = sum(quantities[-ma_period:]) / ma_period
else:
base_forecast = quantities[-1] if quantities else 0
trend_multiplier = 1 + (trend_pct / 100) / 52
if len(quantities) >= 2:
errors = [abs(quantities[i] - quantities[i-1]) for i in range(1, len(quantities))]
std_error = math.sqrt(sum(e**2 for e in errors) / len(errors)) if errors else base_forecast * 0.1
else:
std_error = base_forecast * 0.15
z_scores = {0.80: 1.28, 0.90: 1.645, 0.95: 1.96}
z = z_scores.get(confidence, 1.28)
last_record = sorted_records[-1]
item_id = last_record.get('item_id', '')
location_id = last_record.get('location_id', '')
week_match = re.search(r'(\d+)-W(\d+)', last_week)
year = int(week_match.group(1)) if week_match else 2026
week_num = int(week_match.group(2)) if week_match else 4
for h in range(1, horizon + 1):
forecast_week = week_num + h
forecast_year = year
if forecast_week > 52:
forecast_week -= 52
forecast_year += 1
projected = base_forecast * (trend_multiplier ** h)
forecast_qty = round(projected * seasonality_factor)
lower = round(max(0, forecast_qty - z * std_error * math.sqrt(h)))
upper = round(forecast_qty + z * std_error * math.sqrt(h))
forecasts.append({
'item_id': item_id,
'location_id': location_id,
'item_location_key': key,
'week': f"{forecast_year}-W{str(forecast_week).zfill(2)}",
'forecast_quantity': forecast_qty,
'lower_bound': lower,
'upper_bound': upper,
'confidence': confidence,
'method': method,
'trend_pct': trend_pct,
'seasonality_factor': seasonality_factor,
'horizon_period': h,
'category': last_record.get('category', 'unknown')
})
result = {
'data': forecasts,
'meta': {
'generated_at': datetime.utcnow().isoformat(),
'stage': 'forecast',
'method': method,
'horizon_weeks': horizon,
'input_count': len(data),
'forecast_count': len(forecasts),
'unique_item_locations': len(grouped),
'source': 'mw-forecast'
}
}
print(json.dumps(result, indent=2))β Fence Execution Error: No input data - mw-forecast requires 'data' from previous pipeline step Traceback (most recent call last): File "/app/oculus/providers/python_provider.py", line 468, in execute exec(fence_content, exec_globals, exec_locals) File "
", line 17, in ValueError: No input data - mw-forecast requires 'data' from previous pipeline step
Output Contract
Generates new forecast records:
forecast_quantity: Point forecastlower_bound/upper_bound: Confidence intervalconfidence: Confidence level usedmethod: Forecast methodhorizon_period: Weeks ahead (1, 2, 3, 4...)
North
slots:
- slug: mw-trend
context:
- Input from trend calculator
- 'Pipeline flow: trend feeds into forecast generation'South
slots:
- slug: mw-scenario
context:
- Next step - scenario modifiers
- 'Pipeline flow: forecast feeds into scenario modifier'Provenance
Document
- Status: π΄ Unverified
Changelog
- 2026-01-25 19:09: Node created by mcp - Creating forecast generator middleware - step 3 in pipeline
β northmw-trend
β southmw-scenario