From 456f5fc8c19cd0d39a8aece9a1b3c6b0ebceb42a Mon Sep 17 00:00:00 2001 From: Idan Yael Date: Thu, 2 Mar 2023 20:27:25 +0200 Subject: [PATCH] added technicals and optimizations (#29) * added technicals and optimizations * formatting * bbands normalize * updated git ignore --- .gitignore | 1 + src/algotrader/calc/calculations.py | 4 ++ src/algotrader/calc/technicals.py | 2 +- .../backtest_history_buckets_backtester.json | 12 +++++- ...st_technicals_with_buckets_calculator.json | 3 +- .../loader_simple_returns_calculator.json | 3 +- src/algotrader/pipeline/builders/backtest.py | 4 +- src/algotrader/pipeline/builders/loaders.py | 2 +- src/algotrader/pipeline/pipeline.py | 13 ++---- src/algotrader/pipeline/processors/returns.py | 16 ++++--- .../pipeline/processors/technicals.py | 4 +- .../processors/technicals_normalizer.py | 2 +- .../strategies/history_cosine_similarity.py | 4 +- src/algotrader/storage/mongodb_storage.py | 4 ++ tests/unit/strategies/test_history_compare.py | 4 +- .../unit/test_returns_calculator_processor.py | 43 +++++++++++-------- tests/unit/test_technicals_processor.py | 11 +++++ 17 files changed, 87 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index e408460..4bdcecf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +_trial_temp/ .idea .DS_Store *.pyc diff --git a/src/algotrader/calc/calculations.py b/src/algotrader/calc/calculations.py index 0522c3c..4ad8dbb 100644 --- a/src/algotrader/calc/calculations.py +++ b/src/algotrader/calc/calculations.py @@ -16,3 +16,7 @@ class TechnicalCalculation(Enum): OBV = 'obv' VAR = 'var' VOSC = 'vosc' + STOCH = 'stoch' + FISHER = 'fisher' + AROONOSC = 'aroonosc' + BBANDS = 'bbands' diff --git a/src/algotrader/calc/technicals.py b/src/algotrader/calc/technicals.py index 344dd3b..13fc01b 100644 --- a/src/algotrader/calc/technicals.py +++ b/src/algotrader/calc/technicals.py @@ -64,7 +64,7 @@ def fisher(self, period: int) -> List[float]: return ti.fisher(self._highs, self._lows, period) def aroonosc(self, period: int) -> List[float]: - if len(self._highs) < period: + if len(self._highs) <= period: return [] return ti.aroonosc(self._highs, self._lows, period) diff --git a/src/algotrader/examples/pipeline-templates/backtest_history_buckets_backtester.json b/src/algotrader/examples/pipeline-templates/backtest_history_buckets_backtester.json index 4108eaf..54527ab 100644 --- a/src/algotrader/examples/pipeline-templates/backtest_history_buckets_backtester.json +++ b/src/algotrader/examples/pipeline-templates/backtest_history_buckets_backtester.json @@ -547,7 +547,17 @@ "ema20", "correlation" ], - "return_field": "ctc1", + "return_fields": [ + "ctc-1", + "ctc-2", + "ctc-3", + "ctc-4", + "ctc-5", + "ctc-6", + "ctc-7", + "ctc-8", + "ctc-9" + ], "min_event_count": 50, "min_avg_return": 0.2 } diff --git a/src/algotrader/examples/pipeline-templates/backtest_technicals_with_buckets_calculator.json b/src/algotrader/examples/pipeline-templates/backtest_technicals_with_buckets_calculator.json index 72e8761..13fb378 100644 --- a/src/algotrader/examples/pipeline-templates/backtest_technicals_with_buckets_calculator.json +++ b/src/algotrader/examples/pipeline-templates/backtest_technicals_with_buckets_calculator.json @@ -1162,6 +1162,7 @@ "ZTS" ], "bins_count": 10, - "output_file_path": "/Users/idanyael/personal-dev/algo-trader/src/algotrader/examples/pipeline-templates/bins.json" + "output_file_path": "/Users/idanyael/personal-dev/algo-trader/src/algotrader/examples/pipeline-templates/bins.json", + "outliers_removal_percentage": 0.05 } } \ No newline at end of file diff --git a/src/algotrader/examples/pipeline-templates/loader_simple_returns_calculator.json b/src/algotrader/examples/pipeline-templates/loader_simple_returns_calculator.json index 14b1eaa..01c421b 100644 --- a/src/algotrader/examples/pipeline-templates/loader_simple_returns_calculator.json +++ b/src/algotrader/examples/pipeline-templates/loader_simple_returns_calculator.json @@ -515,7 +515,8 @@ } }, "processor": { - "returnsCount": 5 + "returnsCount": 5, + "fieldPrefix": "ctc" }, "terminator": null } \ No newline at end of file diff --git a/src/algotrader/pipeline/builders/backtest.py b/src/algotrader/pipeline/builders/backtest.py index 2224b66..14c942a 100644 --- a/src/algotrader/pipeline/builders/backtest.py +++ b/src/algotrader/pipeline/builders/backtest.py @@ -43,6 +43,8 @@ def build_mongodb_history_buckets_backtester(bins_file_path: str) -> Pipeline: data_from_time = STATIC_NOW - timedelta(days=365 * 3) source = MongoDBSource(mongodb_storage, symbols, TimeSpan.Day, backtest_from_time, STATIC_NOW) + return_fields = [f'ctc-{i}' for i in range(1, 10)] + history_compare_strategy = HistoryBucketCompareStrategy(mongodb_storage, data_from_time, backtest_from_time, @@ -51,7 +53,7 @@ def build_mongodb_history_buckets_backtester(bins_file_path: str) -> Pipeline: 'rsi7', 'rsi14', 'stddev5', 'ema5', 'ema20', 'correlation'], - return_field='ctc1', min_event_count=50, + return_fields=return_fields, min_event_count=50, min_avg_return=0.2) cache_processor = CandleCache() diff --git a/src/algotrader/pipeline/builders/loaders.py b/src/algotrader/pipeline/builders/loaders.py index 5718a6f..03f255b 100644 --- a/src/algotrader/pipeline/builders/loaders.py +++ b/src/algotrader/pipeline/builders/loaders.py @@ -104,7 +104,7 @@ def build_returns_calculator(days_back: int = DEFAULT_DAYS_BACK) -> Pipeline: sink = StorageSinkProcessor(mongodb_storage) cache_processor = CandleCache(sink) - processor = ReturnsCalculatorProcessor(5, cache_processor) + processor = ReturnsCalculatorProcessor('ctc', 5, cache_processor) return Pipeline(source, processor) diff --git a/src/algotrader/pipeline/pipeline.py b/src/algotrader/pipeline/pipeline.py index 3aff408..e812dad 100644 --- a/src/algotrader/pipeline/pipeline.py +++ b/src/algotrader/pipeline/pipeline.py @@ -1,8 +1,6 @@ import logging from typing import Optional, Dict -from rich.progress import Progress, TextColumn, BarColumn - from algotrader.entities.serializable import Serializable, Deserializable from algotrader.pipeline.processor import Processor from algotrader.pipeline.shared_context import SharedContext @@ -37,13 +35,10 @@ def deserialize(cls, data: Dict): def run(self, context: SharedContext) -> None: self.logger.info('Starting pipeline...') - with Progress(TextColumn('{task.completed} Candle(s) processed'), BarColumn()) as progress: - processing_task = progress.add_task("Processing", total=None) - - for candle in self.source.read(): - self.logger.debug('Processing candle: %s\r', candle.serialize()) - self.processor.process(context, candle) - progress.update(processing_task, advance=1) + for candle in self.source.read(): + self.logger.debug('Processing candle: %s\r', candle.serialize()) + self.processor.process(context, candle) if self.terminator: + self.logger.debug('initiating termination...') self.terminator.terminate(context) diff --git a/src/algotrader/pipeline/processors/returns.py b/src/algotrader/pipeline/processors/returns.py index c5997b1..0a3175d 100644 --- a/src/algotrader/pipeline/processors/returns.py +++ b/src/algotrader/pipeline/processors/returns.py @@ -19,15 +19,18 @@ class Returns(GenericCandleAttachment[float]): class ReturnsCalculatorProcessor(Processor): - def __init__(self, returns_count: int, next_processor: Optional[Processor] = None): + def __init__(self, field_prefix: str, returns_count: int, next_processor: Optional[Processor] = None): super().__init__(next_processor) + self.field_prefix = field_prefix self.returns_count = returns_count def process(self, context: SharedContext, candle: Candle): cache_reader = CandleCache.context_reader(context) symbol_candles = cache_reader.get_symbol_candles(candle.symbol) or [] - if len(symbol_candles) > self.returns_count: + candle.attachments.add_attachement(RETURNS_ATTACHMENT_KEY, Returns()) + + if len(symbol_candles) >= self.returns_count: candle_returns = self._calc_returns(candle, symbol_candles) candle.attachments.add_attachement(RETURNS_ATTACHMENT_KEY, candle_returns) @@ -36,16 +39,17 @@ def process(self, context: SharedContext, candle: Candle): def _calc_returns(self, current_candle: Candle, candles: List[Candle]) -> Returns: candle_returns = Returns() - for i in range(1, self.returns_count): - candle_returns.set(f'ctc{i}', (1 - current_candle.close / candles[-i].close) * 100) + for i in range(1, self.returns_count + 1): + candle_returns.set(f'{self.field_prefix}-{i}', (1 - current_candle.close / candles[-i].close) * 100) return candle_returns def serialize(self) -> Dict: return { - 'returnsCount': self.returns_count + 'returnsCount': self.returns_count, + 'fieldPrefix': self.field_prefix } @classmethod def deserialize(cls, data: Dict) -> Optional[Processor]: - return cls(data['returnsCount'], cls._deserialize_next_processor(data)) + return cls(data['fieldPrefix'], data['returnsCount'], cls._deserialize_next_processor(data)) diff --git a/src/algotrader/pipeline/processors/technicals.py b/src/algotrader/pipeline/processors/technicals.py index 47be0af..6971939 100644 --- a/src/algotrader/pipeline/processors/technicals.py +++ b/src/algotrader/pipeline/processors/technicals.py @@ -22,6 +22,8 @@ class Indicators(GenericCandleAttachment[IndicatorValue]): Indicators() +MAX_CANDLES_FOR_CALC = 50 + class TechnicalsProcessor(Processor): """ @@ -36,7 +38,7 @@ def __init__(self, config: TechnicalsProcessorConfig, next_processor: Optional[P def process(self, context: SharedContext, candle: Candle): cache_reader = CandleCache.context_reader(context) symbol_candles = cache_reader.get_symbol_candles(candle.symbol) or [] - calculator = TechnicalCalculator(symbol_candles + [candle]) + calculator = TechnicalCalculator(symbol_candles[-MAX_CANDLES_FOR_CALC:] + [candle]) candle_indicators = Indicators() self._calculate(calculator, candle_indicators) diff --git a/src/algotrader/pipeline/processors/technicals_normalizer.py b/src/algotrader/pipeline/processors/technicals_normalizer.py index 88a0c1d..f1a9874 100644 --- a/src/algotrader/pipeline/processors/technicals_normalizer.py +++ b/src/algotrader/pipeline/processors/technicals_normalizer.py @@ -21,7 +21,7 @@ class NormalizedIndicators(GenericCandleAttachment[IndicatorValue]): NormalizedIndicators() -VWAP_NORMALIZE_PREFIXES = ['sma', 'ema'] +VWAP_NORMALIZE_PREFIXES = ['sma', 'ema', 'typical', 'bbands'] NormalizeFunc = Callable[[List[Candle], IndicatorValue], IndicatorValue] diff --git a/src/algotrader/pipeline/strategies/history_cosine_similarity.py b/src/algotrader/pipeline/strategies/history_cosine_similarity.py index 447489a..f51aab8 100644 --- a/src/algotrader/pipeline/strategies/history_cosine_similarity.py +++ b/src/algotrader/pipeline/strategies/history_cosine_similarity.py @@ -26,11 +26,11 @@ def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime, self.min_avg_return = min_avg_return groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare] - return_field = f'attachments.returns.{return_field}' + return_fields = [f'attachments.returns.ctc-{i}' for i in range(1, 20)] self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields, - return_field, min_event_count, + return_fields, min_event_count, min_avg_return) def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]: diff --git a/src/algotrader/storage/mongodb_storage.py b/src/algotrader/storage/mongodb_storage.py index 600cd23..7a59056 100644 --- a/src/algotrader/storage/mongodb_storage.py +++ b/src/algotrader/storage/mongodb_storage.py @@ -30,6 +30,10 @@ def __init__(self, host: str = 'localhost', port: int = 27017, database: str = D self.db: Optional[Database] = None self.candles_collection: Optional[Collection] = None + def get_collection(self) -> Collection: + self._ensure_connection() + return self.candles_collection + def _ensure_connection(self): if self.client: return diff --git a/tests/unit/strategies/test_history_compare.py b/tests/unit/strategies/test_history_compare.py index 7b4c3a7..952f19f 100644 --- a/tests/unit/strategies/test_history_compare.py +++ b/tests/unit/strategies/test_history_compare.py @@ -72,7 +72,7 @@ def _check(signals: List[StrategySignal]): datetime.now() - timedelta(days=60), datetime.now(), indicators_to_compare=['sma5', 'sma20'], - return_field='ctc1', min_event_count=1, + return_fields=['ctc1'], min_event_count=1, min_avg_return=0.2) # TODO: FakeSignalsExecutor is not called when there is not signal. make sure to fail if it's not called. @@ -100,7 +100,7 @@ def _check(signals: List[StrategySignal]): datetime.now(), datetime.now(), indicators_to_compare=['sma5', 'sma20'], - return_field='ctc1', min_event_count=1, + return_fields=['ctc1'], min_event_count=1, min_avg_return=0.2) # TODO: FakeSignalsExecutor is not called when there is not signal. make sure to fail if it's not called. diff --git a/tests/unit/test_returns_calculator_processor.py b/tests/unit/test_returns_calculator_processor.py index 0e03d63..ef40e3c 100644 --- a/tests/unit/test_returns_calculator_processor.py +++ b/tests/unit/test_returns_calculator_processor.py @@ -1,36 +1,43 @@ -import random -from datetime import datetime +from datetime import datetime, timedelta from unittest import TestCase -from algotrader.entities.candle import Candle from algotrader.entities.timespan import TimeSpan -from fakes.pipeline_validators import ValidationProcessor -from fakes.source import FakeSource from algotrader.pipeline.pipeline import Pipeline from algotrader.pipeline.processors.candle_cache import CandleCache -from algotrader.pipeline.processors.returns import ReturnsCalculatorProcessor, RETURNS_ATTACHMENT_KEY, Returns +from algotrader.pipeline.processors.returns import ReturnsCalculatorProcessor, RETURNS_ATTACHMENT_KEY +from algotrader.pipeline.reverse_source import ReverseSource from algotrader.pipeline.runner import PipelineRunner from algotrader.pipeline.shared_context import SharedContext -from unit import generate_candle_with_price +from fakes.pipeline_validators import TerminatorValidator +from fakes.source import FakeSource +from unit import generate_candle_with_price, TEST_SYMBOL class TestReturnsCalculatorProcessor(TestCase): def setUp(self) -> None: super().setUp() self.source = FakeSource( - [generate_candle_with_price(TimeSpan.Day, datetime.now(), random.randint(1, c)) for c in range(1, 50)]) + [generate_candle_with_price(TimeSpan.Day, datetime.now() + timedelta(minutes=c), c) for c in range(1, 50)]) def test(self): - def _check(context: SharedContext, candle: Candle): + def _check_returns(context: SharedContext): self.assertIsNotNone(context) - context.put_kv_data('check_count', context.get_kv_data('check_count', 0) + 1) - check_count = context.get_kv_data('check_count', 0) + cache_reader = CandleCache.context_reader(context) + candles = cache_reader.get_symbol_candles(TEST_SYMBOL) + + self.assertFalse(candles[0].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1')) + self.assertFalse(candles[1].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1')) + self.assertFalse(candles[2].attachments.get_attachment(RETURNS_ATTACHMENT_KEY).has('ctc-1')) + + ctc1 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-1'] + ctc2 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-2'] + ctc3 = candles[3].attachments.get_attachment(RETURNS_ATTACHMENT_KEY)['ctc-3'] + self.assertTrue(ctc1 < ctc2 < ctc3) + + cache_processor = CandleCache() + processor = ReturnsCalculatorProcessor('ctc', 3, cache_processor) - if check_count > 6: - candle_returns: Returns = candle.attachments.get_attachment(RETURNS_ATTACHMENT_KEY) - self.assertTrue(candle_returns.has('ctc1')) + terminator = TerminatorValidator(_check_returns) - validator = ValidationProcessor(_check) - cache_processor = CandleCache(validator) - processor = ReturnsCalculatorProcessor(5, cache_processor) - PipelineRunner(Pipeline(self.source, processor)).run() + self.source = ReverseSource(self.source) + PipelineRunner(Pipeline(self.source, processor, terminator)).run() diff --git a/tests/unit/test_technicals_processor.py b/tests/unit/test_technicals_processor.py index 46281e1..5cfb508 100644 --- a/tests/unit/test_technicals_processor.py +++ b/tests/unit/test_technicals_processor.py @@ -52,6 +52,7 @@ def _check(context: SharedContext, candle: Candle): IndicatorConfig('sma5', TechnicalCalculation.SMA, [5]), IndicatorConfig('macd', TechnicalCalculation.MACD, [2, 5, 9]), IndicatorConfig('cci7', TechnicalCalculation.CCI, [7]), + IndicatorConfig('arooosc', TechnicalCalculation.AROONOSC, [7]), ]) processor = TechnicalsProcessor(config, cache_processor) @@ -69,9 +70,18 @@ def _check(context: SharedContext, candle: Candle): normalized_indicators: NormalizedIndicators = candle.attachments.get_attachment( NORMALIZED_INDICATORS_ATTACHMENT_KEY) + + bbands5_value = indicators.get('bbands5') + normalized_bbands5_value = normalized_indicators.get('bbands5') + sma5_value = indicators.get('sma5') normalized_sma5_value = normalized_indicators.get('sma5') + vwap = (candle.close + candle.high + candle.low) / candle.volume + + for i in range(len(bbands5_value)): + self.assertTrue(bbands5_value[i] / vwap, normalized_bbands5_value[i]) + self.assertTrue(sma5_value / vwap, normalized_sma5_value) validator = ValidationProcessor(_check) @@ -80,6 +90,7 @@ def _check(context: SharedContext, candle: Candle): config = TechnicalsProcessorConfig([ IndicatorConfig('sma5', TechnicalCalculation.SMA, [5]), + IndicatorConfig('bbands5', TechnicalCalculation.BBANDS, [5]), ]) technicals = TechnicalsProcessor(config, technicals_normalizer)