Skip to content

Commit

Permalink
Trading via Binance (#28)
Browse files Browse the repository at this point in the history
* binance orders + refactoring

* refactoring

* fix linting

* bracket order
  • Loading branch information
idanya authored Jan 20, 2023
1 parent 4c0aeda commit 6f928c8
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 43 deletions.
6 changes: 6 additions & 0 deletions src/algotrader/entities/order_direction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum


class OrderDirection(Enum):
Buy = 1
Sell = 2
8 changes: 7 additions & 1 deletion src/algotrader/pipeline/sources/binance_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def __init__(self, binance_provider: BinanceProvider, symbols: List[str], time_s
self.time_span = time_span
self.queue = Queue()

self._last_received_candle: Dict[str, Candle] = {}

def read(self) -> Iterator[Candle]:
for symbol in self.symbols:
self.binance_provider.start_kline_socket(symbol, self.time_span, self._on_candle)
Expand All @@ -23,7 +25,11 @@ def read(self) -> Iterator[Candle]:
yield self.queue.get()

def _on_candle(self, candle: Candle):
self.queue.put(candle)
if candle.symbol in self._last_received_candle and \
candle.timestamp > self._last_received_candle[candle.symbol].timestamp:
self.queue.put(self._last_received_candle[candle.symbol])

self._last_received_candle[candle.symbol] = candle

def serialize(self) -> Dict:
obj = super().serialize()
Expand Down
32 changes: 24 additions & 8 deletions src/algotrader/pipeline/strategies/history_bucket_compare.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from datetime import datetime
from typing import List, Dict

Expand All @@ -14,22 +15,28 @@
class HistoryBucketCompareStrategy(Strategy):

def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime, timeframe_end: datetime,
indicators_to_compare: List[str], return_field: str, min_event_count: int,
indicators_to_compare: List[str], return_fields: List[str], min_event_count: int,
min_avg_return: float) -> None:
self.timeframe_start = timeframe_start
self.timeframe_end = timeframe_end
self.indicators_to_compare = indicators_to_compare
self.storage_provider = storage_provider
self.indicators_to_compare = indicators_to_compare
self.return_field = return_field
self.return_fields = return_fields
self.min_event_count = min_event_count
self.min_avg_return = min_avg_return

groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
return_field = f'attachments.returns.{return_field}'
return_fields = [f'attachments.returns.{return_field}' for return_field in self.return_fields]

self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
return_field, min_event_count, min_avg_return)
self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
timeframe_end,
groupby_fields,
return_fields,
min_event_count,
min_avg_return)

logging.info(f'Found {len(self.long_matchers)} long matchers and {len(self.short_matchers)} short matchers')

def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
indicators_buckets: IndicatorsMatchedBuckets = \
Expand All @@ -43,7 +50,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
candle_buckets_map[f'attachments.indicators_matched_buckets.{indicator}.ident'] = indicators_buckets.get(
indicator).ident

for matcher in self.matchers:
for matcher in self.long_matchers:
match = True
for candle_ind, candle_val in candle_buckets_map.items():
if matcher[candle_ind] != candle_val:
Expand All @@ -52,6 +59,15 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
if match:
return [StrategySignal(candle.symbol, SignalDirection.Long)]

for matcher in self.short_matchers:
match = True
for candle_ind, candle_val in candle_buckets_map.items():
if matcher[candle_ind] != candle_val:
match = False

if match:
return [StrategySignal(candle.symbol, SignalDirection.Short)]

return []

def serialize(self) -> Dict:
Expand All @@ -61,7 +77,7 @@ def serialize(self) -> Dict:
'timeframe_start': self.timeframe_start,
'timeframe_end': self.timeframe_end,
'indicators_to_compare': self.indicators_to_compare,
'return_field': self.return_field,
'return_fields': self.return_fields,
'min_event_count': self.min_event_count,
'min_avg_return': self.min_avg_return,
})
Expand All @@ -72,5 +88,5 @@ def deserialize(cls, data: Dict):
storage_provider: StorageProvider = DeserializationService.deserialize(data.get('storage_provider'))

return cls(storage_provider, data.get('timeframe_start'), data.get('timeframe_end'),
data.get('indicators_to_compare'), data.get('return_field'),
data.get('indicators_to_compare'), data.get('return_fields'),
data.get('min_event_count'), data.get('min_avg_return'))
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime,
groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
return_field = f'attachments.returns.{return_field}'

self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
return_field, min_event_count, min_avg_return)
self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
timeframe_end, groupby_fields,
return_field, min_event_count,
min_avg_return)

def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
indicators_buckets: IndicatorsMatchedBuckets = \
Expand All @@ -42,7 +44,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal

candle_values.append(indicators_buckets.get(indicator).ident)

for matcher in self.matchers:
for matcher in self.long_matchers:
matcher_values: list[int] = []
for indicator in self.indicators_to_compare:
matcher_values.append(matcher[f'attachments.indicators_matched_buckets.{indicator}.ident'])
Expand Down
16 changes: 12 additions & 4 deletions src/algotrader/pipeline/terminators/technicals_binner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
from algotrader.entities.candle import Candle
from algotrader.pipeline.processors.candle_cache import CandleCache
from algotrader.pipeline.processors.technicals import IndicatorValue
from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, NORMALIZED_INDICATORS_ATTACHMENT_KEY
from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, \
NORMALIZED_INDICATORS_ATTACHMENT_KEY
from algotrader.pipeline.shared_context import SharedContext
from algotrader.pipeline.terminator import Terminator


class TechnicalsBinner(Terminator):
def __init__(self, symbols: List[str], bins_count: int, output_file_path: str) -> None:
def __init__(self, symbols: List[str], bins_count: int, output_file_path: str,
outliers_removal_percentage: float = 0.05) -> None:
super().__init__()
self.outliers_removal_percentage = outliers_removal_percentage
self.symbols = symbols
self.output_file_path = output_file_path
self.values: Dict[str, List[IndicatorValue]] = {}
Expand All @@ -38,6 +41,9 @@ def _process_candle(self, candle: Candle):
normalized_indicators: NormalizedIndicators = candle.attachments.get_attachment(
NORMALIZED_INDICATORS_ATTACHMENT_KEY)

if not normalized_indicators:
return

for indicator, value in normalized_indicators.items():
if indicator not in self.values:
self.values[indicator] = []
Expand All @@ -59,7 +65,7 @@ def _calculate_bins(self):
def _get_single_float_bins(self, values: List[float]) -> List[Bucket]:
values.sort()

margins = int(len(values) * 0.05)
margins = int(len(values) * self.outliers_removal_percentage)
values = values[margins:len(values) - margins]

step_size = int(math.floor(len(values) / self.bins_count))
Expand All @@ -83,9 +89,11 @@ def serialize(self) -> Dict:
'symbols': self.symbols,
'bins_count': self.bins_count,
'output_file_path': self.output_file_path,
'outliers_removal_percentage': self.outliers_removal_percentage
})
return obj

@classmethod
def deserialize(cls, data: Dict):
return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'))
return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'),
data.get('outliers_removal_percentage'))
46 changes: 44 additions & 2 deletions src/algotrader/providers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@
from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient

from algotrader.entities.candle import Candle
from algotrader.entities.order_direction import OrderDirection
from algotrader.entities.serializable import Deserializable, Serializable
from algotrader.entities.timespan import TimeSpan

StreamedCandleCallback = Callable[[Candle], None]

PRODUCTION = 'https://api.binance.com'
TESTNET = 'https://testnet.binance.vision'


class BinanceProvider(Serializable, Deserializable):
logger = logging.getLogger('BinanceProvider')

def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '', enable_websocket: bool = False):
def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '',
enable_websocket: bool = False, testnet: bool = False):
self.api_key = api_key
self.api_secret = api_secret
self.enable_websocket = enable_websocket
self.client = Spot(api_key, api_secret)
self.client = Spot(api_key, api_secret, base_url=TESTNET if testnet else PRODUCTION)

self.wsManager = WebsocketClient()
if enable_websocket:
Expand Down Expand Up @@ -70,6 +75,35 @@ def _deserialize_candle(self, symbol: str, interval: TimeSpan, data: Dict) -> Ca

return Candle(symbol, interval, timestamp, open, close, high, low, volume)

def send_bracket_order(self, symbol: str, direction: OrderDirection, quantity: float,
triggering_price: float, position_entry_grace: float, spread: float,
time_in_force: str = 'GTC'):

grace_price = triggering_price * (1 + position_entry_grace) if direction == OrderDirection.BUY else \
triggering_price * (1 - position_entry_grace)

take_profit_price = triggering_price * (1 + spread) if direction == OrderDirection.BUY else \
triggering_price * (1 - spread)

stop_loss_price = triggering_price * (1 - spread) if direction == OrderDirection.BUY else \
triggering_price * (1 + spread)

side = self._direction_to_side(direction)
logging.info(f'Sending order for {symbol} {side} {quantity} at {grace_price}...')
order_response = self.client.new_order(symbol=symbol, side=side, type='LIMIT',
quantity=quantity, price=grace_price,
timeInForce=time_in_force)

logging.info(f'Order response: {order_response}')
if order_response['status'] == 'FILLED':
logging.info(f'Order filled, sending take profit and stop loss... '
f'take profit: {take_profit_price}, stop loss: {stop_loss_price}')

opposite_side = self._direction_to_opposite_side(direction)
self.client.new_oco_order(symbol=symbol, side=opposite_side, quantity=quantity, price=take_profit_price,
stopPrice=stop_loss_price, time_in_force='GTC')
return order_response

def get_symbol_history(self, symbol: str, interval: TimeSpan, start_time: datetime,
end_time: datetime = datetime.now()) -> List[Candle]:
self.logger.info(f'Getting {symbol} history from {start_time} to {end_time}...')
Expand Down Expand Up @@ -111,6 +145,14 @@ def deserialize(cls, data: Dict):
def _timestamp_to_datetime(timestamp: int) -> datetime:
return datetime.fromtimestamp(timestamp / 1000)

@staticmethod
def _direction_to_side(direction: OrderDirection) -> str:
return 'BUY' if direction == OrderDirection.Buy else 'SELL'

@staticmethod
def _direction_to_opposite_side(direction: OrderDirection) -> str:
return 'SELL' if direction == OrderDirection.Buy else 'BUY'

@staticmethod
def _timespan_to_interval(timespan: TimeSpan) -> str:
if timespan == TimeSpan.Second:
Expand Down
13 changes: 9 additions & 4 deletions src/algotrader/storage/inmemory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ def __init__(self) -> None:
self.candles: Dict[str, List[Candle]] = {}

def get_symbol_candles(self, symbol: str, time_span: TimeSpan, from_timestamp: datetime,
to_timestamp: datetime) -> List[Candle]:
to_timestamp: datetime, limit: int = 0) -> List[Candle]:

if symbol not in self.candles:
return []

return list(filter(lambda candle:
candle.time_span == time_span and
from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))
results = list(filter(lambda candle:
candle.time_span == time_span and
from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))

if limit > 0:
return results[:limit]

return results

def get_candles(self, time_span: TimeSpan, from_timestamp: datetime, to_timestamp: datetime) -> List[Candle]:

Expand Down
Loading

0 comments on commit 6f928c8

Please sign in to comment.