diff --git a/examples/interval_bot/cmd/backtest/backtest.go b/examples/interval_bot/cmd/backtest/backtest.go index 8f66cbb..d556e9f 100644 --- a/examples/interval_bot/cmd/backtest/backtest.go +++ b/examples/interval_bot/cmd/backtest/backtest.go @@ -229,6 +229,10 @@ func main() { logger.Infof("got %v instruments", len(instrumentIds)) // Добавляем инструменты в конфиг intervalConfig.Instruments = instrumentIds + // initDate не может быть раньше StorageFromTime + if initDate.Before(intervalConfig.StorageFromTime) { + intervalConfig.StorageFromTime = initDate + } // создание интервального бота intervalBot, err := bot.NewBot(ctx, client, intervalConfig) if err != nil { diff --git a/examples/interval_bot/cmd/candles_downloader/download_candles.go b/examples/interval_bot/cmd/candles_downloader/download_candles.go index b4eb269..84b7ded 100644 --- a/examples/interval_bot/cmd/candles_downloader/download_candles.go +++ b/examples/interval_bot/cmd/candles_downloader/download_candles.go @@ -2,7 +2,9 @@ package main import ( "context" + "errors" "github.com/jmoiron/sqlx" + "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3" "github.com/schollz/progressbar/v3" "github.com/tinkoff/invest-api-go-sdk/investgo" @@ -162,11 +164,11 @@ func main() { logger.Infof("got %v candles for %v", len(candles), id) - err = storeCandlesInDB(db, id, now, candles) + err = storeCandlesInDB(db, id, FROM, now, candles) if err != nil { logger.Errorf(err.Error()) } - logger.Infof("store in db complete candle %v/%v", i+1, len(instrumentIds)) + logger.Infof("store in db complete instrument %v/%v", i+1, len(instrumentIds)) err = bar.Add(1) if err != nil { logger.Errorf(err.Error()) @@ -177,20 +179,22 @@ func main() { var schema = ` create table if not exists candles ( - id integer primary key autoincrement, - instrument_uid text, + id integer primary key autoincrement, + instrument_uid text, open real, close real, high real, low real, volume integer, time integer, - is_complete integer + is_complete integer, + unique (instrument_uid, time) ); create table if not exists updates ( - instrument_id text unique, - time integer + instrument_id text unique, + first_time integer, + last_time integer ); ` @@ -211,43 +215,54 @@ func initDB(path string) (*sqlx.DB, error) { } // storeCandlesInDB - Сохранение исторических свечей инструмента в бд -func storeCandlesInDB(db *sqlx.DB, uid string, update time.Time, hc []*pb.HistoricCandle) error { - tx, err := db.Begin() - if err != nil { - return err - } - - insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) - values (?, ?, ?, ?, ?, ?, ?, ?) `) - if err != nil { - return err - } - defer func() { - if err := insertCandle.Close(); err != nil { - log.Printf(err.Error()) +func storeCandlesInDB(db *sqlx.DB, uid string, first, last time.Time, hc []*pb.HistoricCandle) error { + err := func() error { + tx, err := db.Begin() + if err != nil { + return err } - }() - for _, candle := range hc { - _, err := insertCandle.Exec(uid, - candle.GetOpen().ToFloat(), - candle.GetClose().ToFloat(), - candle.GetHigh().ToFloat(), - candle.GetLow().ToFloat(), - candle.GetVolume(), - candle.GetTime().AsTime().Unix(), - candle.GetIsComplete()) + defer func() { + if err = tx.Commit(); err != nil { + log.Printf(err.Error()) + } + }() + + insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) + values (?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return err } - } + defer func() { + if err := insertCandle.Close(); err != nil { + log.Printf(err.Error()) + } + }() - if err := tx.Commit(); err != nil { + for _, candle := range hc { + _, err := insertCandle.Exec(uid, + candle.GetOpen().ToFloat(), + candle.GetClose().ToFloat(), + candle.GetHigh().ToFloat(), + candle.GetLow().ToFloat(), + candle.GetVolume(), + candle.GetTime().AsTime().Unix(), + candle.GetIsComplete()) + if err != nil { + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } + } + } + return nil + }() + if err != nil { return err } - // записываем в базу время последнего обновления - _, err = db.Exec(`insert or replace into updates(instrument_id, time) values (?, ?)`, uid, update.Unix()) + _, err = db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, uid, first.Unix(), last.Unix()) if err != nil { return err } diff --git a/examples/interval_bot/cmd/main.go b/examples/interval_bot/cmd/main.go index d42f636..11ddd94 100644 --- a/examples/interval_bot/cmd/main.go +++ b/examples/interval_bot/cmd/main.go @@ -166,6 +166,10 @@ func main() { // передаем инструменты в конфиг intervalConfig.Instruments = instrumentIds + // запрашиваемое время для свечей не может быть раньше StorageFromTime + if time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval)).Before(intervalConfig.StorageFromTime) { + intervalConfig.StorageFromTime = time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval)) + } // создание интервального бота intervalBot, err := bot.NewBot(ctx, client, intervalConfig) diff --git a/examples/interval_bot/internal/bot/bot.go b/examples/interval_bot/internal/bot/bot.go index a39c4d0..8ae5ef6 100644 --- a/examples/interval_bot/internal/bot/bot.go +++ b/examples/interval_bot/internal/bot/bot.go @@ -136,7 +136,7 @@ func NewBot(ctx context.Context, client *investgo.Client, config IntervalStrateg instrumentsForStorage[instrument] = StorageInstrument{ CandleInterval: config.StorageCandleInterval, PriceStep: resp.GetInstrument().GetMinPriceIncrement(), - LastUpdate: config.StorageFromTime, + FirstUpdate: config.StorageFromTime, ticker: resp.GetInstrument().GetTicker(), } } diff --git a/examples/interval_bot/internal/bot/candles.go b/examples/interval_bot/internal/bot/candles.go index e135245..238cadc 100644 --- a/examples/interval_bot/internal/bot/candles.go +++ b/examples/interval_bot/internal/bot/candles.go @@ -1,8 +1,10 @@ package bot import ( + "errors" "fmt" "github.com/jmoiron/sqlx" + "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3" "github.com/tinkoff/invest-api-go-sdk/investgo" pb "github.com/tinkoff/invest-api-go-sdk/proto" @@ -14,6 +16,7 @@ import ( type StorageInstrument struct { CandleInterval pb.CandleInterval PriceStep *pb.Quotation + FirstUpdate time.Time LastUpdate time.Time ticker string } @@ -37,12 +40,14 @@ create table if not exists candles ( low real, volume integer, time integer, - is_complete integer + is_complete integer, + unique (instrument_uid, time) ); create table if not exists updates ( instrument_id text unique , - time integer + first_time integer, + last_time integer ); ` @@ -60,19 +65,21 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn return nil, err } cs.db = db - // получаем инструменты, которые уже есть в бд - unique, err := cs.uniqueInstruments() + // получаем время первого и последнего обновления инструментов, которые уже есть в бд + DBUpdates, err := cs.lastUpdates() if err != nil { return nil, err } - // если инструмента в бд нет, то загружаем данные по нему + cs.logger.Infof("got %v unique instruments from storage", len(DBUpdates)) + // если инструмента в бд нет, то загружаем данные по нему, если есть, но недостаточно, то догружаем свечи for id, instrument := range required { - if _, ok := unique[id]; !ok { + if _, ok := DBUpdates[id]; !ok { + cs.logger.Infof("candles for %v not found, downloading...", id) now := time.Now() newCandles, err := cs.mds.GetHistoricCandles(&investgo.GetHistoricCandlesRequest{ Instrument: id, Interval: instrument.CandleInterval, - From: instrument.LastUpdate, + From: instrument.FirstUpdate, To: now, File: false, FileName: "", @@ -88,11 +95,36 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn return nil, err } } else { - cs.instruments[id] = instrument + // first time check + // если все ок, то просто обновляем время обновления + if instrument.FirstUpdate.After(DBUpdates[id].FirstUpdate) { + instrument.FirstUpdate = DBUpdates[id].FirstUpdate + instrument.LastUpdate = DBUpdates[id].LastUpdate + cs.instruments[id] = instrument + } else { + cs.logger.Infof("older candles for %v not found, downloading...", cs.ticker(id)) + // если нужно догрузить более старые свечи + oldCandles, err := cs.mds.GetHistoricCandles(&investgo.GetHistoricCandlesRequest{ + Instrument: id, + Interval: instrument.CandleInterval, + From: instrument.FirstUpdate, + To: DBUpdates[id].FirstUpdate, + File: false, + FileName: "", + }) + if err != nil { + return nil, err + } + instrument.LastUpdate = DBUpdates[id].LastUpdate + cs.instruments[id] = instrument + err = cs.storeCandlesInDB(id, instrument.LastUpdate, oldCandles) + if err != nil { + return nil, err + } + } } } - // вычитываем из бд даты последних обновлений - err = cs.lastUpdates() + // если нужно обновить с lastUpdate до сейчас if update { // обновляем в бд данные по всем инструментам for id := range required { @@ -110,7 +142,6 @@ func NewCandlesStorage(dbpath string, update bool, required map[string]StorageIn } cs.candles[id] = tmp } - return cs, err } @@ -141,7 +172,7 @@ func (c *CandlesStorage) ticker(key string) string { func (c *CandlesStorage) Candles(id string, from, to time.Time) ([]*pb.HistoricCandle, error) { allCandles, ok := c.candles[id] if !ok { - return nil, fmt.Errorf("%v instrument not found, at first LoadCandlesHistory()", id) + return nil, fmt.Errorf("%v instrument not found, at first LoadCandlesHistory() or use candles_dowloader", id) } indexes := [2]int{} times := [2]time.Time{from, to} @@ -157,7 +188,7 @@ func (c *CandlesStorage) Candles(id string, from, to time.Time) ([]*pb.HistoricC } } if currIndex == 0 { - return nil, fmt.Errorf("%v candles not found in storage, try to UpdateCandlesHistory() from = %v\n", c.ticker(id), from) + return nil, fmt.Errorf("%v candles not found in storage, try to UpdateCandlesHistory() from = %v or use candles_downloader\n", c.ticker(id), from) } if indexes[1] == 0 { return allCandles[indexes[0]:], nil @@ -266,55 +297,45 @@ func (c *CandlesStorage) UpdateCandlesHistory(id string) error { return c.storeCandlesInDB(id, now, newCandles) } -// lastUpdates - Обновление времени последнего обновления свечей по инструментам в мапе Instruments -func (c *CandlesStorage) lastUpdates() error { +// lastUpdates - Возвращает первое и последнее обновление для инструментов из бд +func (c *CandlesStorage) lastUpdates() (map[string]StorageInstrument, error) { c.logger.Infof("update lastUpdate time from storage...") - var lastUpdUnix int64 + var lastUpdUnix, firstUpdUnix int64 var tempId string + updatesFromDB := make(map[string]StorageInstrument, len(c.instruments)) rows, err := c.db.Query(`select * from updates`) if err != nil { - return err + return nil, err } for rows.Next() { - err = rows.Scan(&tempId, &lastUpdUnix) + err = rows.Scan(&tempId, &firstUpdUnix, &lastUpdUnix) if err != nil { - return err + return nil, err } - instrument, ok := c.instruments[tempId] - if !ok { - // этот инструмент из базы нам сейчас не нужен - continue + updatesFromDB[tempId] = StorageInstrument{ + FirstUpdate: time.Unix(firstUpdUnix, 0), + LastUpdate: time.Unix(lastUpdUnix, 0), } - instrument.LastUpdate = time.Unix(lastUpdUnix, 0) - c.instruments[tempId] = instrument } - //for id, candles := range c.instruments { - // err := c.db.Get(&lastUpdUnix, `select max(time) from candles where instrument_uid=?`, id) - // if err != nil { - // return err - // } - // candles.LastUpdate = time.Unix(lastUpdUnix, 0) - // c.instruments[id] = candles - //} - return nil + return updatesFromDB, nil } -// uniqueInstruments - Метод возвращает мапу с уникальными значениями uid инструментов в бд -func (c *CandlesStorage) uniqueInstruments() (map[string]struct{}, error) { - instruments := make([]string, 0) - err := c.db.Select(&instruments, `select distinct instrument_id from updates`) - if err != nil { - return nil, err - } - m := make(map[string]struct{}) - for _, instrument := range instruments { - m[instrument] = struct{}{} - } - c.logger.Infof("got %v unique instruments from storage", len(m)) - return m, nil -} +//// uniqueInstruments - Метод возвращает мапу с уникальными значениями uid инструментов в бд +//func (c *CandlesStorage) uniqueInstruments() (map[string]struct{}, error) { +// instruments := make([]string, 0) +// err := c.db.Select(&instruments, `select distinct instrument_id from updates`) +// if err != nil { +// return nil, err +// } +// m := make(map[string]struct{}) +// for _, instrument := range instruments { +// m[instrument] = struct{}{} +// } +// c.logger.Infof("got %v unique instruments from storage", len(m)) +// return m, nil +//} // initDB - Инициализация бд func (c *CandlesStorage) initDB(path string) (*sqlx.DB, error) { @@ -334,42 +355,52 @@ func (c *CandlesStorage) initDB(path string) (*sqlx.DB, error) { // storeCandlesInDB - Сохранение исторических свечей инструмента в бд func (c *CandlesStorage) storeCandlesInDB(uid string, update time.Time, hc []*pb.HistoricCandle) error { - tx, err := c.db.Begin() - if err != nil { - return err - } - - insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) - values (?, ?, ?, ?, ?, ?, ?, ?) `) - if err != nil { - return err - } - defer func() { - if err := insertCandle.Close(); err != nil { - c.logger.Errorf(err.Error()) + err := func() error { + tx, err := c.db.Begin() + if err != nil { + return err } - }() - - for _, candle := range hc { - _, err := insertCandle.Exec(uid, - candle.GetOpen().ToFloat(), - candle.GetClose().ToFloat(), - candle.GetHigh().ToFloat(), - candle.GetLow().ToFloat(), - candle.GetVolume(), - candle.GetTime().AsTime().Unix(), - candle.GetIsComplete()) + defer func() { + if err := tx.Commit(); err != nil { + c.logger.Errorf(err.Error()) + } + }() + insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete) + values (?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return err } - } - - if err := tx.Commit(); err != nil { + defer func() { + if err := insertCandle.Close(); err != nil { + c.logger.Errorf(err.Error()) + } + }() + + for _, candle := range hc { + _, err := insertCandle.Exec(uid, + candle.GetOpen().ToFloat(), + candle.GetClose().ToFloat(), + candle.GetHigh().ToFloat(), + candle.GetLow().ToFloat(), + candle.GetVolume(), + candle.GetTime().AsTime().Unix(), + candle.GetIsComplete()) + if err != nil { + if errors.As(err, &sqlite3.Error{}) { + continue + } else { + return err + } + } + } + return nil + }() + if err != nil { return err } - // записываем в базу время последнего обновления - _, err = c.db.Exec(`insert or replace into updates(instrument_id, time) values (?, ?)`, uid, update.Unix()) + _, err = c.db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, + uid, c.instruments[uid].FirstUpdate.Unix(), update.Unix()) if err != nil { return err } diff --git a/investgo/marketdata.go b/investgo/marketdata.go index 27d2d1e..8cdcfb8 100644 --- a/investgo/marketdata.go +++ b/investgo/marketdata.go @@ -168,7 +168,10 @@ func (md *MarketDataServiceClient) GetHistoricCandles(req *GetHistoricCandlesReq if err != nil { return nil, err } - candles = append(candles, resp.GetCandles()...) + if len(resp.GetCandles()) < 1 { + continue + } + candles = append(candles, resp.GetCandles()[1:]...) if requests == 299 { if md.config.DisableResourceExhaustedRetry { time.Sleep(time.Minute)