Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Description of the bug #1007

Open
txbxxx opened this issue Jan 15, 2025 · 0 comments
Open

[BUG] Description of the bug #1007

txbxxx opened this issue Jan 15, 2025 · 0 comments
Assignees
Labels
bug Something isn't working

Comments

@txbxxx
Copy link

txbxxx commented Jan 15, 2025

I register these scheduled tasks with the asynq scheduler and can see them with `asynq cron ls`, but they never show up as tasks in the queue. Why?

package main

import (
	"flag"
	"time"

	"github.com/hibiken/asynq"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/logx"

	"violet/apps/gtask/internal/config"
	// "violet/apps/gtask/internal/task/test_gtask"
	"violet/apps/gtask/internal/task/transaction_task"
	"violet/apps/gtask/internal/types"
)

// configFile is the path of the config file, taken from the -f flag.
var configFile = flag.String("f", "apps/stask/etc/stask.yaml", "the config file")

// init sets the process-wide local time zone to UTC.
func init() {
	time.Local = time.UTC
}

// main runs the asynq scheduler process: it parses flags, loads the config
// file named by -f, sets up logging, registers the periodic transaction tasks
// and then blocks in scheduler.Run. Any registration or run error is logged
// and turned into a panic.
func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c, conf.UseEnv())
	logx.MustSetup(logx.LogConf{
		ServiceName: c.ServerName,
		Path:        c.LogPath,
		Mode:        c.LogMode,
		Encoding:    c.LogEncoding,
	})
	defer logx.Close()

	scheduler := asynq.NewScheduler(c.Redis.ToAsynqConffig(), &asynq.SchedulerOpts{
		// Enqueueing happens in the background after Run starts; this hook is
		// the place where a failed enqueue (for example a task ID conflict)
		// becomes visible in the logs. info may be nil when err is non-nil,
		// so only err is used here.
		PostEnqueueFunc: func(info *asynq.TaskInfo, err error) {
			if err != nil {
				logx.Errorf("scheduler enqueue failed, err:%v", err)
			}
		},
	})

	symbolTasks := transaction_task.NewTransactionTaskFactory()

	// NOTE(review): every entry below pins a fixed TaskID. asynq rejects an
	// enqueue with ErrTaskIDConflict while another task with the same ID still
	// exists in the queue, so each periodic task needs its own ID and a run
	// can be skipped while the previous one is still stored — confirm this is
	// the intended de-duplication behavior.

	// Pull symbols.
	if _, err := scheduler.Register("@every 60m", symbolTasks.CreatePullSymbolAllTask("test"), asynq.TaskID("symbol"), asynq.Queue(types.TransactionQueue)); err != nil {
		logx.Errorf("注册拉取品种任务失败,err:%v", err)
		panic(err)
	}

	// Pull trading groups (real).
	if _, err := scheduler.Register("@every 30s", symbolTasks.CreatePullRealGroupAllTask("test", true), asynq.TaskID("real_group"), asynq.Queue(types.TransactionQueue)); err != nil {
		logx.Errorf("注册任务失败,err:%v", err)
		panic(err)
	}

	// Pull trading groups (non-real).
	if _, err := scheduler.Register("@every 60m", symbolTasks.CreatePullNoRealGroupAllTask("test", false), asynq.TaskID("no_real_group"), asynq.Queue(types.TransactionQueue)); err != nil {
		logx.Errorf("注册任务失败,err:%v", err)
		panic(err)
	}

	// Pull symbol groups. This entry previously reused the "real_group" task
	// ID, colliding with the trading-group task above; it now has its own ID.
	if _, err := scheduler.Register("@every 60m", symbolTasks.CreatePullSymbolGroupAllTask("test"), asynq.TaskID("symbol_group"), asynq.Queue(types.TransactionQueue)); err != nil {
		logx.Errorf("注册拉取品种组任务失败,err:%v", err)
		panic(err)
	}

	if err := scheduler.Run(); err != nil {
		logx.Error(err)
		panic(err)
	}
}
EntryID                               Spec        Type                                   Payload                             Options                 Next       Prev
-------                               ----        ----                                   -------                             -------                 ----       ----
55c65ea0-5cac-4962-8af3-ad3482393ca3  @every 1m   sandy:gtask:transaction_real_group     {"platform":"test","is_real":true}   [Queue("transaction")]  In 39s     21s ago
8bed2e24-5a5a-41fe-9dee-0815eb50d2bc  @every 60m  sandy:gtask:transaction_symbol_group   {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A
3f28af36-767d-4275-a0bd-7b7641a30da5  @every 60m  sandy:gtask:transaction_no_real_group  {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A
e036fb0e-25f3-4f58-a3d4-b13db098b7b6  @every 60m  sandy:gtask:transaction_symbol         {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A

worker code:

package main

import (
	"flag"
	"time"

	"github.com/hibiken/asynq"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/logx"

	"violet/apps/gtask/internal/config"
	"violet/apps/gtask/internal/handler"
	"violet/apps/gtask/internal/svc"
)

// configFile is the path of the config file, taken from the -f flag.
var configFile = flag.String("f", "apps/gtask/etc/gtask.yaml", "the config file")

// init sets the process-wide local time zone to UTC.
func init() {
	time.Local = time.UTC
}

// main runs the asynq worker process: it parses flags, loads the config file
// named by -f, sets up logging, builds the server and its handler mux, and
// then blocks in Run. A Run error is logged and turned into a panic.
func main() {
	flag.Parse()

	var cfg config.Config
	conf.MustLoad(*configFile, &cfg, conf.UseEnv())

	logConf := logx.LogConf{
		ServiceName: cfg.ServerName,
		Path:        cfg.LogPath,
		Mode:        cfg.LogMode,
		Encoding:    cfg.LogEncoding,
	}
	logx.MustSetup(logConf)
	defer logx.Close()

	// Queue name -> priority value passed to asynq.
	queuePriorities := map[string]int{
		"default":     6,
		"transaction": 8,
	}
	serverCfg := asynq.Config{
		Concurrency: 10,
		Queues:      queuePriorities,
	}
	server := asynq.NewServer(cfg.Redis.ToAsynqConffig(), serverCfg)

	svcCtx := svc.NewServiceContext(cfg)
	router := asynq.NewServeMux()
	handler.RegisterHandlers(router, svcCtx)

	runErr := server.Run(router)
	if runErr == nil {
		return
	}
	logx.Error(runErr)
	panic(runErr)
}

Shared constants that define the task types and queues:

package types

// Task type names.
const (
	// GTaskTypeTest is the type name of the test task.
	GTaskTypeTest = "sandy:gtask:test"

	// Type names of the transaction tasks.
	GTasksTransactionPullSymbolAll      = "sandy:gtask:transaction_symbol"        // pull all symbols
	GTasksTransactionPullRealGroupAll   = "sandy:gtask:transaction_real_group"  // pull all real groups
	GTasksTransactionPullNoRealGroupAll = "sandy:gtask:transaction_no_real_group" // pull all non-real groups
	GTasksTransactionPullSymbolGroupAll = "sandy:gtask:transaction_symbol_group" // pull all symbol groups
)

// Queue names.
const (
	// TransactionQueue is the name of the "transaction" queue.
	TransactionQueue = "transaction"
)

Handler registration file:

// RegisterHandlers attaches one handler per task type to mux. Every handler
// is built from serverCtx. The real and non-real group task types each get
// their own handler value from the same constructor.
func RegisterHandlers(mux *asynq.ServeMux, serverCtx *svc.ServiceContext) {
	// Each handler is built immediately before it is registered, so the
	// construct/register order matches a direct mux.HandleFunc call per line.
	testHandler := test_gtask.TestGtaskHandler(serverCtx)
	mux.HandleFunc(types.GTaskTypeTest, testHandler)

	symbolHandler := transaction.GetTradeSymbolAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullSymbolAll, symbolHandler)

	realGroupHandler := transaction.GetTradeGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullRealGroupAll, realGroupHandler)

	noRealGroupHandler := transaction.GetTradeGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullNoRealGroupAll, noRealGroupHandler)

	symbolGroupHandler := transaction.GetTradeSymbolGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullSymbolGroupAll, symbolGroupHandler)
}

So my question is: the cron scheduler is started and the consumer (worker) is started too, but the consumer never receives the tasks the scheduler should enqueue — or the scheduler is not enqueuing any tasks at all.

Inspecting the queue with the following command shows no queued tasks:

via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack)
➜ asynq queue inspect transaction
Queue Info
Name:   transaction
Size:   0
Groups: 0
Paused: false

Task Count by State
active  pending  aggregating  scheduled  retry  archived  completed
------  -------  -----------  ---------  -----  --------  ---------
0       0        0            0          0      0         0

Daily Stats 2025-01-14 UTC
processed  failed  error rate
---------  ------  ----------
198        22      11.11%

os: macos
asynq version 0.25.0

@txbxxx txbxxx added the bug Something isn't working label Jan 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants