From 72dc4794b412b46e556a4a550bc2468701346a81 Mon Sep 17 00:00:00 2001 From: jstalex Date: Thu, 27 Jul 2023 10:10:49 +0300 Subject: [PATCH 1/5] change db schema --- .../cmd/candles_downloader/download_candles.go | 11 ++++++----- examples/interval_bot/internal/bot/candles.go | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/interval_bot/cmd/candles_downloader/download_candles.go b/examples/interval_bot/cmd/candles_downloader/download_candles.go index b4eb269..fa55384 100644 --- a/examples/interval_bot/cmd/candles_downloader/download_candles.go +++ b/examples/interval_bot/cmd/candles_downloader/download_candles.go @@ -177,20 +177,21 @@ func main() { var schema = ` create table if not exists candles ( - id integer primary key autoincrement, - instrument_uid text, + id integer primary key autoincrement, + instrument_uid text, open real, close real, high real, low real, volume integer, time integer, - is_complete integer + is_complete integer, + unique (instrument_uid, time) ); create table if not exists updates ( - instrument_id text unique, - time integer + instrument_id text unique, + time integer ); ` diff --git a/examples/interval_bot/internal/bot/candles.go b/examples/interval_bot/internal/bot/candles.go index e135245..97cc4c4 100644 --- a/examples/interval_bot/internal/bot/candles.go +++ b/examples/interval_bot/internal/bot/candles.go @@ -37,7 +37,8 @@ create table if not exists candles ( low real, volume integer, time integer, - is_complete integer + is_complete integer, + unique (instrument_uid, time) ); create table if not exists updates ( From 4ddc3297b1d5ff18f2137c16480cb6288f42e8bd Mon Sep 17 00:00:00 2001 From: jstalex Date: Thu, 27 Jul 2023 11:09:35 +0300 Subject: [PATCH 2/5] fix comments --- .../interval_bot/cmd/candles_downloader/download_candles.go | 2 +- examples/interval_bot/internal/bot/candles.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/interval_bot/cmd/candles_downloader/download_candles.go b/examples/interval_bot/cmd/candles_downloader/download_candles.go index fa55384..cf63f69 100644 --- a/examples/interval_bot/cmd/candles_downloader/download_candles.go +++ b/examples/interval_bot/cmd/candles_downloader/download_candles.go @@ -166,7 +166,7 @@ func main() { if err != nil { logger.Errorf(err.Error()) } - logger.Infof("store in db complete candle %v/%v", i+1, len(instrumentIds)) + logger.Infof("store in db complete instrument %v/%v", i+1, len(instrumentIds)) err = bar.Add(1) if err != nil { logger.Errorf(err.Error()) diff --git a/examples/interval_bot/internal/bot/candles.go b/examples/interval_bot/internal/bot/candles.go index 97cc4c4..ffc54ac 100644 --- a/examples/interval_bot/internal/bot/candles.go +++ b/examples/interval_bot/internal/bot/candles.go @@ -142,7 +142,7 @@ func (c *CandlesStorage) ticker(key string) string { func (c *CandlesStorage) Candles(id string, from, to time.Time) ([]*pb.HistoricCandle, error) { allCandles, ok := c.candles[id] if !ok { - return nil, fmt.Errorf("%v instrument not found, at first LoadCandlesHistory()", id) + return nil, fmt.Errorf("%v instrument not found, at first LoadCandlesHistory() or use candles_dowloader", id) } indexes := [2]int{} times := [2]time.Time{from, to} @@ -158,7 +158,7 @@ func (c *CandlesStorage) Candles(id string, from, to time.Time) ([]*pb.HistoricC } } if currIndex == 0 { - return nil, fmt.Errorf("%v candles not found in storage, try to UpdateCandlesHistory() from = %v\n", c.ticker(id), from) + return nil, fmt.Errorf("%v candles not found in storage, try to UpdateCandlesHistory() from = %v or use candles_downloader\n", c.ticker(id), from) } if indexes[1] == 0 { return allCandles[indexes[0]:], nil From 32344ad9a1501a3a271c9b45ac439f01af9b6da6 Mon Sep 17 00:00:00 2001 From: jstalex Date: Thu, 27 Jul 2023 13:58:39 +0300 Subject: [PATCH 3/5] fix end candle duplicate --- investgo/marketdata.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/investgo/marketdata.go b/investgo/marketdata.go index 27d2d1e..8cdcfb8 100644 --- a/investgo/marketdata.go +++ b/investgo/marketdata.go @@ -168,7 +168,10 @@ func (md *MarketDataServiceClient) GetHistoricCandles(req *GetHistoricCandlesReq if err != nil { return nil, err } - candles = append(candles, resp.GetCandles()...) + if len(resp.GetCandles()) < 1 { + continue + } + candles = append(candles, resp.GetCandles()[1:]...) if requests == 299 { if md.config.DisableResourceExhaustedRetry { time.Sleep(time.Minute) From d73b90aee47f9572d79bba0fb534917f4e659fbc Mon Sep 17 00:00:00 2001 From: jstalex Date: Thu, 27 Jul 2023 15:28:24 +0300 Subject: [PATCH 4/5] add candles first update --- .../interval_bot/cmd/backtest/backtest.go | 4 + .../candles_downloader/download_candles.go | 17 ++- examples/interval_bot/cmd/main.go | 4 + examples/interval_bot/internal/bot/bot.go | 2 +- examples/interval_bot/internal/bot/candles.go | 121 +++++++++++------- 5 files changed, 94 insertions(+), 54 deletions(-) diff --git a/examples/interval_bot/cmd/backtest/backtest.go b/examples/interval_bot/cmd/backtest/backtest.go index 8f66cbb..d556e9f 100644 --- a/examples/interval_bot/cmd/backtest/backtest.go +++ b/examples/interval_bot/cmd/backtest/backtest.go @@ -229,6 +229,10 @@ func main() { logger.Infof("got %v instruments", len(instrumentIds)) // Добавляем инструменты в конфиг intervalConfig.Instruments = instrumentIds + // initDate не может быть раньше StorageFromTime + if initDate.Before(intervalConfig.StorageFromTime) { + intervalConfig.StorageFromTime = initDate + } // создание интервального бота intervalBot, err := bot.NewBot(ctx, client, intervalConfig) if err != nil { diff --git a/examples/interval_bot/cmd/candles_downloader/download_candles.go b/examples/interval_bot/cmd/candles_downloader/download_candles.go index cf63f69..97bc686 100644 --- a/examples/interval_bot/cmd/candles_downloader/download_candles.go +++ b/examples/interval_bot/cmd/candles_downloader/download_candles.go @@ -2,7 +2,9 @@ package main import ( "context" + "errors" "github.com/jmoiron/sqlx" + "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3" "github.com/schollz/progressbar/v3" "github.com/tinkoff/invest-api-go-sdk/investgo" @@ -162,7 +164,7 @@ func main() { logger.Infof("got %v candles for %v", len(candles), id) - err = storeCandlesInDB(db, id, now, candles) + err = storeCandlesInDB(db, id, FROM, now, candles) if err != nil { logger.Errorf(err.Error()) } @@ -191,7 +193,8 @@ create table if not exists candles ( create table if not exists updates ( instrument_id text unique, - time integer + first_time integer, + last_time integer ); ` @@ -212,7 +215,7 @@ func initDB(path string) (*sqlx.DB, error) { } // storeCandlesInDB - Сохранение исторических свечей инструмента в бд -func storeCandlesInDB(db *sqlx.DB, uid string, update time.Time, hc []*pb.HistoricCandle) error { +func storeCandlesInDB(db *sqlx.DB, uid string, first, last time.Time, hc []*pb.HistoricCandle) error { tx, err := db.Begin() if err != nil { return err @@ -239,7 +242,11 @@ func storeCandlesInDB(db *sqlx.DB, uid string, update time.Time, hc []*pb.Histor candle.GetTime().AsTime().Unix(), candle.GetIsComplete()) if err != nil { - return err + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } } } @@ -248,7 +255,7 @@ func storeCandlesInDB(db *sqlx.DB, uid string, update time.Time, hc []*pb.Histor } // записываем в базу время последнего обновления - _, err = db.Exec(`insert or replace into updates(instrument_id, time) values (?, ?)`, uid, update.Unix()) + _, err = db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, uid, first.Unix(), last.Unix()) if err != nil { return err } diff --git a/examples/interval_bot/cmd/main.go b/examples/interval_bot/cmd/main.go index d42f636..11ddd94 100644 --- a/examples/interval_bot/cmd/main.go +++ b/examples/interval_bot/cmd/main.go @@ -166,6 +166,10 @@ func main() { // передаем инструменты в конфиг intervalConfig.Instruments = instrumentIds + // запрашиваемое время для свечей не может быть раньше StorageFromTime + if time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval)).Before(intervalConfig.StorageFromTime) { + intervalConfig.StorageFromTime = time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval)) + } // создание интервального бота intervalBot, err := bot.NewBot(ctx, client, intervalConfig) diff --git a/examples/interval_bot/internal/bot/bot.go b/examples/interval_bot/internal/bot/bot.go index a39c4d0..8ae5ef6 100644 --- a/examples/interval_bot/internal/bot/bot.go +++ b/examples/interval_bot/internal/bot/bot.go @@ -136,7 +136,7 @@ func NewBot(ctx context.Context, client *investgo.Client, config IntervalStrateg instrumentsForStorage[instrument] = StorageInstrument{ CandleInterval: config.StorageCandleInterval, PriceStep: resp.GetInstrument().GetMinPriceIncrement(), - LastUpdate: config.StorageFromTime, + FirstUpdate: config.StorageFromTime, ticker: resp.GetInstrument().GetTicker(), } } diff --git a/examples/interval_bot/internal/bot/candles.go b/examples/interval_bot/internal/bot/candles.go index ffc54ac..afd67d6 100644 --- a/examples/interval_bot/internal/bot/candles.go +++ b/examples/interval_bot/internal/bot/candles.go @@ -1,8 +1,10 @@ package bot import ( + "errors" "fmt" "github.com/jmoiron/sqlx" + "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3" "github.com/tinkoff/invest-api-go-sdk/investgo" pb "github.com/tinkoff/invest-api-go-sdk/proto" @@ -14,6 +16,7 @@ import ( type StorageInstrument struct { CandleInterval pb.CandleInterval PriceStep *pb.Quotation + FirstUpdate time.Time LastUpdate time.Time ticker string } @@ -43,7 +46,8 @@ create table if not exists candles ( create table if not exists updates ( instrument_id text unique , - time integer + first_time integer, + last_time integer ); ` @@ -61,19 +65,21 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn return nil, err } cs.db = db - // получаем инструменты, которые уже есть в бд - unique, err := cs.uniqueInstruments() + // получаем время первого и последнего обновления инструментов, которые уже есть в бд + DBUpdates, err := cs.lastUpdates() if err != nil { return nil, err } - // если инструмента в бд нет, то загружаем данные по нему + cs.logger.Infof("got %v unique instruments from storage", len(DBUpdates)) + // если инструмента в бд нет, то загружаем данные по нему, если есть, но недостаточно, то догружаем свечи for id, instrument := range required { - if _, ok := unique[id]; !ok { + if _, ok := DBUpdates[id]; !ok { + cs.logger.Infof("candles for %v not found, downloading...", id) now := time.Now() newCandles, err := cs.mds.GetHistoricCandles(&investgo.GetHistoricCandlesRequest{ Instrument: id, Interval: instrument.CandleInterval, - From: instrument.LastUpdate, + From: instrument.FirstUpdate, To: now, File: false, FileName: "", @@ -89,11 +95,36 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn return nil, err } } else { - cs.instruments[id] = instrument + // first time check + // если все ок, то просто обновляем время обновления + if instrument.FirstUpdate.After(DBUpdates[id].FirstUpdate) { + instrument.FirstUpdate = DBUpdates[id].FirstUpdate + instrument.LastUpdate = DBUpdates[id].LastUpdate + cs.instruments[id] = instrument + } else { + cs.logger.Infof("older candles for %v not found, downloading...", cs.ticker(id)) + // если нужно догрузить более старые свечи + oldCandles, err := cs.mds.GetHistoricCandles(&investgo.GetHistoricCandlesRequest{ + Instrument: id, + Interval: instrument.CandleInterval, + From: instrument.FirstUpdate, + To: DBUpdates[id].FirstUpdate, + File: false, + FileName: "", + }) + if err != nil { + return nil, err + } + instrument.LastUpdate = DBUpdates[id].LastUpdate + cs.instruments[id] = instrument + err = cs.storeCandlesInDB(id, instrument.LastUpdate, oldCandles) + if err != nil { + return nil, err + } + } } } - // вычитываем из бд даты последних обновлений - err = cs.lastUpdates() + // если нужно обновить с lastUpdate до сейчас if update { // обновляем в бд данные по всем инструментам for id := range required { @@ -111,7 +142,6 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn } cs.candles[id] = tmp } - return cs, err } @@ -267,56 +297,46 @@ func (c *CandlesStorage) UpdateCandlesHistory(id string) error { return c.storeCandlesInDB(id, now, newCandles) } -// lastUpdates - Обновление времени последнего обновления свечей по инструментам в мапе Instruments -func (c *CandlesStorage) lastUpdates() error { +// lastUpdates - Возвращает первое и последнее обновление для инструментов из бд +func (c *CandlesStorage) lastUpdates() (map[string]StorageInstrument, error) { c.logger.Infof("update lastUpdate time from storage...") - var lastUpdUnix int64 + var lastUpdUnix, firstUpdUnix int64 var tempId string + updatesFromDB := make(map[string]StorageInstrument, len(c.instruments)) rows, err := c.db.Query(`select * from updates`) if err != nil { - return err + return nil, err } for rows.Next() { - err = rows.Scan(&tempId, &lastUpdUnix) + err = rows.Scan(&tempId, &firstUpdUnix, &lastUpdUnix) if err != nil { - return err + return nil, err } - instrument, ok := c.instruments[tempId] - if !ok { - // этот инструмент из базы нам сейчас не нужен - continue + updatesFromDB[tempId] = StorageInstrument{ + FirstUpdate: time.Unix(firstUpdUnix, 0), + LastUpdate: time.Unix(lastUpdUnix, 0), } - instrument.LastUpdate = time.Unix(lastUpdUnix, 0) - c.instruments[tempId] = instrument - } - //for id, candles := range c.instruments { - // err := c.db.Get(&lastUpdUnix, `select max(time) from candles where instrument_uid=?`, id) - // if err != nil { - // return err - // } - // candles.LastUpdate = time.Unix(lastUpdUnix, 0) - // c.instruments[id] = candles - //} - return nil -} - -// uniqueInstruments - Метод возвращает мапу с уникальными значениями uid инструментов в бд -func (c *CandlesStorage) uniqueInstruments() (map[string]struct{}, error) { - instruments := make([]string, 0) - err := c.db.Select(&instruments, `select distinct instrument_id from updates`) - if err != nil { - return nil, err } - m := make(map[string]struct{}) - for _, instrument := range instruments { - m[instrument] = struct{}{} - } - c.logger.Infof("got %v unique instruments from storage", len(m)) - return m, nil + return updatesFromDB, nil } +//// uniqueInstruments - Метод возвращает мапу с уникальными значениями uid инструментов в бд +//func (c *CandlesStorage) uniqueInstruments() (map[string]struct{}, error) { +// instruments := make([]string, 0) +// err := c.db.Select(&instruments, `select distinct instrument_id from updates`) +// if err != nil { +// return nil, err +// } +// m := make(map[string]struct{}) +// for _, instrument := range instruments { +// m[instrument] = struct{}{} +// } +// c.logger.Infof("got %v unique instruments from storage", len(m)) +// return m, nil +//} + // initDB - Инициализация бд func (c *CandlesStorage) initDB(path string) (*sqlx.DB, error) { db, err := sqlx.Open("sqlite3", path) @@ -361,7 +381,11 @@ func (c *CandlesStorage) storeCandlesInDB(uid string, update time.Time, hc []*pb candle.GetTime().AsTime().Unix(), candle.GetIsComplete()) if err != nil { - return err + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } } } @@ -370,7 +394,8 @@ func (c *CandlesStorage) storeCandlesInDB(uid string, update time.Time, hc []*pb } // записываем в базу время последнего обновления - _, err = c.db.Exec(`insert or replace into updates(instrument_id, time) values (?, ?)`, uid, update.Unix()) + _, err = c.db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, + uid, c.instruments[uid].FirstUpdate.Unix(), update.Unix()) if err != nil { return err } From 54e8b8096aeabf441b07d6194cbca82c78931f57 Mon Sep 17 00:00:00 2001 From: jstalex Date: Fri, 28 Jul 2023 11:09:46 +0300 Subject: [PATCH 5/5] wrap db tx in function --- .../candles_downloader/download_candles.go | 67 ++++++++++--------- examples/interval_bot/internal/bot/candles.go | 67 ++++++++++--------- 2 files changed, 73 insertions(+), 61 deletions(-) diff --git a/examples/interval_bot/cmd/candles_downloader/download_candles.go b/examples/interval_bot/cmd/candles_downloader/download_candles.go index 97bc686..84b7ded 100644 --- a/examples/interval_bot/cmd/candles_downloader/download_candles.go +++ b/examples/interval_bot/cmd/candles_downloader/download_candles.go @@ -216,44 +216,51 @@ func initDB(path string) (*sqlx.DB, error) { // storeCandlesInDB - Сохранение исторических свечей инструмента в бд func storeCandlesInDB(db *sqlx.DB, uid string, first, last time.Time, hc []*pb.HistoricCandle) error { - tx, err := db.Begin() - if err != nil { - return err - } + err := func() error { + tx, err := db.Begin() + if err != nil { + return err + } - insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) + defer func() { + if err = tx.Commit(); err != nil { + log.Printf(err.Error()) + } + }() + + insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) values (?, ?, ?, ?, ?, ?, ?, ?) `) - if err != nil { - return err - } - defer func() { - if err := insertCandle.Close(); err != nil { - log.Printf(err.Error()) + if err != nil { + return err } - }() + defer func() { + if err := insertCandle.Close(); err != nil { + log.Printf(err.Error()) + } + }() - for _, candle := range hc { - _, err := insertCandle.Exec(uid, - candle.GetOpen().ToFloat(), - candle.GetClose().ToFloat(), - candle.GetHigh().ToFloat(), - candle.GetLow().ToFloat(), - candle.GetVolume(), - candle.GetTime().AsTime().Unix(), - candle.GetIsComplete()) - if err != nil { - if errors.As(err, &sqlite3.Error{}) { - continue - } else { - return err + for _, candle := range hc { + _, err := insertCandle.Exec(uid, + candle.GetOpen().ToFloat(), + candle.GetClose().ToFloat(), + candle.GetHigh().ToFloat(), + candle.GetLow().ToFloat(), + candle.GetVolume(), + candle.GetTime().AsTime().Unix(), + candle.GetIsComplete()) + if err != nil { + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } } } - } - - if err := tx.Commit(); err != nil { + return nil + }() + if err != nil { return err } - // записываем в базу время последнего обновления _, err = db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, uid, first.Unix(), last.Unix()) if err != nil { diff --git a/examples/interval_bot/internal/bot/candles.go b/examples/interval_bot/internal/bot/candles.go index afd67d6..238cadc 100644 --- a/examples/interval_bot/internal/bot/candles.go +++ b/examples/interval_bot/internal/bot/candles.go @@ -355,44 +355,49 @@ func (c *CandlesStorage) initDB(path string) (*sqlx.DB, error) { // storeCandlesInDB - Сохранение исторических свечей инструмента в бд func (c *CandlesStorage) storeCandlesInDB(uid string, update time.Time, hc []*pb.HistoricCandle) error { - tx, err := c.db.Begin() - if err != nil { - return err - } - - insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) + err := func() error { + tx, err := c.db.Begin() + if err != nil { + return err + } + defer func() { + if err := tx.Commit(); err != nil { + c.logger.Errorf(err.Error()) + } + }() + insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) values (?, ?, ?, ?, ?, ?, ?, ?) `) - if err != nil { - return err - } - defer func() { - if err := insertCandle.Close(); err != nil { - c.logger.Errorf(err.Error()) + if err != nil { + return err } - }() + defer func() { + if err := insertCandle.Close(); err != nil { + c.logger.Errorf(err.Error()) + } + }() - for _, candle := range hc { - _, err := insertCandle.Exec(uid, - candle.GetOpen().ToFloat(), - candle.GetClose().ToFloat(), - candle.GetHigh().ToFloat(), - candle.GetLow().ToFloat(), - candle.GetVolume(), - candle.GetTime().AsTime().Unix(), - candle.GetIsComplete()) - if err != nil { - if errors.As(err, &sqlite3.Error{}) { - continue - } else { - return err + for _, candle := range hc { + _, err := insertCandle.Exec(uid, + candle.GetOpen().ToFloat(), + candle.GetClose().ToFloat(), + candle.GetHigh().ToFloat(), + candle.GetLow().ToFloat(), + candle.GetVolume(), + candle.GetTime().AsTime().Unix(), + candle.GetIsComplete()) + if err != nil { + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } } } - } - - if err := tx.Commit(); err != nil { + return nil + }() + if err != nil { return err } - // записываем в базу время последнего обновления _, err = c.db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, uid, c.instruments[uid].FirstUpdate.Unix(), update.Unix())