Skip to content

Commit

Permalink
Merge pull request #17 from Tinkoff/dev
Browse files Browse the repository at this point in the history
fix storage
  • Loading branch information
jstalex authored Jul 31, 2023
2 parents d3ed955 + 54e8b80 commit 4f8fbb5
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 114 deletions.
4 changes: 4 additions & 0 deletions examples/interval_bot/cmd/backtest/backtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func main() {
logger.Infof("got %v instruments", len(instrumentIds))
// Добавляем инструменты в конфиг
intervalConfig.Instruments = instrumentIds
// initDate не может быть раньше StorageFromTime
if initDate.Before(intervalConfig.StorageFromTime) {
intervalConfig.StorageFromTime = initDate
}
// создание интервального бота
intervalBot, err := bot.NewBot(ctx, client, intervalConfig)
if err != nil {
Expand Down
85 changes: 50 additions & 35 deletions examples/interval_bot/cmd/candles_downloader/download_candles.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"errors"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
_ "github.com/mattn/go-sqlite3"
"github.com/schollz/progressbar/v3"
"github.com/tinkoff/invest-api-go-sdk/investgo"
Expand Down Expand Up @@ -162,11 +164,11 @@ func main() {

logger.Infof("got %v candles for %v", len(candles), id)

err = storeCandlesInDB(db, id, now, candles)
err = storeCandlesInDB(db, id, FROM, now, candles)
if err != nil {
logger.Errorf(err.Error())
}
logger.Infof("store in db complete candle %v/%v", i+1, len(instrumentIds))
logger.Infof("store in db complete instrument %v/%v", i+1, len(instrumentIds))
err = bar.Add(1)
if err != nil {
logger.Errorf(err.Error())
Expand All @@ -177,20 +179,22 @@ func main() {

var schema = `
create table if not exists candles (
id integer primary key autoincrement,
instrument_uid text,
id integer primary key autoincrement,
instrument_uid text,
open real,
close real,
high real,
low real,
volume integer,
time integer,
is_complete integer
is_complete integer,
unique (instrument_uid, time)
);
create table if not exists updates (
instrument_id text unique,
time integer
instrument_id text unique,
first_time integer,
last_time integer
);
`

Expand All @@ -211,43 +215,54 @@ func initDB(path string) (*sqlx.DB, error) {
}

// storeCandlesInDB - Сохранение исторических свечей инструмента в бд
func storeCandlesInDB(db *sqlx.DB, uid string, update time.Time, hc []*pb.HistoricCandle) error {
tx, err := db.Begin()
if err != nil {
return err
}

insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete)
values (?, ?, ?, ?, ?, ?, ?, ?) `)
if err != nil {
return err
}
defer func() {
if err := insertCandle.Close(); err != nil {
log.Printf(err.Error())
func storeCandlesInDB(db *sqlx.DB, uid string, first, last time.Time, hc []*pb.HistoricCandle) error {
err := func() error {
tx, err := db.Begin()
if err != nil {
return err
}
}()

for _, candle := range hc {
_, err := insertCandle.Exec(uid,
candle.GetOpen().ToFloat(),
candle.GetClose().ToFloat(),
candle.GetHigh().ToFloat(),
candle.GetLow().ToFloat(),
candle.GetVolume(),
candle.GetTime().AsTime().Unix(),
candle.GetIsComplete())
defer func() {
if err = tx.Commit(); err != nil {
log.Printf(err.Error())
}
}()

insertCandle, err := tx.Prepare(`insert into candles (instrument_uid, open, close, high, low, volume, time, is_complete)
values (?, ?, ?, ?, ?, ?, ?, ?) `)
if err != nil {
return err
}
}
defer func() {
if err := insertCandle.Close(); err != nil {
log.Printf(err.Error())
}
}()

if err := tx.Commit(); err != nil {
for _, candle := range hc {
_, err := insertCandle.Exec(uid,
candle.GetOpen().ToFloat(),
candle.GetClose().ToFloat(),
candle.GetHigh().ToFloat(),
candle.GetLow().ToFloat(),
candle.GetVolume(),
candle.GetTime().AsTime().Unix(),
candle.GetIsComplete())
if err != nil {
if errors.As(err, &sqlite3.Error{}) {
continue
} else {
return err
}
}
}
return nil
}()
if err != nil {
return err
}

// записываем в базу время последнего обновления
_, err = db.Exec(`insert or replace into updates(instrument_id, time) values (?, ?)`, uid, update.Unix())
_, err = db.Exec(`insert or replace into updates(instrument_id, first_time, last_time) values (?, ?, ?)`, uid, first.Unix(), last.Unix())
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions examples/interval_bot/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func main() {

// передаем инструменты в конфиг
intervalConfig.Instruments = instrumentIds
// запрашиваемое время для свечей не может быть раньше StorageFromTime
if time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval)).Before(intervalConfig.StorageFromTime) {
intervalConfig.StorageFromTime = time.Now().Add(-time.Hour * 24 * time.Duration(intervalConfig.DaysToCalculateInterval))
}

// создание интервального бота
intervalBot, err := bot.NewBot(ctx, client, intervalConfig)
Expand Down
2 changes: 1 addition & 1 deletion examples/interval_bot/internal/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func NewBot(ctx context.Context, client *investgo.Client, config IntervalStrateg
instrumentsForStorage[instrument] = StorageInstrument{
CandleInterval: config.StorageCandleInterval,
PriceStep: resp.GetInstrument().GetMinPriceIncrement(),
LastUpdate: config.StorageFromTime,
FirstUpdate: config.StorageFromTime,
ticker: resp.GetInstrument().GetTicker(),
}
}
Expand Down
Loading

0 comments on commit 4f8fbb5

Please sign in to comment.