Skip to content

Commit

Permalink
style: add retry log
Browse files Browse the repository at this point in the history
  • Loading branch information
Breeze0806 committed May 13, 2024
1 parent 76c5ec4 commit 8898496
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions datax/plugin/writer/dbms/batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,16 @@ func (b *BaseBatchWriter) BatchTimeout() time.Duration {

// BatchWrite - The process of writing data in batches.
func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error) {
if b.strategy != nil {
retry := schedule.NewRetryTask(ctx, b.strategy, newWriteTask(func() error {
return b.batchWrite(ctx, records)
}))
err = retry.Do()
}
retry := schedule.NewRetryTask(ctx, b.strategy, newWriteTask(func() error {
return b.batchWriteWithLog(ctx, records, "")
}))
err = retry.Do()

if b.judger != nil {
if b.judger.ShouldOneByOne(err) {
for _, r := range records {
for i := range records {
retry := schedule.NewRetryTask(ctx, b.strategy, newWriteTask(func() error {
return b.batchWrite(ctx, []element.Record{r})
return b.batchWriteWithLog(ctx, []element.Record{records[i]}, "one by one")
}))
err = retry.Do()
if b.Task.Config.IgnoreOneByOneError() {
Expand Down Expand Up @@ -150,6 +148,14 @@ func (b *BaseBatchWriter) batchWrite(ctx context.Context, records []element.Reco
return b.Task.Execer.BatchExec(ctx, b.opts)
}

func (b *BaseBatchWriter) batchWriteWithLog(ctx context.Context, records []element.Record, msg string) (err error) {
if err = b.batchWrite(ctx, records); err != nil {
log.Debugf("jobID: %v taskgroupID:%v taskID: %v batchWrite(%v) %v error: %+v",
b.JobID(), b.TaskGroupID(), b.TaskID(), records, msg, err)
}
return
}

type writeTask struct {
do func() error
}
Expand Down

0 comments on commit 8898496

Please sign in to comment.