
added technicals and optimizations (#29)
* added technicals and optimizations

* formatting

* bbands normalize

* updated git ignore
idanya authored Mar 2, 2023
1 parent 6f928c8 commit 456f5fc
Showing 17 changed files with 87 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
_trial_temp/
.idea
.DS_Store
*.pyc
4 changes: 4 additions & 0 deletions src/algotrader/calc/calculations.py
@@ -16,3 +16,7 @@ class TechnicalCalculation(Enum):
OBV = 'obv'
VAR = 'var'
VOSC = 'vosc'
STOCH = 'stoch'
FISHER = 'fisher'
AROONOSC = 'aroonosc'
BBANDS = 'bbands'
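
The four new calculation types register additional indicators (stochastic, Fisher transform, Aroon oscillator, Bollinger Bands) with the calculator. Their wrapper methods are mostly outside this diff; a hedged sketch of what the bbands wrapper could look like, assuming the calculator keeps a closes series and that `ti` is the tulipy bindings already used by the fisher/aroonosc methods below:

```python
import numpy as np
import tulipy as ti  # assumption: the same indicator bindings the other wrappers use


def bbands(closes: np.ndarray, period: int, stddev: float = 2.0):
    """Bollinger Bands wrapper in the style of the fisher/aroonosc methods below."""
    if len(closes) < period:
        return [], [], []
    # tulipy returns three series: lower, middle and upper band.
    lower, middle, upper = ti.bbands(closes, period, stddev)
    return list(lower), list(middle), list(upper)
```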
2 changes: 1 addition & 1 deletion src/algotrader/calc/technicals.py
@@ -64,7 +64,7 @@ def fisher(self, period: int) -> List[float]:
return ti.fisher(self._highs, self._lows, period)

def aroonosc(self, period: int) -> List[float]:
if len(self._highs) < period:
if len(self._highs) <= period:
return []

return ti.aroonosc(self._highs, self._lows, period)
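
Tightening the guard from `<` to `<=` means that with exactly `period` bars the method now returns an empty list instead of invoking the indicator, since the Aroon oscillator needs a full lookback window of period + 1 values. A minimal sketch of the guarded behavior, assuming `ti` is tulipy and the inputs are numpy arrays:

```python
import numpy as np
import tulipy as ti  # assumption: the `ti` module used above is tulipy


def aroonosc_or_empty(highs: np.ndarray, lows: np.ndarray, period: int) -> list:
    # Exactly `period` bars is not enough history for a full lookback window,
    # so the guard uses <= rather than < and returns early.
    if len(highs) <= period:
        return []
    return list(ti.aroonosc(highs, lows, period))


# With only 7 bars and period=7 the guard trips and no calculation runs.
highs = np.array([10.0, 11.0, 12.0, 11.5, 12.5, 13.0, 12.8])
lows = highs - 1.0
assert aroonosc_or_empty(highs, lows, 7) == []
```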
@@ -547,7 +547,17 @@
"ema20",
"correlation"
],
"return_field": "ctc1",
"return_fields": [
"ctc-1",
"ctc-2",
"ctc-3",
"ctc-4",
"ctc-5",
"ctc-6",
"ctc-7",
"ctc-8",
"ctc-9"
],
"min_event_count": 50,
"min_avg_return": 0.2
}
@@ -1162,6 +1162,7 @@
"ZTS"
],
"bins_count": 10,
"output_file_path": "/Users/idanyael/personal-dev/algo-trader/src/algotrader/examples/pipeline-templates/bins.json"
"output_file_path": "/Users/idanyael/personal-dev/algo-trader/src/algotrader/examples/pipeline-templates/bins.json",
"outliers_removal_percentage": 0.05
}
}
@@ -515,7 +515,8 @@
}
},
"processor": {
"returnsCount": 5
"returnsCount": 5,
"fieldPrefix": "ctc"
},
"terminator": null
}
4 changes: 3 additions & 1 deletion src/algotrader/pipeline/builders/backtest.py
@@ -43,6 +43,8 @@ def build_mongodb_history_buckets_backtester(bins_file_path: str) -> Pipeline:
data_from_time = STATIC_NOW - timedelta(days=365 * 3)
source = MongoDBSource(mongodb_storage, symbols, TimeSpan.Day, backtest_from_time, STATIC_NOW)

return_fields = [f'ctc-{i}' for i in range(1, 10)]

history_compare_strategy = HistoryBucketCompareStrategy(mongodb_storage,
data_from_time,
backtest_from_time,
@@ -51,7 +53,7 @@ def build_mongodb_history_buckets_backtester(bins_file_path: str) -> Pipeline:
'rsi7', 'rsi14',
'stddev5', 'ema5',
'ema20', 'correlation'],
return_field='ctc1', min_event_count=50,
return_fields=return_fields, min_event_count=50,
min_avg_return=0.2)

cache_processor = CandleCache()
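
For reference, the comprehension introduced above produces the same nine field names that the backtest template earlier in this commit now lists explicitly:

```python
return_fields = [f'ctc-{i}' for i in range(1, 10)]
# -> ['ctc-1', 'ctc-2', 'ctc-3', 'ctc-4', 'ctc-5', 'ctc-6', 'ctc-7', 'ctc-8', 'ctc-9']
```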
2 changes: 1 addition & 1 deletion src/algotrader/pipeline/builders/loaders.py
@@ -104,7 +104,7 @@ def build_returns_calculator(days_back: int = DEFAULT_DAYS_BACK) -> Pipeline:

sink = StorageSinkProcessor(mongodb_storage)
cache_processor = CandleCache(sink)
processor = ReturnsCalculatorProcessor(5, cache_processor)
processor = ReturnsCalculatorProcessor('ctc', 5, cache_processor)

return Pipeline(source, processor)

13 changes: 4 additions & 9 deletions src/algotrader/pipeline/pipeline.py
@@ -1,8 +1,6 @@
import logging
from typing import Optional, Dict

from rich.progress import Progress, TextColumn, BarColumn

from algotrader.entities.serializable import Serializable, Deserializable
from algotrader.pipeline.processor import Processor
from algotrader.pipeline.shared_context import SharedContext
@@ -37,13 +35,10 @@ def deserialize(cls, data: Dict):
def run(self, context: SharedContext) -> None:
self.logger.info('Starting pipeline...')

with Progress(TextColumn('{task.completed} Candle(s) processed'), BarColumn()) as progress:
processing_task = progress.add_task("Processing", total=None)

for candle in self.source.read():
self.logger.debug('Processing candle: %s\r', candle.serialize())
self.processor.process(context, candle)
progress.update(processing_task, advance=1)
for candle in self.source.read():
self.logger.debug('Processing candle: %s\r', candle.serialize())
self.processor.process(context, candle)

if self.terminator:
self.logger.debug('initiating termination...')
self.terminator.terminate(context)
16 changes: 10 additions & 6 deletions src/algotrader/pipeline/processors/returns.py
@@ -19,15 +19,18 @@ class Returns(GenericCandleAttachment[float]):


class ReturnsCalculatorProcessor(Processor):
def __init__(self, returns_count: int, next_processor: Optional[Processor] = None):
def __init__(self, field_prefix: str, returns_count: int, next_processor: Optional[Processor] = None):
super().__init__(next_processor)
self.field_prefix = field_prefix
self.returns_count = returns_count

def process(self, context: SharedContext, candle: Candle):
cache_reader = CandleCache.context_reader(context)
symbol_candles = cache_reader.get_symbol_candles(candle.symbol) or []

if len(symbol_candles) > self.returns_count:
candle.attachments.add_attachement(RETURNS_ATTACHMENT_KEY, Returns())

if len(symbol_candles) >= self.returns_count:
candle_returns = self._calc_returns(candle, symbol_candles)
candle.attachments.add_attachement(RETURNS_ATTACHMENT_KEY, candle_returns)

@@ -36,16 +39,17 @@ def process(self, context: SharedContext, candle: Candle):

def _calc_returns(self, current_candle: Candle, candles: List[Candle]) -> Returns:
candle_returns = Returns()
for i in range(1, self.returns_count):
candle_returns.set(f'ctc{i}', (1 - current_candle.close / candles[-i].close) * 100)
for i in range(1, self.returns_count + 1):
candle_returns.set(f'{self.field_prefix}-{i}', (1 - current_candle.close / candles[-i].close) * 100)

return candle_returns

def serialize(self) -> Dict:
return {
'returnsCount': self.returns_count
'returnsCount': self.returns_count,
'fieldPrefix': self.field_prefix
}

@classmethod
def deserialize(cls, data: Dict) -> Optional[Processor]:
return cls(data['returnsCount'], cls._deserialize_next_processor(data))
return cls(data['fieldPrefix'], data['returnsCount'], cls._deserialize_next_processor(data))
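
A quick worked example of the close-to-close formula in `_calc_returns` above, using the 'ctc' prefix and hypothetical closing prices:

```python
# Hypothetical closes, oldest to newest; the current candle closes at 110.
history_closes = [100.0, 102.0, 105.0]
current_close = 110.0

# ctc-i = (1 - current_close / candles[-i].close) * 100, as in _calc_returns above.
for i in range(1, 4):
    print(f"ctc-{i} = {(1 - current_close / history_closes[-i]) * 100:.2f}")
# ctc-1 compares against the most recent cached close (105.0) -> -4.76
# ctc-2 -> -7.84, ctc-3 -> -10.00; rising prices give increasingly negative values,
# which is what the reworked unit test later in this commit asserts (ctc-1 < ctc-2 < ctc-3).
```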
4 changes: 3 additions & 1 deletion src/algotrader/pipeline/processors/technicals.py
@@ -22,6 +22,8 @@ class Indicators(GenericCandleAttachment[IndicatorValue]):

Indicators()

MAX_CANDLES_FOR_CALC = 50


class TechnicalsProcessor(Processor):
"""
@@ -36,7 +38,7 @@ def __init__(self, config: TechnicalsProcessorConfig, next_processor: Optional[P
def process(self, context: SharedContext, candle: Candle):
cache_reader = CandleCache.context_reader(context)
symbol_candles = cache_reader.get_symbol_candles(candle.symbol) or []
calculator = TechnicalCalculator(symbol_candles + [candle])
calculator = TechnicalCalculator(symbol_candles[-MAX_CANDLES_FOR_CALC:] + [candle])

candle_indicators = Indicators()
self._calculate(calculator, candle_indicators)
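
The new MAX_CANDLES_FOR_CALC cap is one of the optimizations from the commit title: the processor now hands the calculator only the most recent 50 cached candles plus the current one, so per-candle indicator cost stays roughly constant as the cache grows. A minimal sketch of the windowing, assuming the cache holds a plain list ordered oldest to newest:

```python
MAX_CANDLES_FOR_CALC = 50  # cap introduced in this diff


def candles_for_calculation(symbol_candles: list, current_candle) -> list:
    # A negative-index slice on a list shorter than the cap simply returns the
    # whole list, so early candles need no special case.
    return symbol_candles[-MAX_CANDLES_FOR_CALC:] + [current_candle]
```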
@@ -21,7 +21,7 @@ class NormalizedIndicators(GenericCandleAttachment[IndicatorValue]):

NormalizedIndicators()

VWAP_NORMALIZE_PREFIXES = ['sma', 'ema']
VWAP_NORMALIZE_PREFIXES = ['sma', 'ema', 'typical', 'bbands']

NormalizeFunc = Callable[[List[Candle], IndicatorValue], IndicatorValue]

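
Adding 'typical' and 'bbands' to the prefix list means typical-price and Bollinger-band indicators now receive the same VWAP-style normalization as the moving averages. The normalizer body is outside this diff; a rough sketch of the prefix matching it presumably performs, with the normalization function itself left as an assumption:

```python
from typing import List

VWAP_NORMALIZE_PREFIXES = ['sma', 'ema', 'typical', 'bbands']


def should_vwap_normalize(indicator_name: str,
                          prefixes: List[str] = VWAP_NORMALIZE_PREFIXES) -> bool:
    # Indicators whose name starts with one of the prefixes ('bbands5', 'ema20', ...)
    # are divided by a VWAP-style reference value elsewhere in the normalizer.
    return any(indicator_name.startswith(prefix) for prefix in prefixes)


assert should_vwap_normalize('bbands5') and not should_vwap_normalize('rsi7')
```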
@@ -26,11 +26,11 @@ def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime,
self.min_avg_return = min_avg_return

groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
return_field = f'attachments.returns.{return_field}'
return_fields = [f'attachments.returns.ctc-{i}' for i in range(1, 20)]

self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
timeframe_end, groupby_fields,
return_field, min_event_count,
return_fields, min_event_count,
min_avg_return)

def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
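
The strategy now aggregates over a list of return fields rather than a single one. For reference, the two comprehensions above expand into Mongo document paths like these (the indicator names come from the strategy's configuration, so treat them as examples):

```python
indicators_to_compare = ['sma5', 'sma20']  # example configuration values
groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident'
                  for ind in indicators_to_compare]
return_fields = [f'attachments.returns.ctc-{i}' for i in range(1, 20)]

# groupby_fields -> ['attachments.indicators_matched_buckets.sma5.ident',
#                    'attachments.indicators_matched_buckets.sma20.ident']
# return_fields  -> ['attachments.returns.ctc-1', ..., 'attachments.returns.ctc-19']
```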
4 changes: 4 additions & 0 deletions src/algotrader/storage/mongodb_storage.py
@@ -30,6 +30,10 @@ def __init__(self, host: str = 'localhost', port: int = 27017, database: str = D
self.db: Optional[Database] = None
self.candles_collection: Optional[Collection] = None

def get_collection(self) -> Collection:
self._ensure_connection()
return self.candles_collection

def _ensure_connection(self):
if self.client:
return
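
The new get_collection accessor ensures a connection and exposes the underlying candle collection for ad-hoc queries. A hedged usage sketch; the 'symbol' field name and the query itself are assumptions, count_documents is standard pymongo:

```python
from algotrader.storage.mongodb_storage import MongoDBStorage

storage = MongoDBStorage()             # defaults to localhost:27017 per __init__ above
collection = storage.get_collection()  # connects lazily via _ensure_connection
candle_count = collection.count_documents({'symbol': 'AAPL'})  # hypothetical query
```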
4 changes: 2 additions & 2 deletions tests/unit/strategies/test_history_compare.py
@@ -72,7 +72,7 @@ def _check(signals: List[StrategySignal]):
datetime.now() - timedelta(days=60),
datetime.now(),
indicators_to_compare=['sma5', 'sma20'],
return_field='ctc1', min_event_count=1,
return_fields=['ctc1'], min_event_count=1,
min_avg_return=0.2)

# TODO: FakeSignalsExecutor is not called when there is not signal. make sure to fail if it's not called.
@@ -100,7 +100,7 @@ def _check(signals: List[StrategySignal]):
datetime.now(),
datetime.now(),
indicators_to_compare=['sma5', 'sma20'],
return_field='ctc1', min_event_count=1,
return_fields=['ctc1'], min_event_count=1,
min_avg_return=0.2)

# TODO: FakeSignalsExecutor is not called when there is not signal. make sure to fail if it's not called.
43 changes: 25 additions & 18 deletions tests/unit/test_returns_calculator_processor.py
@@ -1,36 +1,43 @@
import random
from datetime import datetime
from datetime import datetime, timedelta
from unittest import TestCase

from algotrader.entities.candle import Candle
from algotrader.entities.timespan import TimeSpan
from fakes.pipeline_validators import ValidationProcessor
from fakes.source import FakeSource
from algotrader.pipeline.pipeline import Pipeline
from algotrader.pipeline.processors.candle_cache import CandleCache
from algotrader.pipeline.processors.returns import ReturnsCalculatorProcessor, RETURNS_ATTACHMENT_KEY, Returns
from algotrader.pipeline.processors.returns import ReturnsCalculatorProcessor, RETURNS_ATTACHMENT_KEY
from algotrader.pipeline.reverse_source import ReverseSource
from algotrader.pipeline.runner import PipelineRunner
from algotrader.pipeline.shared_context import SharedContext
from unit import generate_candle_with_price
from fakes.pipeline_validators import TerminatorValidator
from fakes.source import FakeSource
from unit import generate_candle_with_price, TEST_SYMBOL


class TestReturnsCalculatorProcessor(TestCase):
def setUp(self) -> None:
super().setUp()
self.source = FakeSource(
[generate_candle_with_price(TimeSpan.Day, datetime.now(), random.randint(1, c)) for c in range(1, 50)])
[generate_candle_with_price(TimeSpan.Day, datetime.now() + timedelta(minutes=c), c) for c in range(1, 50)])

def test(self):
def _check(context: SharedContext, candle: Candle):
def _check_returns(context: SharedContext):
self.assertIsNotNone(context)
context.put_kv_data('check_count', context.get_kv_data('check_count', 0) + 1)
check_count = context.get_kv_data('check_count', 0)
cache_reader = CandleCache.context_reader(context)
candles = cache_reader.get_symbol_candles(TEST_SYMBOL)

self.assertFalse(candles[0].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1'))
self.assertFalse(candles[1].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1'))
self.assertFalse(candles[2].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1'))

ctc1 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-1']
ctc2 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-2']
ctc3 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-3']
self.assertTrue(ctc1 < ctc2 < ctc3)

cache_processor = CandleCache()
processor = ReturnsCalculatorProcessor('ctc', 3, cache_processor)

if check_count > 6:
candle_returns: Returns = candle.attachments.get_attachment(RETURNS_ATTACHMENT_KEY)
self.assertTrue(candle_returns.has('ctc1'))
terminator = TerminatorValidator(_check_returns)

validator = ValidationProcessor(_check)
cache_processor = CandleCache(validator)
processor = ReturnsCalculatorProcessor(5, cache_processor)
PipelineRunner(Pipeline(self.source, processor)).run()
self.source = ReverseSource(self.source)
PipelineRunner(Pipeline(self.source, processor, terminator)).run()
11 changes: 11 additions & 0 deletions tests/unit/test_technicals_processor.py
@@ -52,6 +52,7 @@ def _check(context: SharedContext, candle: Candle):
IndicatorConfig('sma5', TechnicalCalculation.SMA, [5]),
IndicatorConfig('macd', TechnicalCalculation.MACD, [2, 5, 9]),
IndicatorConfig('cci7', TechnicalCalculation.CCI, [7]),
IndicatorConfig('arooosc', TechnicalCalculation.AROONOSC, [7]),
])

processor = TechnicalsProcessor(config, cache_processor)
@@ -69,9 +70,18 @@ def _check(context: SharedContext, candle: Candle):

normalized_indicators: NormalizedIndicators = candle.attachments.get_attachment(
NORMALIZED_INDICATORS_ATTACHMENT_KEY)

bbands5_value = indicators.get('bbands5')
normalized_bbands5_value = normalized_indicators.get('bbands5')

sma5_value = indicators.get('sma5')
normalized_sma5_value = normalized_indicators.get('sma5')

vwap = (candle.close + candle.high + candle.low) / candle.volume

for i in range(len(bbands5_value)):
self.assertTrue(bbands5_value[i] / vwap, normalized_bbands5_value[i])

self.assertTrue(sma5_value / vwap, normalized_sma5_value)

validator = ValidationProcessor(_check)
@@ -80,6 +90,7 @@ def _check(context: SharedContext, candle: Candle):

config = TechnicalsProcessorConfig([
IndicatorConfig('sma5', TechnicalCalculation.SMA, [5]),
IndicatorConfig('bbands5', TechnicalCalculation.BBANDS, [5]),
])

technicals = TechnicalsProcessor(config, technicals_normalizer)
