Блог им. tranquility

Починка websockets-криптоконнектора как повод поговорить о парадигме конкурентного программирования (питон)

В общем, решил как-то я написать websockets коннектор к одной криптобирже на С++. Решил, что неплохо было бы найти работающий простеникий коннектор и адаптировать его под себя. На С++ вообще ничего вменяемого найти не получилось, зато нашел нечто на питоне:
github.com/Crypto-toolbox/hitbtc
Штука показалась годной и стал я ее переводить на С++… Кстати, весьма полезное занятие оказалось — узнал определенные вещи из современных стандартов С++11/14, т.к. без них переводить питоновский код — много, долго и грустно)) И вот, в какой-то момент я подумал, что неплохо было бы проверить, а коннектор питоновский, который я взял за образец — он-то вообще работает?? Оказалось, что нет) Пакет websocket для работы с соединениями за 2 года устрарел и не работает, например, вот в этом месте:
self.conn = websocket.WebSocketApp(
            self.url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close
        )
пакет больше не экспортирует класс WebSocketApp, документацию вменяемую найти сразу не получилось и поэтому возникла потребность заменить websocket на что-то более актуальное. И это актуальное нашлось: websockets.readthedocs.io/en/stable/intro.html
Правда, в документации по этой ссылке прямо написано:

Are there onopen, onmessage, onerror, and onclose callbacks?

No, there aren’t.

websockets provides high-level, coroutine-based APIs. Compared to callbacks, coroutines make it easier to manage control flow in concurrent code.

If you prefer callback-based APIs, you should use another library.

Упс! А проект на обратных вызовах написан, с потоками, блокирующими очередями, таймерами… По этой ссылке пишут, что это все несовместимо с «конкурентным» кодом:
realpython.com/async-io-python/
Благо, всей этой радости (таймеров, потоков, блокирующих очередей) не так много в проекте и их в принципе можно выкинуть, если будет работать — потом уже реализовать этот функционал будет проще.
Собственно, что я и сделал (грубо и непринужденно):
github.com/pecec/hitbtc-ws-connector
Очень помогла информация с этой страницы:
stackoverflow.com/questions/35529754/python-async-websocket-client-with-async-timer?rq=1
Без нее мне бы пришлось еще долго «курить» то полотно с риалпайтон и разбираться в нюансах конкурентного программирования, ибо на странице библиотеки подходящих примеров нет. Если это «подходящий»:
websockets.readthedocs.io/en/stable/intro.html#common-patterns — уж извините тогда меня за мою недалекость))
В результате, имеем рабочий код, который просто качаем, распаковываем, идем в папку example, запускаем скрипт example.py и наслаждаемся его работой:
две корутины:
async def connectorControlThread( _connector ):
    print( 'counting to 20...' )
    for i in range( 1, 21 ):
        await asyncio.sleep(1)
        print( 'i=%d' % i )
    await _connector.subscribe_ticker( symbol='ETHBTC', cancel = True )
    await asyncio.sleep(2)
    await _connector.stop()
async def myCallback( _raw ):
    print( str(_raw) )
в течение ~20 секунд одновременно пишут в консоль, первая — приходящие тики в формате json, на которые была совершена подписка из основного потока скрипта, вторая — отсчитывает 20 секунд (в нее я бы поместил код своей торговой системы, только, вероятно еще надо корутин добавить, например, отдельную для работы с ордерами, балансами). Скрипт работает даже без прописывания своих ключей для логина на биржу в файл pubseckeys.txt — полагаю, они нужны для манипуляций с балансами и работы с ордерами. Но у меня с моими ключами авторизация завершается успешным ответом от сервера.
В общем, больше особо расписывать не буду, кому надо — разберется сам, а я если что всегда готов ответить на вопросы, в рамках моей компетенции, правда. Мне просто надо было убедиться, что питоновский код соединяется, логинится, подписывается и получает маркетдату. Теперь я уже законно могу хотеть того же от своего C++ кода.
Однако, после всего этого опыта меня интересует такой вопрос — каллбек, который я вынес за пределы объекта коннектора как-то противоречит концепции конкурентного программирования? Как тогда реализовывать код ТС без изменения кода коннектора? Через наследование и переопределение корутин, обменивающихся данными с сервером? Что-то такое не выглядит удобным… Если нет, зачем в документации к библиотеке авторы пишут, что она не дружит с каллбеками?

P.S. Данный код сырой, чтобы пускать его в «бой» с ним надо еще повозиться. Например — реализовать функцию переподключения, которую я сломал в процессе своей починки. По правде говоря, она и до этого была несколько ущербной: например, я так понял, что она вместе с переподпиской на маркетдату в той версии будет посылать старые ордера, которые сохраняются в ту же историю команд… Команда серверу «unsubscribe» не работает (наверняка не она одна), надо проверить параметры в документации к api hitbtc.
P.P.S. Что-то я начинаю беспокоиться за питон, чувствую, запомоят его всяким этим async/await порожняком так, что в четвертой версии без него вообще никакой серьеной программы написать не получится, в результате уже 3 семейства питонов будет 2.7, 3.7+ и 4.x. Прогресс-прогрессом, но когда тебя так вот за шкирку в него тащат — неприятно что ли как-то…
★1
> каллбек, который я вынес за пределы объекта коннектора как-то противоречит концепции конкурентного программирования?

Не противоречит, только зачем он async def, если внутри все вызовы синхронные?

> Если нет, зачем в документации к библиотеке авторы пишут, что она не дружит с каллбеками?

Тут имеются ввиду классические колбеки на каждый чих при работе с сетью, когда пишут не блокирующий код без async/await. Нет ничего страшного, особенно в питоне, в передаче функции или класса в качестве аргумента.

Код у вас ужасный (множеством run_until_complete вы фактически всю идею асинхронности хороните и превращаете в синхронный код) — такое впечатление, что вы не до конца понимаете всю эту async/await  машинерию. Никто вас по большому счету не заставляет лезть в светлое будущее — из стандартной библиотеки не убрали модули, которые позволяют писать по старинке — можете написать блокирующий код или не блокирующий код с колбеками. 
avatar

Михаил

Михаил, отличный развернутый ответ 
avatar

ipsnow

Михаил, 
> Не противоречит, только зачем он async def, если внутри все вызовы синхронные?

Рассчет на то, что в нем могут появиться синхронизирующие блоки await

>Код у вас ужасный (множеством run_until_complete вы фактически всю идею асинхронности хороните и превращаете в синхронный код) —

Так блок кода с соединением, логином, подпиской — и должен быть «синхронным», т.е. последовательным, не могу я подписаться раньше чем соединиться, правильно? Если просто убрать обертку run_until_complete над hitbtc.subscribe_ticker( symbol='ETHBTC' ) тем же, будет ошибка:
RuntimeWarning: coroutine 'HitBTC.subscribe_ticker' was never awaited
Чтобы ее исправить, надо убрать всю эту async/await мишуру далее вниз по стеку наследования и мы упремся в вызовы send/recv библиотеки websockets, которые async def и в результате придется возвращать async/await обратно. Т.е., на самом деле, непонятно ваше замечание. Не ходите сами с кодом повозиться, исправить то, что режет вам глаз?)

> такое впечатление, что вы не до конца понимаете всю эту async/await  машинерию.

У меня у самого такое впечатление. async def — блок кода, который может выполняться параллельно с другими async def. await — блок, который будет ожидать своей очереди пока в другом async def выполняется другой await, верно?

А что касается кода — большая его часть не моя, вообще-то. Но сама идея у него неплоха. Доведя его до ума, можно с любой криптобиржей работать — только модуль utils.py под каждую переписать — это чисто техническая работа. Я только просканировал фолиант на риалпайтон, пару примеров на стековерфлов — этого оказалось достаточным чтобы починить то что сломано до той меры, в какой мне бело необходимо. А ввиду того, что я сам в свое время (полгода назад где-то) не мог найти альтернативы Crypto-toolbox/hitbtc коннектору на питоне, кому-нибудь его работающая версия, в моем исполнении, будет полезна.

> Никто вас по большому счету не заставляет лезть в светлое будущее
ну как же? хочешь вебсокеты — а живой библиотеки с каллбеками еще поискать. Я же экстраполирую ситуацию на будущее.

>не убрали модули, которые позволяют писать по старинке
вот доживем до 4-го питона — еще посмотрим))
avatar

tranquility

tranquility, 

>Рассчет на то, что в нем могут появиться синхронизирующие блоки await

Мне сложно представить за вам нужны будут await — разумнее держать необходимые данные в памяти, а не тягать их постоянно с помощью долгих вызовов и await.

>Так блок кода с соединением, логином, подпиской — и должен быть «синхронным», т.е. последовательным, не могу я подписаться раньше чем соединиться, правильно? 

Так дождитесь их с помощью await. 

>Если просто убрать обертку run_until_complete над hitbtc.subscribe_ticker( symbol='ETHBTC' ) тем же, будет ошибка:

RuntimeWarning: coroutine 'HitBTC.subscribe_ticker' was never awaited. Чтобы ее исправить, надо убрать всю эту async/await мишуру далее вниз по стеку наследования и мы упремся в вызовы send/recv библиотеки websockets, которые async def и в результате придется возвращать async/await обратно. Т.е., на самом деле, непонятно ваше замечание. Не ходите сами с кодом повозиться, исправить то, что режет вам глаз?)

Судя по ошибке вы видимо делаете вызов корутин без await. run_until_complete относится к низкоуровневому API. Его не рекомендуют в большинстве случаев использовать, а тем более тягать для выполнения каждой корутины. 

Нужно вызывать все корутины с помощью await, а самую главную вызывать с помощью asyncio.run

Сделал pull request с этим куском. 

> У меня у самого такое впечатление. async def — блок кода, который может выполняться параллельно с другими async def. await — блок, который будет ожидать своей очереди пока в другом async def выполняется другой await, верно? 

Параллельно ничего не выполняется — все бежит реально в одном потоке. Просто происходит переключение на операциях, которые выполняются долго из-за i/o и поддерживают asyncio. 

>ну как же? хочешь вебсокеты — а живой библиотеки с каллбеками еще поискать. Я же экстраполирую ситуацию на будущее. вот доживем до 4-го питона — еще посмотрим))

Сокеты и потоки с процессами есть в стандартной библиотеке, а 4 питона даже в проекте нет, и как-то нет тенденции убирать какие-то модули при переходе от 2 к 3 версии. Не думаю, что такое будет и при переходе к гипотетической 4 версии.

avatar

Михаил

Михаил, 

>Сделал pull request с этим куском.

Во это дело! Спасибо! Конечно, код с вашими изменениями стал приятнее. Обновил в своем репозитории.
avatar

tranquility

каллбек, который я вынес за пределы объекта коннектора как-то противоречит концепции конкурентного программирования?

Не питонист, но уверен, что авторы имели ввиду не все коллбеки, а только те, что связаны с вводом/выводом. Можно сказать «коллбеки для асинхронности».  Сами же по себе коллбеки с асинхронностью никак не связаны, так что их использование рядом с корутинами и чем угодно — абсолютно нормально.
avatar

ipsnow

ipsnow, очень хорошее уточнение.
avatar

Михаил

На java, может тоже чего полезного подсмотрите:

github.com/knowm/XChange
github.com/bitrich-info/xchange-stream
avatar

Eugene Logunov

На Golang пиши. Синхронно.
avatar

ivanovr

ivanovr, так меня плюсы вполне устраивают — все что нужно для счастья есть там, даже лямбда-функции) А главное — прозрачность, распространенность (любой вопрос быстро гуглится, время на изучение учебников тратить больше не надо, по большенству вопросов) и железобетонная надежность. Ну да ладно, не думаю что стоит разводить тут спор какой язык лучше. Уж вебсокеты в го должны иметься, неплохой бонус для тех, кто на нем пишет.
avatar

tranquility


теги блога tranquility

....все тэги



2010-2020
UPDONW