Pipeline Overview¶
The pipeline module orchestrates the full portfolio construction workflow — from raw price data through preprocessing, optimization, validation, and rebalancing into a single function call. It composes sklearn-compatible transformers and skfolio optimizers into a unified Pipeline object that can be cross-validated, tuned, and serialized.
Architecture¶
The optimizer library follows a linear data-flow architecture:
The conversion from prices to returns happens outside the sklearn pipeline (it changes data semantics from levels to differences), while everything inside the brackets is a single sklearn.pipeline.Pipeline object.
┌─────────────────────────────────────────────────────────────┐
│ run_full_pipeline() │
│ │
│ prices ──→ prices_to_returns() ──→ returns DataFrame │
│ │ │
│ ┌─────────────────────────────┤ │
│ │ build_portfolio_pipeline() │ │
│ │ │ │
│ │ validate ──→ outliers ──→ impute │
│ │ ──→ SelectComplete ──→ DropZeroVariance │
│ │ ──→ DropCorrelated ──→ [SelectKExtremes] │
│ │ ──→ optimizer (skfolio) │
│ └─────────────────────────────┘ │
│ │ │
│ backtest (walk-forward CV) ←───────────┘ │
│ fit full data → final weights │
│ rebalancing check (if previous_weights) │
│ │
│ → PortfolioResult │
└──────────────────────────────────────────────────────────────┘
Why prices_to_returns() runs outside¶
The sklearn pipeline convention requires that fit(X) and transform(X) operate on the same kind of data. Price-to-return conversion changes the data semantics (levels become differences, one row is consumed), so it runs before pipeline construction. Inside the pipeline, every transformer receives and returns a return DataFrame.
Flattened pipeline for parameter access¶
build_portfolio_pipeline() flattens the pre-selection sub-pipeline steps into the top-level pipeline so that get_params() exposes all nested parameters for hyperparameter tuning:
from optimizer.pipeline import build_portfolio_pipeline
from optimizer.optimization import MeanRiskConfig, build_mean_risk
optimizer = build_mean_risk(MeanRiskConfig.for_max_sharpe())
pipeline = build_portfolio_pipeline(optimizer)
# All pre-selection + optimizer params are accessible
print(pipeline.get_params().keys())
# dict_keys(['validate__max_abs_return', 'outliers__winsorize_threshold',
# 'drop_correlated__threshold', 'optimizer__risk_measure', ...])
Core Functions¶
run_full_pipeline¶
The primary entry point. Converts prices to returns, builds the pipeline, optionally backtests, fits on the full dataset, and checks rebalancing thresholds:
from optimizer.optimization import MeanRiskConfig, build_mean_risk
from optimizer.pipeline import run_full_pipeline
from optimizer.validation import WalkForwardConfig
optimizer = build_mean_risk(MeanRiskConfig.for_max_sharpe())
result = run_full_pipeline(
prices=price_df,
optimizer=optimizer,
cv_config=WalkForwardConfig.for_quarterly_rolling(),
)
print(result.weights) # pd.Series: ticker → weight
print(result.summary) # dict: sharpe_ratio, max_drawdown, ...
print(result.backtest.sharpe_ratio) # out-of-sample Sharpe
Parameters:
| Parameter | Type | Description |
|---|---|---|
prices |
pd.DataFrame |
Price matrix (dates x tickers) |
optimizer |
skfolio optimizer | From any build_*() factory |
pre_selection_config |
PreSelectionConfig or None |
Data cleaning config |
sector_mapping |
dict[str, str] or None |
Ticker → sector for imputation |
cv_config |
WalkForwardConfig or None |
None skips backtesting |
previous_weights |
ndarray or None |
For rebalancing analysis |
rebalancing_config |
ThresholdRebalancingConfig / HybridRebalancingConfig or None |
Rebalancing strategy |
current_date |
pd.Timestamp or None |
For hybrid rebalancing |
last_review_date |
pd.Timestamp or None |
For hybrid rebalancing |
y_prices |
pd.DataFrame or None |
Benchmark/factor prices |
n_jobs |
int or None |
Parallel jobs for backtesting |
run_full_pipeline_with_selection¶
Extends run_full_pipeline with upstream stock selection. When fundamentals is provided, the function:
- Screens the universe for investability
- Computes and standardizes factor scores
- Applies macro regime tilts (optional)
- Computes composite score and selects stocks
- Delegates to
run_full_pipeline()on the selected tickers
from optimizer.pipeline import run_full_pipeline_with_selection
from optimizer.factors import SelectionConfig, CompositeScoringConfig
result = run_full_pipeline_with_selection(
prices=price_df,
optimizer=optimizer,
fundamentals=fundamentals_df,
volume_history=volume_df,
scoring_config=CompositeScoringConfig(),
selection_config=SelectionConfig(n_stocks=50),
cv_config=WalkForwardConfig.for_quarterly_rolling(),
)
When fundamentals=None, all selection steps are skipped and the function delegates directly to run_full_pipeline().
Lower-level composable functions¶
For more control, use the individual building blocks:
from optimizer.pipeline import optimize, backtest, tune_and_optimize, build_portfolio_pipeline
from skfolio.preprocessing import prices_to_returns
# Manual pipeline composition
X = prices_to_returns(prices)
pipeline = build_portfolio_pipeline(optimizer)
# Option 1: Just optimize (no backtest)
result = optimize(pipeline, X)
# Option 2: Backtest first, then optimize
bt = backtest(pipeline, X, cv_config=WalkForwardConfig.for_quarterly_rolling())
result = optimize(pipeline, X)
result.backtest = bt
# Option 3: Tune hyperparameters then optimize
result = tune_and_optimize(
pipeline, X,
param_grid={"optimizer__l2_coef": [0.0, 0.01, 0.1]},
)
PortfolioResult¶
All pipeline functions return a PortfolioResult dataclass:
| Field | Type | Description |
|---|---|---|
weights |
pd.Series |
Final asset weights (ticker → weight) |
portfolio |
skfolio Portfolio |
In-sample portfolio with .sharpe_ratio, .max_drawdown, .composition |
backtest |
MultiPeriodPortfolio / Population / None |
Out-of-sample results; None when backtesting was skipped |
pipeline |
sklearn Pipeline |
The fitted pipeline, reusable for predict() on new data |
summary |
dict[str, float] |
Key metrics: mean, annualized_mean, variance, standard_deviation, sharpe_ratio, sortino_ratio, max_drawdown, cvar |
rebalance_needed |
bool or None |
Whether drift exceeds thresholds; None when no previous weights |
turnover |
float or None |
One-way turnover vs previous weights |
Transaction Cost Deduction¶
For net-of-cost backtest analysis, use compute_net_backtest_returns:
from optimizer.pipeline import compute_net_backtest_returns
net_returns = compute_net_backtest_returns(
gross_returns=backtest_returns,
weight_changes=weight_change_df,
cost_bps=10.0, # 10 basis points per unit of turnover
)
Code Examples¶
Minimal pipeline¶
from optimizer.optimization import MeanRiskConfig, build_mean_risk
from optimizer.pipeline import run_full_pipeline
optimizer = build_mean_risk(MeanRiskConfig.for_min_variance())
result = run_full_pipeline(prices=prices, optimizer=optimizer)
print(result.weights)
With rebalancing¶
from optimizer.rebalancing import ThresholdRebalancingConfig
import numpy as np
result = run_full_pipeline(
prices=prices,
optimizer=optimizer,
previous_weights=np.array([0.25, 0.25, 0.25, 0.25]),
rebalancing_config=ThresholdRebalancingConfig(threshold=0.05),
)
print(f"Rebalance needed: {result.rebalance_needed}")
print(f"Turnover: {result.turnover:.4f}")
With stock selection¶
from optimizer.factors import SelectionConfig, CompositeScoringConfig
from optimizer.universe import InvestabilityScreenConfig
result = run_full_pipeline_with_selection(
prices=prices,
optimizer=optimizer,
fundamentals=fundamentals,
volume_history=volume,
investability_config=InvestabilityScreenConfig.for_developed_markets(),
scoring_config=CompositeScoringConfig(),
selection_config=SelectionConfig(n_stocks=50),
cv_config=WalkForwardConfig.for_quarterly_rolling(),
)
Hyperparameter tuning¶
from optimizer.pipeline import tune_and_optimize, build_portfolio_pipeline
from skfolio.preprocessing import prices_to_returns
X = prices_to_returns(prices)
pipeline = build_portfolio_pipeline(optimizer)
result = tune_and_optimize(
pipeline, X,
param_grid={
"optimizer__l2_coef": [0.0, 0.01, 0.1],
"drop_correlated__threshold": [0.90, 0.95],
},
)
print(f"Best params: {result.pipeline.get_params()}")
Gotchas and Tips¶
prices_to_returns() is not in the pipeline
The price-to-return conversion runs outside the sklearn pipeline. Do not add it as a pipeline step — it changes data dimensionality (drops one row) which breaks cross-validation fold alignment.
previous_weights alignment
When previous_weights is passed to run_full_pipeline(), the function auto-aligns them on the post-pre-selection universe and re-normalizes. If pre-selection drops assets, their previous weights are set to zero and the remainder is rescaled to sum to 1.
Benchmark returns via y_prices
For BenchmarkTracker or any model that requires fit(X, y), pass benchmark prices via y_prices. They are converted to returns alongside asset prices.
Sector mapping
Sector mapping is injected as a plain dict[str, str] (ticker → sector label), not queried from a database. Assets not in the mapping are assigned to an "__unmapped__" sector.
Quick Reference¶
| Task | Code |
|---|---|
| Basic optimization | run_full_pipeline(prices, optimizer) |
| With backtest | run_full_pipeline(prices, optimizer, cv_config=WalkForwardConfig()) |
| With rebalancing | run_full_pipeline(prices, optimizer, previous_weights=w, rebalancing_config=cfg) |
| With stock selection | run_full_pipeline_with_selection(prices, optimizer, fundamentals=df) |
| Manual pipeline | build_portfolio_pipeline(optimizer) then optimize(pipeline, X) |
| Tune + optimize | tune_and_optimize(pipeline, X, param_grid={...}) |
| Net-of-cost returns | compute_net_backtest_returns(gross, changes, cost_bps=10) |