From 6f928c82942deca03e8924857b623e5f391b9f2d Mon Sep 17 00:00:00 2001
From: Idan Yael
Date: Fri, 20 Jan 2023 11:24:31 +0200
Subject: [PATCH] Trading via Binance (#28)

* binance orders + refactoring

* refactoring

* fix linting

* bracket order
---
 src/algotrader/entities/order_direction.py    |  6 ++
 .../pipeline/sources/binance_realtime.py      |  8 ++-
 .../strategies/history_bucket_compare.py      | 32 ++++++---
 .../strategies/history_cosine_similarity.py   |  8 ++-
 .../pipeline/terminators/technicals_binner.py | 16 +++--
 src/algotrader/providers/binance.py           | 46 ++++++++++++-
 src/algotrader/storage/inmemory_storage.py    | 13 ++--
 src/algotrader/storage/mongodb_storage.py     | 65 ++++++++++++++-----
 src/algotrader/storage/storage_provider.py    |  7 +-
 .../trade/simple_sum_signals_executor.py      |  4 +-
 tests/integration/test_binance_provider.py    |  4 ++
 tests/unit/strategies/test_history_compare.py |  2 +-
 12 files changed, 168 insertions(+), 43 deletions(-)
 create mode 100644 src/algotrader/entities/order_direction.py

diff --git a/src/algotrader/entities/order_direction.py b/src/algotrader/entities/order_direction.py
new file mode 100644
index 0000000..61bea07
--- /dev/null
+++ b/src/algotrader/entities/order_direction.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+
+class OrderDirection(Enum):
+    Buy = 1
+    Sell = 2
diff --git a/src/algotrader/pipeline/sources/binance_realtime.py b/src/algotrader/pipeline/sources/binance_realtime.py
index 92519a4..5dc1333 100644
--- a/src/algotrader/pipeline/sources/binance_realtime.py
+++ b/src/algotrader/pipeline/sources/binance_realtime.py
@@ -15,6 +15,8 @@ def __init__(self, binance_provider: BinanceProvider, symbols: List[str], time_s
         self.time_span = time_span
         self.queue = Queue()

+        self._last_received_candle: Dict[str, Candle] = {}
+
     def read(self) -> Iterator[Candle]:
         for symbol in self.symbols:
             self.binance_provider.start_kline_socket(symbol, self.time_span, self._on_candle)
@@ -23,7 +25,11 @@ def read(self) -> Iterator[Candle]:
             yield self.queue.get()

     def _on_candle(self, candle: Candle):
-        self.queue.put(candle)
+        if candle.symbol in self._last_received_candle and \
+                candle.timestamp > self._last_received_candle[candle.symbol].timestamp:
+            self.queue.put(self._last_received_candle[candle.symbol])
+
+        self._last_received_candle[candle.symbol] = candle

     def serialize(self) -> Dict:
         obj = super().serialize()
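The `_on_candle` change above buffers the latest kline per symbol and only enqueues a candle once an update with a newer timestamp arrives, i.e. once the buffered candle is known to be closed. A minimal sketch of that buffering, using a hypothetical `SimpleCandle` stub in place of algotrader's `Candle`:

```python
from dataclasses import dataclass
from queue import Queue
from typing import Dict


@dataclass
class SimpleCandle:
    symbol: str
    timestamp: int
    close: float


queue: Queue = Queue()
last_received: Dict[str, SimpleCandle] = {}


def on_candle(candle: SimpleCandle) -> None:
    prev = last_received.get(candle.symbol)
    if prev and candle.timestamp > prev.timestamp:
        queue.put(prev)  # the buffered candle is closed, emit it
    last_received[candle.symbol] = candle


on_candle(SimpleCandle('BTCUSDT', 1000, 99.0))   # open kline, buffered only
on_candle(SimpleCandle('BTCUSDT', 1000, 100.0))  # same kline updated, buffer replaced
on_candle(SimpleCandle('BTCUSDT', 2000, 101.0))  # new kline -> closed candle emitted
assert queue.get().close == 100.0
```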
diff --git a/src/algotrader/pipeline/strategies/history_bucket_compare.py b/src/algotrader/pipeline/strategies/history_bucket_compare.py
index 38d6fc5..8adc637 100644
--- a/src/algotrader/pipeline/strategies/history_bucket_compare.py
+++ b/src/algotrader/pipeline/strategies/history_bucket_compare.py
@@ -1,3 +1,4 @@
+import logging
 from datetime import datetime
 from typing import List, Dict

@@ -14,22 +15,28 @@ class HistoryBucketCompareStrategy(Strategy):
     def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime, timeframe_end: datetime,
-                 indicators_to_compare: List[str], return_field: str, min_event_count: int,
+                 indicators_to_compare: List[str], return_fields: List[str], min_event_count: int,
                  min_avg_return: float) -> None:
         self.timeframe_start = timeframe_start
         self.timeframe_end = timeframe_end
         self.indicators_to_compare = indicators_to_compare
         self.storage_provider = storage_provider
         self.indicators_to_compare = indicators_to_compare
-        self.return_field = return_field
+        self.return_fields = return_fields
         self.min_event_count = min_event_count
         self.min_avg_return = min_avg_return

         groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
-        return_field = f'attachments.returns.{return_field}'
+        return_fields = [f'attachments.returns.{return_field}' for return_field in self.return_fields]

-        self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
-                                                                return_field, min_event_count, min_avg_return)
+        self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
+                                                                                          timeframe_end,
+                                                                                          groupby_fields,
+                                                                                          return_fields,
+                                                                                          min_event_count,
+                                                                                          min_avg_return)
+
+        logging.info(f'Found {len(self.long_matchers)} long matchers and {len(self.short_matchers)} short matchers')

     def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
         indicators_buckets: IndicatorsMatchedBuckets = \
@@ -43,7 +50,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
             candle_buckets_map[f'attachments.indicators_matched_buckets.{indicator}.ident'] = indicators_buckets.get(
                 indicator).ident

-        for matcher in self.matchers:
+        for matcher in self.long_matchers:
             match = True
             for candle_ind, candle_val in candle_buckets_map.items():
                 if matcher[candle_ind] != candle_val:
@@ -52,6 +59,15 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
             if match:
                 return [StrategySignal(candle.symbol, SignalDirection.Long)]

+        for matcher in self.short_matchers:
+            match = True
+            for candle_ind, candle_val in candle_buckets_map.items():
+                if matcher[candle_ind] != candle_val:
+                    match = False
+
+            if match:
+                return [StrategySignal(candle.symbol, SignalDirection.Short)]
+
         return []

     def serialize(self) -> Dict:
@@ -61,7 +77,7 @@ def serialize(self) -> Dict:
             'timeframe_start': self.timeframe_start,
             'timeframe_end': self.timeframe_end,
             'indicators_to_compare': self.indicators_to_compare,
-            'return_field': self.return_field,
+            'return_fields': self.return_fields,
             'min_event_count': self.min_event_count,
             'min_avg_return': self.min_avg_return,
         })
@@ -72,5 +88,5 @@ def deserialize(cls, data: Dict):
         storage_provider: StorageProvider = DeserializationService.deserialize(data.get('storage_provider'))
         return cls(storage_provider, data.get('timeframe_start'), data.get('timeframe_end'),
-                   data.get('indicators_to_compare'), data.get('return_field'),
+                   data.get('indicators_to_compare'), data.get('return_fields'),
                    data.get('min_event_count'), data.get('min_avg_return'))
diff --git a/src/algotrader/pipeline/strategies/history_cosine_similarity.py b/src/algotrader/pipeline/strategies/history_cosine_similarity.py
index bf2bf17..447489a 100644
--- a/src/algotrader/pipeline/strategies/history_cosine_similarity.py
+++ b/src/algotrader/pipeline/strategies/history_cosine_similarity.py
@@ -28,8 +28,10 @@ def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime,
         groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
         return_field = f'attachments.returns.{return_field}'

-        self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
-                                                                return_field, min_event_count, min_avg_return)
+        self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
+                                                                                          timeframe_end, groupby_fields,
+                                                                                          [return_field], min_event_count,
+                                                                                          min_avg_return)

     def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
         indicators_buckets: IndicatorsMatchedBuckets = \
@@ -42,7 +44,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
             candle_values.append(indicators_buckets.get(indicator).ident)

-        for matcher in self.matchers:
+        for matcher in self.long_matchers:
             matcher_values: list[int] = []
             for indicator in self.indicators_to_compare:
                 matcher_values.append(matcher[f'attachments.indicators_matched_buckets.{indicator}.ident'])
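A matcher is one aggregated-history row keyed by the serialized group-by fields; a candle matches when every bucket ident agrees. A sketch of that comparison with made-up bucket idents (the real rows come from `get_aggregated_history`):

```python
# Hypothetical bucket idents; field names follow the patch.
candle_buckets_map = {
    'attachments.indicators_matched_buckets.sma5.ident': 3,
    'attachments.indicators_matched_buckets.sma20.ident': 7,
}

long_matchers = [
    {'attachments.indicators_matched_buckets.sma5.ident': 3,
     'attachments.indicators_matched_buckets.sma20.ident': 7},  # matches -> Long signal
    {'attachments.indicators_matched_buckets.sma5.ident': 1,
     'attachments.indicators_matched_buckets.sma20.ident': 7},  # sma5 bucket differs
]

for matcher in long_matchers:
    if all(matcher[k] == v for k, v in candle_buckets_map.items()):
        # the strategy returns [StrategySignal(symbol, SignalDirection.Long)] here
        print('Long signal')
        break
```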
diff --git a/src/algotrader/pipeline/terminators/technicals_binner.py b/src/algotrader/pipeline/terminators/technicals_binner.py
index ecfd3e4..3d84e5b 100644
--- a/src/algotrader/pipeline/terminators/technicals_binner.py
+++ b/src/algotrader/pipeline/terminators/technicals_binner.py
@@ -9,14 +9,17 @@
 from algotrader.entities.candle import Candle
 from algotrader.pipeline.processors.candle_cache import CandleCache
 from algotrader.pipeline.processors.technicals import IndicatorValue
-from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, NORMALIZED_INDICATORS_ATTACHMENT_KEY
+from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, \
+    NORMALIZED_INDICATORS_ATTACHMENT_KEY
 from algotrader.pipeline.shared_context import SharedContext
 from algotrader.pipeline.terminator import Terminator


 class TechnicalsBinner(Terminator):
-    def __init__(self, symbols: List[str], bins_count: int, output_file_path: str) -> None:
+    def __init__(self, symbols: List[str], bins_count: int, output_file_path: str,
+                 outliers_removal_percentage: float = 0.05) -> None:
         super().__init__()
+        self.outliers_removal_percentage = outliers_removal_percentage
         self.symbols = symbols
         self.output_file_path = output_file_path
         self.values: Dict[str, List[IndicatorValue]] = {}
@@ -38,6 +41,9 @@ def _process_candle(self, candle: Candle):
         normalized_indicators: NormalizedIndicators = candle.attachments.get_attachment(
             NORMALIZED_INDICATORS_ATTACHMENT_KEY)

+        if not normalized_indicators:
+            return
+
         for indicator, value in normalized_indicators.items():
             if indicator not in self.values:
                 self.values[indicator] = []
@@ -59,7 +65,7 @@ def _calculate_bins(self):

     def _get_single_float_bins(self, values: List[float]) -> List[Bucket]:
         values.sort()
-        margins = int(len(values) * 0.05)
+        margins = int(len(values) * self.outliers_removal_percentage)
         values = values[margins:len(values) - margins]

         step_size = int(math.floor(len(values) / self.bins_count))
@@ -83,9 +89,11 @@ def serialize(self) -> Dict:
             'symbols': self.symbols,
             'bins_count': self.bins_count,
             'output_file_path': self.output_file_path,
             'outliers_removal_percentage': self.outliers_removal_percentage
         })
         return obj

     @classmethod
     def deserialize(cls, data: Dict):
-        return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'))
+        return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'),
+                   data.get('outliers_removal_percentage'))
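The trimming that `_get_single_float_bins` now parameterizes, worked with illustrative numbers (the real input is the collected normalized indicator values):

```python
# With 1,000 sorted values and outliers_removal_percentage=0.05, 50 values are
# dropped from each tail before the remaining 900 are split into equal-count bins.
values = sorted(float(v) for v in range(1000))
outliers_removal_percentage = 0.05
bins_count = 10

margins = int(len(values) * outliers_removal_percentage)   # 50
trimmed = values[margins:len(values) - margins]            # 900 values remain
step_size = len(trimmed) // bins_count                     # 90 values per bin
bin_edges = [trimmed[i * step_size] for i in range(bins_count)]
print(margins, len(trimmed), step_size)                    # 50 900 90
```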
diff --git a/src/algotrader/providers/binance.py b/src/algotrader/providers/binance.py
index b37d55d..ab13e05 100644
--- a/src/algotrader/providers/binance.py
+++ b/src/algotrader/providers/binance.py
@@ -6,20 +6,25 @@
 from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient

 from algotrader.entities.candle import Candle
+from algotrader.entities.order_direction import OrderDirection
 from algotrader.entities.serializable import Deserializable, Serializable
 from algotrader.entities.timespan import TimeSpan

 StreamedCandleCallback = Callable[[Candle], None]

+PRODUCTION = 'https://api.binance.com'
+TESTNET = 'https://testnet.binance.vision'
+

 class BinanceProvider(Serializable, Deserializable):
     logger = logging.getLogger('BinanceProvider')

-    def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '', enable_websocket: bool = False):
+    def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '',
+                 enable_websocket: bool = False, testnet: bool = False):
         self.api_key = api_key
         self.api_secret = api_secret
         self.enable_websocket = enable_websocket

-        self.client = Spot(api_key, api_secret)
+        self.client = Spot(api_key, api_secret, base_url=TESTNET if testnet else PRODUCTION)
         self.wsManager = WebsocketClient()

         if enable_websocket:
@@ -70,6 +75,35 @@ def _deserialize_candle(self, symbol: str, interval: TimeSpan, data: Dict) -> Ca

         return Candle(symbol, interval, timestamp, open, close, high, low, volume)

+    def send_bracket_order(self, symbol: str, direction: OrderDirection, quantity: float,
+                           triggering_price: float, position_entry_grace: float, spread: float,
+                           time_in_force: str = 'GTC'):
+
+        grace_price = triggering_price * (1 + position_entry_grace) if direction == OrderDirection.Buy else \
+            triggering_price * (1 - position_entry_grace)
+
+        take_profit_price = triggering_price * (1 + spread) if direction == OrderDirection.Buy else \
+            triggering_price * (1 - spread)
+
+        stop_loss_price = triggering_price * (1 - spread) if direction == OrderDirection.Buy else \
+            triggering_price * (1 + spread)
+
+        side = self._direction_to_side(direction)
+        logging.info(f'Sending order for {symbol} {side} {quantity} at {grace_price}...')
+        order_response = self.client.new_order(symbol=symbol, side=side, type='LIMIT',
+                                               quantity=quantity, price=grace_price,
+                                               timeInForce=time_in_force)
+
+        logging.info(f'Order response: {order_response}')
+        if order_response['status'] == 'FILLED':
+            logging.info(f'Order filled, sending take profit and stop loss... '
+                         f'take profit: {take_profit_price}, stop loss: {stop_loss_price}')
+
+            opposite_side = self._direction_to_opposite_side(direction)
+            self.client.new_oco_order(symbol=symbol, side=opposite_side, quantity=quantity, price=take_profit_price,
+                                      stopPrice=stop_loss_price, time_in_force='GTC')
+        return order_response
+
     def get_symbol_history(self, symbol: str, interval: TimeSpan, start_time: datetime,
                            end_time: datetime = datetime.now()) -> List[Candle]:
         self.logger.info(f'Getting {symbol} history from {start_time} to {end_time}...')
@@ -111,6 +145,14 @@ def deserialize(cls, data: Dict):
     def _timestamp_to_datetime(timestamp: int) -> datetime:
         return datetime.fromtimestamp(timestamp / 1000)

+    @staticmethod
+    def _direction_to_side(direction: OrderDirection) -> str:
+        return 'BUY' if direction == OrderDirection.Buy else 'SELL'
+
+    @staticmethod
+    def _direction_to_opposite_side(direction: OrderDirection) -> str:
+        return 'SELL' if direction == OrderDirection.Buy else 'BUY'
+
     @staticmethod
     def _timespan_to_interval(timespan: TimeSpan) -> str:
         if timespan == TimeSpan.Second:
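The bracket price math in `send_bracket_order`, worked with illustrative numbers: the entry is allowed a small grace beyond the triggering price, and the OCO legs sit symmetrically at +/- `spread` around it.

```python
# Illustrative values for a Buy order (not from the patch).
triggering_price = 100.0
position_entry_grace = 0.01  # accept fills up to 1% above the trigger
spread = 0.02                # 2% take-profit / stop-loss distance

grace_price = triggering_price * (1 + position_entry_grace)   # 101.0 (LIMIT entry)
take_profit_price = triggering_price * (1 + spread)           # 102.0 (SELL limit leg)
stop_loss_price = triggering_price * (1 - spread)             # 98.0  (SELL stop leg)

# For a Sell entry the signs flip: grace 99.0, take profit 98.0, stop loss 102.0.
```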
diff --git a/src/algotrader/storage/inmemory_storage.py b/src/algotrader/storage/inmemory_storage.py
index 6ff7280..6e55e62 100644
--- a/src/algotrader/storage/inmemory_storage.py
+++ b/src/algotrader/storage/inmemory_storage.py
@@ -13,14 +13,19 @@ def __init__(self) -> None:
         self.candles: Dict[str, List[Candle]] = {}

     def get_symbol_candles(self, symbol: str, time_span: TimeSpan, from_timestamp: datetime,
-                           to_timestamp: datetime) -> List[Candle]:
+                           to_timestamp: datetime, limit: int = 0) -> List[Candle]:
         if symbol not in self.candles:
             return []

-        return list(filter(lambda candle:
-                           candle.time_span == time_span and
-                           from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))
+        results = list(filter(lambda candle:
+                              candle.time_span == time_span and
+                              from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))
+
+        if limit > 0:
+            return results[:limit]
+
+        return results

     def get_candles(self, time_span: TimeSpan, from_timestamp: datetime,
                     to_timestamp: datetime) -> List[Candle]:
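Both `get_symbol_candles` implementations treat `limit=0` as "no limit": PyMongo's `cursor.limit(0)` disables the limit, and the in-memory version only slices when `limit > 0`. A tiny sketch of that guard:

```python
# Stand-in for the filtered candle list; real values are Candle objects.
results = ['c1', 'c2', 'c3']
limit = 0

bounded = results[:limit] if limit > 0 else results
assert bounded == results  # limit=0 keeps the old unbounded behavior
```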
diff --git a/src/algotrader/storage/mongodb_storage.py b/src/algotrader/storage/mongodb_storage.py
index 9242019..600cd23 100644
--- a/src/algotrader/storage/mongodb_storage.py
+++ b/src/algotrader/storage/mongodb_storage.py
@@ -1,5 +1,7 @@
+import json
+import logging
 from datetime import datetime
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Tuple

 import pymongo
 from pymongo import MongoClient
@@ -44,17 +46,32 @@ def _ensure_connection(self):
                                               unique=True, background=True)

     def get_aggregated_history(self, from_timestamp: datetime, to_timestamp: datetime, groupby_fields: List[str],
-                               return_field: str, min_count: int, min_avg: float) -> \
-            List[Dict[str, int]]:
+                               return_fields: List[str], min_count: int, min_return: float) -> \
+            Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
         self._ensure_connection()

-        pipeline = [
-            self._generate_history_match_clause(from_timestamp, to_timestamp, groupby_fields + [return_field]),
-            self._generate_group_stage(groupby_fields, return_field),
-            self._generate_min_fields_match_stage(min_count, min_avg)
+        stage_and_match = [
+            self._generate_history_match_clause(from_timestamp, to_timestamp, groupby_fields + return_fields),
+            self._generate_group_stage(groupby_fields, return_fields),
         ]

+        long_pipeline = stage_and_match + [
+            self._generate_min_fields_match_stage_long(min_count, return_fields, min_return)]
+
+        short_pipeline = stage_and_match + [
+            self._generate_min_fields_match_stage_short(min_count, return_fields, min_return)]
+
+        long_matches = self._run_and_parse_aggregate(long_pipeline)
+        short_matches = self._run_and_parse_aggregate(short_pipeline)
+
+        return long_matches, short_matches
+
+    def _run_and_parse_aggregate(self, pipeline: List):
+        self._ensure_connection()
+
+        logging.info(f'Running pipeline: {json.dumps(pipeline, indent=4, sort_keys=True, default=str)}')
+
         results = self.candles_collection.aggregate(pipeline, allowDiskUse=True)

         matches: List[Dict[str, int]] = []
@@ -65,18 +82,21 @@ def get_aggregated_history(self, from_timestamp: datetime, to_timestamp: datetim
         return matches

     @staticmethod
-    def _generate_history_match_clause(from_timestamp: datetime, to_timestamp: datetime,
-                                       fields: List[str]) -> object:
+    def _generate_history_match_clause(from_timestamp: datetime, to_timestamp: datetime, fields: List[str]) -> object:
         existing_fields_query = {field: {'$exists': True} for field in fields}
         existing_fields_query.update({'timestamp': {"$gte": from_timestamp, "$lte": to_timestamp}})
         return {'$match': existing_fields_query}

     @staticmethod
-    def _generate_group_stage(groupby_fields: List[str], return_field: str) -> object:
+    def _generate_group_stage(groupby_fields: List[str], return_fields: List[str]) -> object:
+        avgs = {}
+        for return_field in return_fields:
+            avgs.update({MongoDBStorage._serialize_group_field_name(return_field): {'$avg': f'${return_field}'}})
+
         return {
             "$group": {
                 "_id": {MongoDBStorage._serialize_group_field_name(field): f'${field}' for field in groupby_fields},
-                "avg": {'$avg': f'${return_field}'},
+                **avgs,
                 "count": {"$sum": 1},
             }
         }
@@ -90,11 +110,26 @@ def _deserialize_group_field_name(field: str) -> str:
         return field.replace('*', '.')

     @staticmethod
-    def _generate_min_fields_match_stage(min_count: int, min_avg: float) -> object:
+    def _generate_min_fields_match_stage_long(min_count: int, return_fields: List[str], min_return: float) -> object:
+        min_returns = {'$or': [{MongoDBStorage._serialize_group_field_name(field): {'$gte': min_return}} for field in
+                               return_fields]}
+
+        return {
+            '$match': {
+                **min_returns,
+                "count": {'$gte': min_count},
+            }
+        }
+
+    @staticmethod
+    def _generate_min_fields_match_stage_short(min_count: int, return_fields: List[str], min_return: float) -> object:
+        min_returns = {'$or': [{MongoDBStorage._serialize_group_field_name(field): {'$lte': -min_return}} for field in
+                               return_fields]}
+
         return {
             '$match': {
+                **min_returns,
                 "count": {'$gte': min_count},
-                "avg": {'$gte': min_avg},
             }
         }
@@ -122,7 +157,7 @@ def _deserialize_candle(self, data: Dict) -> Candle:
         return Candle.deserialize(data)

     def get_symbol_candles(self, symbol: str, time_span: TimeSpan,
-                           from_timestamp: datetime, to_timestamp: datetime) -> List[Candle]:
+                           from_timestamp: datetime, to_timestamp: datetime, limit: int = 0) -> List[Candle]:
         self._ensure_connection()

         query = {
@@ -132,7 +167,7 @@ def get_symbol_candles(self, symbol: str, time_span: TimeSpan,
         }

         return [self._deserialize_candle(candle) for candle in
-                self.candles_collection.find(query).sort("timestamp")]
+                self.candles_collection.find(query).sort("timestamp").limit(limit)]

     def get_candles(self, time_span: TimeSpan, from_timestamp: datetime,
                     to_timestamp: datetime) -> List[Candle]:
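For illustration, the `$match` stages the two new helpers would emit for a hypothetical `return_fields=['attachments.returns.ctc1']`, `min_count=10`, `min_return=0.2` (group keys are serialized with `*` in place of `.`, per `_serialize_group_field_name`):

```python
# Long side: keep groups whose average return on any return field is >= min_return.
long_match = {
    '$match': {
        '$or': [{'attachments*returns*ctc1': {'$gte': 0.2}}],
        'count': {'$gte': 10},
    }
}

# Short side: the mirror image, average return <= -min_return.
short_match = {
    '$match': {
        '$or': [{'attachments*returns*ctc1': {'$lte': -0.2}}],
        'count': {'$gte': 10},
    }
}
```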
diff --git a/src/algotrader/storage/storage_provider.py b/src/algotrader/storage/storage_provider.py
index ee9701d..8784f70 100644
--- a/src/algotrader/storage/storage_provider.py
+++ b/src/algotrader/storage/storage_provider.py
@@ -1,6 +1,6 @@
 from abc import abstractmethod
 from datetime import datetime
-from typing import List, Dict
+from typing import List, Dict, Tuple

 from algotrader.entities.candle import Candle
 from algotrader.entities.serializable import Deserializable, Serializable
@@ -14,7 +14,7 @@ def save(self, candle: Candle):

     @abstractmethod
     def get_symbol_candles(self, symbol: str, time_span: TimeSpan, from_timestamp: datetime,
-                           to_timestamp: datetime) -> List[Candle]:
+                           to_timestamp: datetime, limit: int = 0) -> List[Candle]:
         pass

     @abstractmethod
@@ -23,5 +23,6 @@ def get_candles(self, time_span: TimeSpan, from_timestamp: datetime, to_timestam

     @abstractmethod
     def get_aggregated_history(self, from_timestamp: datetime, to_timestamp: datetime, groupby_fields: List[str],
-                               return_field: str, min_count: int, min_avg: float) -> List[Dict[str, int]]:
+                               return_fields: List[str], min_count: int, min_return: float) -> \
+            Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
         pass
diff --git a/src/algotrader/trade/simple_sum_signals_executor.py b/src/algotrader/trade/simple_sum_signals_executor.py
index acedd99..833def9 100644
--- a/src/algotrader/trade/simple_sum_signals_executor.py
+++ b/src/algotrader/trade/simple_sum_signals_executor.py
@@ -13,8 +13,8 @@ def __init__(self) -> None:
         self.position: Dict[str, float] = {}
         self.cash = 0

-    def _get_order_size(self, price: float) -> int:
-        return int(DEFAULT_ORDER_VALUE / price)
+    def _get_order_size(self, price: float) -> float:
+        return DEFAULT_ORDER_VALUE / price

     def execute(self, candle: Candle, signals: List[StrategySignal]):
         # close when there is no signal
diff --git a/tests/integration/test_binance_provider.py b/tests/integration/test_binance_provider.py
index 67eb308..aae3d34 100644
--- a/tests/integration/test_binance_provider.py
+++ b/tests/integration/test_binance_provider.py
@@ -18,6 +18,10 @@ class TestBinanceMarketProvider(TestCase):
     API_KEY = os.environ.get('BINANCE_API_KEY')
     API_SECRET = os.environ.get('BINANCE_API_SECRET')

+    def test_get_account(self):
+        provider = BinanceProvider(self.API_KEY, self.API_SECRET, False, testnet=True)
+        provider.account()
+
     def test_get_symbol_history(self):
         provider = BinanceProvider(self.API_KEY, self.API_SECRET, False)

diff --git a/tests/unit/strategies/test_history_compare.py b/tests/unit/strategies/test_history_compare.py
index 3fa8ce2..7b4c3a7 100644
--- a/tests/unit/strategies/test_history_compare.py
+++ b/tests/unit/strategies/test_history_compare.py
@@ -44,7 +44,7 @@ def _check(signals: List[StrategySignal]):
                                                  datetime.now() - timedelta(days=60),
                                                  datetime.now(),
                                                  indicators_to_compare=['sma5', 'sma20'],
-                                                 return_field='ctc1', min_event_count=1,
+                                                 return_fields=['ctc1'], min_event_count=1,
                                                  min_avg_return=0.2)

     # TODO: FakeSignalsExecutor is not called when there is not signal. make sure to fail if it's not called.
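On the `_get_order_size` change: crypto quantities are fractional, and truncating to `int` can zero out small orders entirely. Illustrative numbers (the real `DEFAULT_ORDER_VALUE` is defined elsewhere in the module):

```python
# Assumed order value of 1000 quote units against a hypothetical BTC price.
DEFAULT_ORDER_VALUE = 1000
price = 23_500.0

old_size = int(DEFAULT_ORDER_VALUE / price)  # 0 -> an order is never placed
new_size = DEFAULT_ORDER_VALUE / price       # ~0.042553, a valid fractional quantity
print(old_size, round(new_size, 6))
```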