Написал скрипт на Python, чтобы рассчитать доходность ETF WIMM (бывший VTBM).

Сравнил доходность ETF WIMM с официальной инфляцией.
На картинке голубые цифры — это доходность WIMM. К сожалению доходность отстает от инфляции за редким исключением.

Решил тут попробовать машинное обучение прикрутить к прогнозированию или построению стратегии.
Вот ссылка на colab:
_https://colab.research.google.com/drive/1Rw_kBYK12lxKPQZCX28nMVBCPb7gPI11?usp=sharing
В общем получилась какая-то фигня. Вероятность около 50% что вход на следующем баре будет прибыльным (начальные условия ТП=СЛ, размер равен размеру рендж бара).
Народ, а как отбирать бары на которых вероятность предсказания выше?
Написал скрипт, который переделывает тиковые данные в range заданной размерности.
Но есть нюанс, когда идет быстрый рынок, некоторые бары могут иметь одинаковое время открытия, что приводит к некоторому несоответствию range баров.

<code>"""
Скрипт из файлов с тиковыми данными делает файл с рандже барами
"""
import re
from datetime import datetime
from pathlib import *
import pandas as pd
def zero_hour(cell):
""" Функция преобразует время (с финама приходят часы без нулей (с марта 2021), которые pandas не воспринимает)"""
cell = f'{int(cell)}'
tmp_time = datetime.strptime(cell, "%H%M%S")
return tmp_time.strftime("%H%M%S")
def run(tick_files: list[Path], razmer: int, target_dir: Path):
for ind_file, tick_file in enumerate(tick_files, start=1): # Итерация по тиковым файлам
list_split = re.split('_', tick_file.name, maxsplit=0) # Разделение имени файла по '_'
tiker = list_split[0] # Получение тикера из имени файла
date_quote_file = re.findall(r'\d+', str(tick_file)) # Получение цифр из пути к файлу
target_name = f'{tiker}_range{razmer}_{date_quote_file[0]}.txt' # Создание имени новому файлу
target_file_range: Path = Path(target_dir / target_name) # Составление пути к новому файлу
if Path.is_file(target_file_range):
print(f'Файл уже существует {target_file_range}')
continue
else:
df_ticks_file: pd = pd.read_csv(tick_file, delimiter=',') # Считываем тиковые данные в DF
# Создание DF под рандже бары одного тикового файла
df: pd = pd.DataFrame(columns='<DATE> <TIME> <OPEN> <HIGH> <LOW> <CLOSE> <VOL>'.split(' '))
for tick in df_ticks_file.itertuples(): # Итерация по строкам тикового DF
print('\rCompleted file: {:.2f}%. Completed files: {:.2f}%'.format(
tick[0] * 100 / len(df_ticks_file.index),
ind_file * 100 / len(tick_files)
),
end=''
)
if tick[0] == 0:
# Добавление строки в DF с рандже барами
df.loc[len(df.index)] = [int(tick[1]), int(tick[2]), tick[3], tick[3], tick[3], tick[3], tick[4]]
continue
# Если бар сформирован по размеру возрастающий бар
if df.loc[len(df.index) - 1, '<LOW>'] + razmer < tick[3]:
df.loc[len(df.index) - 1, '<CLOSE>'] = df.loc[len(df.index) - 1, '<LOW>'] + razmer
df.loc[len(df.index) - 1, '<HIGH>'] = df.loc[len(df.index) - 1, '<CLOSE>']
# Добавление строки в DF с дельта барами
df.loc[len(df.index)] = [int(tick[1]), int(tick[2]), tick[3], tick[3], tick[3], tick[3], tick[4]]
continue
# break
# Если бар сформирован по размеру падающий бар
if df.loc[len(df) - 1, '<HIGH>'] - razmer > tick[3]:
df.loc[len(df) - 1, '<CLOSE>'] = df.loc[len(df) - 1, '<HIGH>'] - razmer
df.loc[len(df) - 1, '<LOW>'] = df.loc[len(df) - 1, '<CLOSE>']
# Добавление строки в DF с дельта барами
df.loc[len(df.index)] = [int(tick[1]), int(tick[2]), tick[3], tick[3], tick[3], tick[3], tick[4]]
continue
# break
# Заполняем(изменяем) последнюю строку DF с рандже баром --------------------------------------
# Записываем <CLOSE> --------------------------------------------------------------------------
df.loc[len(df.index) - 1, '<CLOSE>'] = tick[3] # Записываем последнюю цену как цену close бара
# Записываем <HIGH> ---------------------------------------------------------------------------
if float(tick[3]) > df.loc[len(df) - 1, '<HIGH>']: # Если цена последнего тика больше чем high
df.loc[len(df.index) - 1, '<HIGH>'] = tick[3] # Записываем цену последнего тика как high
# Записываем <LOW> ----------------------------------------------------------------------------
if float(tick[3]) < df.loc[len(df.index) - 1, '<LOW>']:
df.loc[len(df.index) - 1, '<LOW>'] = tick[3] # Записываем цену последней сделки как low
# Записываем <VOL> ----------------------------------------------------------------------------
df.loc[len(df.index) - 1, '<VOL>'] += tick[4] # Увеличиваем объем
# Изменение типа колонок
df[['<DATE>', '<TIME>', '<VOL>']] = df[['<DATE>', '<TIME>', '<VOL>']].astype(int)
# Преобразуем столбец <TIME>, где нужно добавив 0 перед часом
df['<TIME>'] = df.apply(lambda x: zero_hour(x['<TIME>']), axis=1)
df.to_csv(target_file_range, index=False) # Запись в файл для одного тикового файла
# break
if __name__ == "__main__":
razmer: int = 250
ticker: str = 'RTS'
year_tick: str = '2022'
source_dir_tick: Path = Path(f'c:/data_quote/data_finam_{ticker}_tick') # Путь к ресурсному каталогу
target_dir: Path = Path(f'c:/data_quote/data_prepare_{ticker}_range') # Путь к целевому каталогу
# Создание списка путей к файлам с тиками
tick_files: list[Path] = list(source_dir_tick.glob(f'*{year_tick}*.csv'))
run(tick_files, razmer, target_dir)
</code>Привет, Всем!
Хотел бы рассчитать размерность range баров, чтобы их количество в одном дне было бы как и количество 5 мин баров.
Написал скрипт но значения так разнятся. Может кто-то уже занимался таким вопросом?
Рассчитанное значение величины range в поле <RAZMER>

<code>"""
Для расчета рендж баров эквивалентных 5 мин
"""
from pathlib import *
import pandas as pd
import talib
def body(open: float, close: float) -> float:
return abs(close - open)
if __name__ == "__main__":
# 198 баров 5м в дне (с 7:00)
period: int = 198
source_file: Path = Path('c:\data_quote\data_finam_RTS_5m\SPFB.RTS_210301_220131.csv')
df: pd = pd.read_csv(source_file, delimiter=',') # Считываем тиковые данные в DF
# Преобразуем столбец <TIME>, где нужно добавив 0 перед часом
df['<BODY>'] = df.apply(lambda x: body(x['<OPEN>'], x['<CLOSE>']), axis=1)
df['<RAZMER>'] = talib.MA(df['<BODY>'], timeperiod=period, matype=0)
df_15: pd = df.loc[df['<TIME>'] == 150000] # Бары в 15:00
print(df_15.tail(20))
"""
Получается, что рендж бар для фьючерса RTS должен быть размером 250, чтобы количество баров в дне,
примерно совпадало с с количеством 5 мин баров.
"""</code>В продолжение предыдущей публикации, сделал графики на которых можно сравнить доходы по разным бумагам.
Интерпретацию результатов описал в предыдущей публикации.
Особенность получающегося графика — это коряво выглядит ось Х(годы начала инвестирования), пришлось отказаться от строкового формата, ради смещения новых столбиков.

Решил в Python протестировать стратегию «Купил и Держи». Причем захотелось посмотреть какой будет доход если инвестировать ежемесячно равные суммы в течении определенного периода.
Немного об интерпретации результатов:
1. Дивиденды не учитываются. Учитывается только курсовой рост.
2. Доход по стратегии показан напротив года начала инвестирования, хотя фактически он соответствует дате начала инвестирования + заданный период.
3. На картинках QQQ, $100 ежемесячно в течении 10 лет


Построение нестандартных графиков в Python при помощи библиотеки finplot.# В КВИКе запускаем луа-скрипт QuikLuaPython.lua
import socket
import threading
from datetime import datetime, timezone
import pandas as pd
import finplot as fplt
fplt.display_timezone = timezone.utc
class DeltaBar():
def __init__(self):
self.df = pd.DataFrame(columns='date_time open high low close delta delta_time_sec'.split(' '))
self.df.loc[len(self.df)] = [0, 0, 0, 0, 0, 0, 0]
def parser(self, parse):
if parse[0] == '1' and parse[1] == 'RIH1':
if abs(self.df.iloc[len(self.df) - 1]['delta']) >= 500:
self.df.loc[len(self.df)] = [0, 0, 0, 0, 0, 0, 0] # Добавляем строку в DF
self.df.iloc[len(self.df) - 1]['close'] = float(parse[4]) # Записываем последнюю цену как цену close бара
if self.df.iloc[len(self.df) - 1]['date_time'] == 0:
self.df.iloc[len(self.df) - 1]['date_time'] = \
datetime.strptime(f'{parse[7]} {parse[8][0:-1]}', "%d.%m.%Y %H:%M:%S.%f").replace(microsecond=0)
if self.df.iloc[len(self.df) - 1]['open'] == 0:
self.df.iloc[len(self.df) - 1]['open'] = float(parse[4])
if float(parse[4]) > self.df.iloc[len(self.df) - 1]['high']:
self.df.iloc[len(self.df) - 1]['high'] = float(parse[4])
if (float(parse[4]) < self.df.iloc[len(self.df) - 1]['low']) or \
(self.df.iloc[len(self.df) - 1]['low'] == 0):
self.df.iloc[len(self.df) - 1]['low'] = float(parse[4])
if parse[5] == '1026':
self.df.iloc[len(self.df) - 1]['delta'] += float(parse[6])
if parse[5] == '1025':
self.df.iloc[len(self.df) - 1]['delta'] -= float(parse[6])
self.df.iloc[len(self.df) - 1]['delta_time_sec'] = \
datetime.strptime(f'{parse[7]} {parse[8][0:-1]}', "%d.%m.%Y %H:%M:%S.%f") - \
self.df.iloc[len(self.df) - 1]['date_time']
self.df.iloc[len(self.df) - 1]['delta_time_sec'] = self.df.iloc[len(self.df) - 1]['delta_time_sec'].seconds
def service():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('127.0.0.1', 3587)) # Хост-этот компьютер, порт - 3587
while True:
res = sock.recv(2048).decode('utf-8')
if res == '<qstp>\n': # строка приходит от клиента при остановке луа-скрипта в КВИКе
break
else:
delta_bar.parser(res.split(' ')) # Здесь вызываете свой парсер. Для примера функция: parser (parse)
sock.close()
def update():
df = delta_bar.df
# Меняем индекс и делаем его типом datetime
df = df.set_index(pd.to_datetime(df['date_time'], format='%Y-%m-%d %H:%M:%S'))
# print(delta_bar.df)
# pick columns for our three data sources: candlesticks and TD
candlesticks = df['open close high low'.split()]
volumes = df['open close delta_time_sec'.split()]
if not plots:
# first time we create the plots
global ax
plots.append(fplt.candlestick_ochl(candlesticks))
plots.append(fplt.volume_ocv(volumes, ax=ax.overlay()))
else:
# every time after we just update the data sources on each plot
plots[0].update_data(candlesticks)
plots[1].update_data(volumes)
if __name__ == '__main__':
delta_bar = DeltaBar()
# Запускаем сервер в своем потоке
t = threading.Thread(name='service', target=service)
t.start()
plots = []
ax = fplt.create_plot('RIH1', init_zoom_periods=100, maximize=False)
update()
fplt.timer_callback(update, 2.0) # update (using synchronous rest call) every N seconds
fplt.show()


# -*- coding: utf-8 -*-
"""
Читает файл csv в DataFrame. Добавляет колонку с кодом свечи по Лиховидову.
Расчет (большой, средний, маленький) ведется по свечам тогоже времени за предшествующие дни.
Количество предшествующих дней выбирается. Нужно предусмотреть csv файл с большей историей чем start_date на day_delta
"""
import pandas as pd
import numpy as np
from pathlib import Path
class CandleCode:
def __init__(self, start_date, day_delta, dir_source, file_source):
self.start_date = start_date
self.day_delta = day_delta
self.df = pd.DataFrame()
self.dir_source = dir_source
self.file_source = file_source
def csv_to_df(self):
"""
Читает файл csv delimiter=';' в DataFrame
:param dir_source: Папка откуда берем csv файл для обработки
:param file_source: Исходный файл
:return:
"""
self.df = pd.read_csv(f'{self.dir_source}/{self.file_source}', delimiter=';') # Загружаем файл в DF
# Меняем индекс и делаем его типом datetime
self.df = self.df.set_index(pd.to_datetime(self.df['date_time'], format='%Y-%m-%d %H:%M:%S'))
# Удаляем колонку с датой и временем, т.к. дата и время у нас теперь в индексе
self.df = self.df.drop('date_time', axis=1)
def prev_df_to_dic_code(self, previous_df):
"""
Из DataFrame предшествующего расчетной свече создает словарь с перцентилями для расчета
(большой, средний, маленький) диапазон тела свечи и его теней.
:param previous_df: Получает аргументе DataFrame, с такимже временем свечей, предшествующий расчетной свече
:return: Возвращяет словарь перцентилей 33% и 66%
"""
percentile_dic = {} # Создаем пустой словарь в который будем писать перцентили
for index, row in previous_df.iterrows(): # Перебираем строки dataframe previous_df
if row['open'] > row['close']: # Свеча на понижение
previous_df.loc[index, 'shadow_high'] = row['high'] - row['open']
previous_df.loc[index, 'shadow_low'] = row['close'] - row['low']
previous_df.loc[index, 'candle_body'] = row['open'] - row['close']
else: # Свеча на повышение
previous_df.loc[index, 'shadow_high'] = row['high'] - row['close']
previous_df.loc[index, 'shadow_low'] = row['open'] - row['low']
previous_df.loc[index, 'candle_body'] = row['close'] - row['open']
percentile_dic['shadow_high_33'] = np.percentile(previous_df['shadow_high'], 33)
percentile_dic['shadow_high_66'] = np.percentile(previous_df['shadow_high'], 66)
percentile_dic['shadow_low_33'] = np.percentile(previous_df['shadow_low'], 33)
percentile_dic['shadow_low_66'] = np.percentile(previous_df['shadow_low'], 66)
percentile_dic['candle_body_33'] = np.percentile(previous_df['candle_body'], 33)
percentile_dic['candle_body_66'] = np.percentile(previous_df['candle_body'], 66)
return percentile_dic
def file_out(self, start, end, df_candle_code):
"""
Функция записывает результирующий DF в csv файл
:param start: Для имени выходного файла, начальная дата
:param end: Для имени выходного файла, конечная дата
:param df_candle_code: DataFrame который записываем в файл
:return:
"""
name_file_out = Path(f'{self.dir_source}/{self.file_source[:-4]}_{start}_{end}_lihovidov.csv')
df_candle_code.to_csv(name_file_out)
def run(self):
df_candle_code = self.df.copy() # Создаем копию DF, исключение предупреждений
# Срез DF в котором будет дополнительная колонка с кодами свечей
df_candle_code = df_candle_code.loc[self.start_date:]
df_candle_code['candle_code'] = np.nan # Создание дополнительного столбца и заполнение его NaN
for index, row in df_candle_code.iterrows(): # Перебираем строки dataframe df_candle_code
print()
print(index)
delta_day = pd.to_timedelta(f'{self.day_delta} days') # Преобразование типа
start_previous_df = index.date() - delta_day # Вычисляем начальную дату DF
end_previous_df = index.date() - pd.to_timedelta('1 days') # Вычисляем конечную дату DF
# Создаем DF предшествующий текущей строке
previous_df = self.df.loc[start_previous_df.strftime("%Y-%m-%d"): end_previous_df.strftime("%Y-%m-%d")]
previous_df = previous_df.loc[index.time()] # Оставляем только строки соответствующие времени тек. строки
percentile_dic = self.prev_df_to_dic_code(previous_df) # Получаем словарь перцентилей
code_str = '' # Строка в которую будем собирать код для текущей свечи
# Свеча на понижение (медвежья)
if row['open'] > row['close']: # Свеча на понижение (медвежья)
code_str += '0'
# Для тела медвежьей свечи
if row['open'] - row['close'] > percentile_dic[
'candle_body_66']: # 00 - медвежья свеча с телом больших размеров
code_str += '00'
elif row['open'] - row['close'] > percentile_dic[
'candle_body_33']: # 01 - медвежья свеча с телом средних размеров
code_str += '01'
elif row['open'] - row['close'] > 0: # 10 - медвежья свеча с телом небольших размеров
code_str += '10'
# Для верхней тени медвежьей свечи
if row['high'] - row['open'] > percentile_dic['shadow_high_66']: # 11 - верхняя тень больших размеров
code_str += '11'
elif row['high'] - row['open'] > percentile_dic['shadow_high_33']: # 10 - верхняя тень средних размеров
code_str += '10'
elif row['high'] - row['open'] > 0: # 01 - верхняя тень небольших размеров
code_str += '01'
else: # 00 - верхняя тень отсутствует
code_str += '00'
# Для нижней тени медвежьей свечи
if row['close'] - row['low'] > percentile_dic['shadow_low_66']: # 00 - нижняя тень больших размеров
code_str += '00'
elif row['close'] - row['low'] > percentile_dic['shadow_low_33']: # 01 - нижняя тень средних размеров
code_str += '01'
elif row['close'] - row['low'] > 0: # 10 - нижняя тень небольших размеров
code_str += '10'
else: # 11 - нижняя тень отсутствует
code_str += '11'
# Свеча на повышение (бычья)
elif row['open'] < row['close']: # Свеча на повышение (бычья)
code_str += '1'
# Для тела бычьей свечи
if row['close'] - row['open'] > percentile_dic[
'candle_body_66']: # 11 - бычья свеча с телом больших размеров.
code_str += '11'
elif row['close'] - row['open'] > percentile_dic[
'candle_body_33']: # 10 - бычья свеча с телом средних размеров
code_str += '10'
elif row['close'] - row['open'] > 0: # 01 - бычья свеча с телом небольших размеров
code_str += '01'
# Для верхней тени бычьей свечи
if row['high'] - row['close'] > percentile_dic['shadow_high_66']: # 11 - верхняя тень больших размеров
code_str += '11'
elif row['high'] - row['close'] > percentile_dic[
'shadow_high_33']: # 10 - верхняя тень средних размеров
code_str += '10'
elif row['high'] - row['close'] > 0: # 01 - верхняя тень небольших размеров
code_str += '01'
else: # 00 - верхняя тень отсутствует
code_str += '00'
# Для нижней тени бычьей свечи
if row['open'] - row['low'] > percentile_dic['shadow_low_66']: # 00 - нижняя тень больших размеров
code_str += '00'
elif row['open'] - row['low'] > percentile_dic['shadow_low_33']: # 01 - нижняя тень средних размеров
code_str += '01'
elif row['open'] - row['low'] > 0: # 10 - нижняя тень небольших размеров
code_str += '10'
else: # 11 - нижняя тень отсутствует
code_str += '11'
# Дожи
else: # Дожи
if row['high'] - row['open'] > row['open'] - row['low']: # Верхняя тень больше, медвежий дожи
code_str += '011'
else: # Верхняя тень меньше, бычий дожи
code_str += '100'
# Для верхней тени дожи
if row['high'] - row['close'] > percentile_dic['shadow_high_66']: # 11 - верхняя тень больших размеров
code_str += '11'
elif row['high'] - row['close'] > percentile_dic[
'shadow_high_33']: # 10 - верхняя тень средних размеров
code_str += '10'
elif row['high'] - row['close'] > 0: # 01 - верхняя тень небольших размеров
code_str += '01'
else: # 00 - верхняя тень отсутствует
code_str += '00'
# Для нижней тени дожи
if row['open'] - row['low'] > percentile_dic['shadow_low_66']: # 00 - нижняя тень больших размеров
code_str += '00'
elif row['open'] - row['low'] > percentile_dic['shadow_low_33']: # 01 - нижняя тень средних размеров
code_str += '01'
elif row['open'] - row['low'] > 0: # 10 - нижняя тень небольших размеров
code_str += '10'
else: # 11 - нижняя тень отсутствует
code_str += '11'
df_candle_code.loc[[index], ['candle_code']] = int(code_str, 2)
print(int(code_str, 2))
self.file_out(df_candle_code.index[0].date(), df_candle_code.index[-1].date(), df_candle_code)
if __name__ == '__main__':
dir_source = 'c:/data_prepare_quote_csv' # Папка откуда берем csv файл для обработки
file_source = 'SPFB.RTS_5min.csv' # Исходный файл
start_date = '2020-09-01' # С какой даты будем строить DF с кодами свечей
day_delta = 365 # Дельта в днях для расчета показателей (большой, средний, маленький). Предшествует start_date
code = CandleCode(start_date, day_delta, dir_source, file_source)
code.csv_to_df()
code.run()

