class AdaptiveMomentumStrategy(bt.Strategy):
"""
Trading strategy based on adaptive composite momentum.
Entry:
- go long when momentum crosses above the selected signal average
- go short when momentum crosses below the selected signal average
Signal average selection:
On every bar the strategy tests candidate signal periods on a rolling
lookback window. It scores historical crossings by the forward return
after `score_horizon` bars and uses the period with the best score.
Risk:
Fixed percent stop-loss is applied to every position. Optional take
profit and trailing stop can be enabled through params.
"""
params = dict(
price_period=5,
momentum_periods=(8, 13, 21, 34),
endpoint_window=4,
pattern_window=3,
momentum_smooth_period=5,
signal_periods=(5, 8, 13, 21, 34),
score_lookback=160,
score_horizon=5,
min_score_trades=4,
stake=1,
allow_short=True,
reverse_on_signal=True,
stop_loss=0.02,
take_profit=None,
trailing_stop=None,
printlog=False,
)
def __init__(self) -> None:
self.momentum = AdaptiveCompositeMomentum(
self.data.close,
price_period=self.p.price_period,
momentum_periods=self.p.momentum_periods,
endpoint_window=self.p.endpoint_window,
pattern_window=self.p.pattern_window,
smooth_period=self.p.momentum_smooth_period,
)
self.order = None
self.entry_price: float | None = None
self.stop_price: float | None = None
self.take_price: float | None = None
self.best_signal: SignalChoice | None = None
max_signal_period = max(int(p) for p in self.p.signal_periods)
self._min_ready = (
max(self.p.momentum_periods)
+ int(self.p.endpoint_window)
+ int(self.p.pattern_window)
+ int(self.p.momentum_smooth_period)
+ int(self.p.score_lookback)
+ int(self.p.score_horizon)
+ max_signal_period
+ 5
)
def next(self) -> None:
if self.order:
return
if len(self) < self._min_ready:
return
self.best_signal = self._select_signal_average()
momentum_now = float(self.momentum[0])
signal_now = self.best_signal.value
if self._exit_by_risk():
return
crossed_up = self._crossed_up(momentum_now, signal_now, self.best_signal.period)
crossed_down = self._crossed_down(momentum_now, signal_now, self.best_signal.period)
if not self.position:
if crossed_up:
self.order = self.buy(size=self.p.stake)
elif crossed_down and self.p.allow_short:
self.order = self.sell(size=self.p.stake)
return
if self.position.size > 0 and crossed_down and self.p.reverse_on_signal:
target = -self.p.stake if self.p.allow_short else 0
self.order = self.order_target_size(target=target)
elif self.position.size < 0 and crossed_up and self.p.reverse_on_signal:
self.order = self.order_target_size(target=self.p.stake)
def notify_order(self, order: bt.Order) -> None:
if order.status in (order.Submitted, order.Accepted):
return
if order.status == order.Completed:
executed_price = float(order.executed.price)
if self.position.size > 0:
self.entry_price = float(self.position.price) or executed_price
self._set_risk_levels(direction=1)
self.log(f"LONG {self.entry_price:.5f}")
elif self.position.size < 0:
self.entry_price = float(self.position.price) or executed_price
self._set_risk_levels(direction=-1)
self.log(f"SHORT {self.entry_price:.5f}")
else:
self.entry_price = None
self.stop_price = None
self.take_price = None
self.log(f"FLAT {executed_price:.5f}")
if order.isbuy():
self.log(f"BUY fill {executed_price:.5f}")
elif order.issell():
self.log(f"SELL fill {executed_price:.5f}")
elif order.status in (order.Canceled, order.Margin, order.Rejected):
self.log(f"Order failed: {order.getstatusname()}")
self.order = None
def notify_trade(self, trade: bt.Trade) -> None:
if trade.isclosed:
self.log(f"TRADE PNL gross={trade.pnl:.2f}, net={trade.pnlcomm:.2f}")
if not self.position:
self.entry_price = None
self.stop_price = None
self.take_price = None
def _select_signal_average(self) -> SignalChoice:
choices = [
SignalChoice(period=int(period), value=self._signal_value(int(period), 0), score=self._score_signal_period(int(period)))
for period in self.p.signal_periods
]
return max(choices, key=lambda choice: choice.score)
def _score_signal_period(self, period: int) -> float:
start_ago = int(self.p.score_lookback) + int(self.p.score_horizon)
end_ago = int(self.p.score_horizon) + 1
score = 0.0
trades = 0
for ago in range(start_ago, end_ago, -1):
mom_prev = float(self.momentum[-ago - 1])
mom_now = float(self.momentum[-ago])
sig_prev = self._signal_value(period, ago + 1)
sig_now = self._signal_value(period, ago)
close_now = float(self.data.close[-ago])
close_future = float(self.data.close[-ago + int(self.p.score_horizon)])
if close_now == 0:
continue
forward_return = (close_future - close_now) / close_now
if mom_prev <= sig_prev and mom_now > sig_now:
score += forward_return
trades += 1
elif mom_prev >= sig_prev and mom_now < sig_now:
score -= forward_return
trades += 1
if trades < int(self.p.min_score_trades):
return float("-inf")
return score / trades
def _signal_value(self, period: int, ago: int) -> float:
values = [float(self.momentum[-ago - i]) for i in range(period)]
return sum(values) / period
def _crossed_up(self, momentum_now: float, signal_now: float, period: int) -> bool:
momentum_prev = float(self.momentum[-1])
signal_prev = self._signal_value(period, 1)
return momentum_prev <= signal_prev and momentum_now > signal_now
def _crossed_down(self, momentum_now: float, signal_now: float, period: int) -> bool:
momentum_prev = float(self.momentum[-1])
signal_prev = self._signal_value(period, 1)
return momentum_prev >= signal_prev and momentum_now < signal_now
def _set_risk_levels(self, direction: int) -> None:
if self.entry_price is None:
return
stop_loss = self.p.stop_loss
take_profit = self.p.take_profit
if stop_loss:
self.stop_price = self.entry_price * (1 - direction * float(stop_loss))
if take_profit:
self.take_price = self.entry_price * (1 + direction * float(take_profit))
def _exit_by_risk(self) -> bool:
if not self.position or self.entry_price is None:
return False
close = float(self.data.close[0])
if self.p.trailing_stop:
trail = float(self.p.trailing_stop)
if self.position.size > 0:
trail_stop = close * (1 - trail)
self.stop_price = max(self.stop_price or trail_stop, trail_stop)
else:
trail_stop = close * (1 + trail)
self.stop_price = min(self.stop_price or trail_stop, trail_stop)
if self.position.size > 0:
hit_stop = self.stop_price is not None and close <= self.stop_price
hit_take = self.take_price is not None and close >= self.take_price
else:
hit_stop = self.stop_price is not None and close >= self.stop_price
hit_take = self.take_price is not None and close <= self.take_price
if hit_stop or hit_take:
reason = "STOP" if hit_stop else "TAKE"
self.log(f"{reason} close={close:.5f}")
self.order = self.close()
return True
return False
def log(self, text: str) -> None:
if self.p.printlog:
date = self.data.datetime.date(0).isoformat()
print(f"{date} {text}")








