From 8de233c0b9cf5418b561d45a29c89086a9c37449 Mon Sep 17 00:00:00 2001 From: Min Min <37430175+jamsman94@users.noreply.github.com> Date: Fri, 3 Jan 2025 17:32:31 +0800 Subject: [PATCH 1/8] fix production sse log authorization problem (#3940) * fix production sse log authorization problem Signed-off-by: Min Min * remove &merge apis Signed-off-by: Min Min --------- Signed-off-by: Min Min --- .../aslan/core/log/handler/router.go | 1 - .../aslan/core/log/handler/sse.go | 83 ++++++++----------- 2 files changed, 34 insertions(+), 50 deletions(-) diff --git a/pkg/microservice/aslan/core/log/handler/router.go b/pkg/microservice/aslan/core/log/handler/router.go index 8fa82633d4..5a517430a3 100644 --- a/pkg/microservice/aslan/core/log/handler/router.go +++ b/pkg/microservice/aslan/core/log/handler/router.go @@ -38,7 +38,6 @@ func (*Router) Inject(router *gin.RouterGroup) { sse := router.Group("sse") { sse.GET("/pods/:podName/containers/:containerName", GetContainerLogsSSE) - sse.GET("/production/pods/:podName/containers/:containerName", GetProductionEnvContainerLogsSSE) sse.GET("/testing/:test_name/tasks/:task_id", GetTestingContainerLogsSSE) sse.GET("/scanning/:id/task/:scan_id", GetScanningContainerLogsSSE) sse.GET("/v4/workflow/:workflowName/:taskID/:jobName/:lines", GetWorkflowJobContainerLogsSSE) diff --git a/pkg/microservice/aslan/core/log/handler/sse.go b/pkg/microservice/aslan/core/log/handler/sse.go index 5123e08af1..b4e5a6f9ed 100644 --- a/pkg/microservice/aslan/core/log/handler/sse.go +++ b/pkg/microservice/aslan/core/log/handler/sse.go @@ -55,69 +55,54 @@ func GetContainerLogsSSE(c *gin.Context) { envName := c.Query("envName") productName := c.Query("projectName") + isProduction := c.Query("production") == "true" - // authorization checks - if !ctx.Resources.IsSystemAdmin { - if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - if !ctx.Resources.ProjectAuthInfo[productName].Env.View && - !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { - permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.EnvActionView) - if err != nil || !permitted { + if !isProduction { + // authorization checks + if !ctx.Resources.IsSystemAdmin { + if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { ctx.UnAuthorized = true internalhandler.JSONResponse(c, ctx) return } + if !ctx.Resources.ProjectAuthInfo[productName].Env.View && + !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { + permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.EnvActionView) + if err != nil || !permitted { + ctx.UnAuthorized = true + internalhandler.JSONResponse(c, ctx) + return + } + } } - } - - internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { - logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) - }, logger) -} -func GetProductionEnvContainerLogsSSE(c *gin.Context) { - logger := ginzap.WithContext(c).Sugar() - ctx, err := internalhandler.NewContextWithAuthorization(c) - if err != nil { - ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - - tails, err := strconv.ParseInt(c.Query("tails"), 10, 64) - if err != nil { - tails = int64(10) - } - - envName := c.Query("envName") - productName := c.Query("projectName") - - // authorization checks - if !ctx.Resources.IsSystemAdmin { - if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - if !ctx.Resources.ProjectAuthInfo[productName].ProductionEnv.View && - !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { - permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.ProductionEnvActionView) - if err != nil || !permitted { + internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { + logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) + }, logger) + } else { + // authorization checks + if !ctx.Resources.IsSystemAdmin { + if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { ctx.UnAuthorized = true internalhandler.JSONResponse(c, ctx) return } + if !ctx.Resources.ProjectAuthInfo[productName].ProductionEnv.View && + !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { + permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.ProductionEnvActionView) + if err != nil || !permitted { + ctx.UnAuthorized = true + internalhandler.JSONResponse(c, ctx) + return + } + } } + + internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { + logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) + }, logger) } - internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { - logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) - }, logger) } func GetWorkflowJobContainerLogsSSE(c *gin.Context) { From 683ca876fd4ab73931d01cd3458a0601e80910b3 Mon Sep 17 00:00:00 2001 From: Min Min <37430175+jamsman94@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:31:48 +0800 Subject: [PATCH 2/8] change get build detail openAPI logic (#3944) * change get build detail openAPI logic Signed-off-by: Min Min * change logic Signed-off-by: Min Min --------- Signed-off-by: Min Min --- .../aslan/core/build/handler/openapi.go | 5 ++- .../aslan/core/build/service/openapi.go | 45 ++++++++++++++----- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pkg/microservice/aslan/core/build/handler/openapi.go b/pkg/microservice/aslan/core/build/handler/openapi.go index eadd7eeccd..e005a67d46 100644 --- a/pkg/microservice/aslan/core/build/handler/openapi.go +++ b/pkg/microservice/aslan/core/build/handler/openapi.go @@ -217,6 +217,9 @@ func OpenAPIGetBuildModule(c *gin.Context) { return } + serviceName := c.Query("serviceName") + serviceModule := c.Query("serviceModule") + // authorization checks if !ctx.Resources.IsSystemAdmin { if _, ok := ctx.Resources.ProjectAuthInfo[projectKey]; !ok { @@ -230,5 +233,5 @@ func OpenAPIGetBuildModule(c *gin.Context) { } } - ctx.Resp, ctx.RespErr = buildservice.OpenAPIGetBuildModule(name, projectKey, ctx.Logger) + ctx.Resp, ctx.RespErr = buildservice.OpenAPIGetBuildModule(name, serviceName, serviceModule, projectKey, ctx.Logger) } diff --git a/pkg/microservice/aslan/core/build/service/openapi.go b/pkg/microservice/aslan/core/build/service/openapi.go index a6f411154c..7912d816e5 100644 --- a/pkg/microservice/aslan/core/build/service/openapi.go +++ b/pkg/microservice/aslan/core/build/service/openapi.go @@ -316,7 +316,7 @@ func OpenAPIListBuildModules(projectName string, pageNum, pageSize int64, logger return resp, nil } -func OpenAPIGetBuildModule(name, projectName string, logger *zap.SugaredLogger) (*OpenAPIBuildDetailResp, error) { +func OpenAPIGetBuildModule(name, serviceName, serviceModule, projectName string, logger *zap.SugaredLogger) (*OpenAPIBuildDetailResp, error) { opt := &commonrepo.BuildFindOption{ Name: name, ProductName: projectName, @@ -348,18 +348,39 @@ func OpenAPIGetBuildModule(name, projectName string, logger *zap.SugaredLogger) } resp.Repos = make([]*OpenAPIRepo, 0) - for _, rp := range build.Repos { - repo := &OpenAPIRepo{ - RepoName: rp.RepoName, - Branch: rp.Branch, - Source: rp.Source, - RepoOwner: rp.RepoOwner, - RemoteName: rp.RemoteName, - CheckoutPath: rp.CheckoutPath, - Submodules: rp.SubModules, - Hidden: rp.Hidden, + if serviceName == "" || serviceModule == "" || build.TemplateID == "" { + for _, rp := range build.Repos { + repo := &OpenAPIRepo{ + RepoName: rp.RepoName, + Branch: rp.Branch, + Source: rp.Source, + RepoOwner: rp.RepoOwner, + RemoteName: rp.RemoteName, + CheckoutPath: rp.CheckoutPath, + Submodules: rp.SubModules, + Hidden: rp.Hidden, + } + resp.Repos = append(resp.Repos, repo) + } + } else { + for _, svcBuild := range build.Targets { + if svcBuild.ServiceName == serviceName && svcBuild.ServiceModule == serviceModule { + for _, rp := range svcBuild.Repos { + repo := &OpenAPIRepo{ + RepoName: rp.RepoName, + Branch: rp.Branch, + Source: rp.Source, + RepoOwner: rp.RepoOwner, + RemoteName: rp.RemoteName, + CheckoutPath: rp.CheckoutPath, + Submodules: rp.SubModules, + Hidden: rp.Hidden, + } + resp.Repos = append(resp.Repos, repo) + } + break + } } - resp.Repos = append(resp.Repos, repo) } resp.TargetServices = make([]*commonmodels.ServiceWithModule, 0) From 7a6dfe9b750ab3df9a5114ae57b47bd04d484834 Mon Sep 17 00:00:00 2001 From: Petrus Date: Tue, 7 Jan 2025 14:00:46 +0800 Subject: [PATCH 3/8] fix panic if image not empty and not rednered in notification (#3946) Signed-off-by: Patrick Zhao --- .../aslan/core/common/service/instantmessage/workflow_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go b/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go index 6333db2150..ada45f078e 100644 --- a/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go +++ b/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go @@ -566,7 +566,7 @@ func (w *Service) getNotificationContent(notify *models.NotifyCtl, task *models. } } } - if image != "" { + if image != "" && !strings.HasPrefix(image, "{{.") && !strings.Contains(image, "}}") { jobTplcontent += fmt.Sprintf("{{if eq .WebHookType \"dingding\"}}##### {{end}}**镜像信息**:%s \n", image) mailJobTplcontent += fmt.Sprintf("镜像信息:%s \n", image) workflowNotifyJobTaskSpec.Image = image From 03ac0e7d974285555f4ed060c1097a108cba321f Mon Sep 17 00:00:00 2001 From: Petrus Date: Wed, 8 Jan 2025 17:46:55 +0800 Subject: [PATCH 4/8] v3.2.1 (#3937) * support list release plan by update time Signed-off-by: Patrick Zhao * add 321 ua Signed-off-by: Patrick Zhao * update third part cron lib to support year in cron service Signed-off-by: Patrick Zhao * update 321 ua Signed-off-by: Patrick Zhao * add sort by update time in list release plan Signed-off-by: Patrick Zhao * improve code Signed-off-by: Patrick Zhao * add failback mechanism in case missed schedule time if cron restarted Signed-off-by: Patrick Zhao --------- Signed-off-by: Patrick Zhao --- go.mod | 27 +- go.sum | 48 +- pkg/cli/upgradeassistant/cmd/migrate/321.go | 92 +++ pkg/microservice/aslan/config/consts.go | 2 + .../core/common/repository/models/cronjob.go | 1 + .../core/common/repository/models/workflow.go | 1 + .../common/repository/mongodb/release_plan.go | 26 +- .../aslan/core/common/service/cronjob.go | 13 +- .../aslan/core/cron/handler/cron.go | 3 + .../core/environment/service/environment.go | 3 +- .../core/release_plan/handler/release_plan.go | 2 +- .../core/release_plan/service/release_plan.go | 246 +++++--- .../core/release_plan/service/watcher.go | 2 +- pkg/microservice/aslan/core/service.go | 16 +- .../workflow/service/workflow/workflow_v4.go | 1 + pkg/microservice/cron/core/service/cronjob.go | 14 +- .../core/service/scheduler/cronjob_handler.go | 540 +++++++++--------- .../core/service/scheduler/schedule_env.go | 4 +- .../service/scheduler/schedule_env_update.go | 2 +- .../service/scheduler/schedule_workflow.go | 60 -- .../cron/core/service/scheduler/scheduler.go | 36 +- pkg/microservice/cron/core/service/types.go | 3 + pkg/setting/consts.go | 1 + 23 files changed, 688 insertions(+), 455 deletions(-) create mode 100644 pkg/cli/upgradeassistant/cmd/migrate/321.go diff --git a/go.mod b/go.mod index 382f7c2975..e9a3958022 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/docker/go-connections v0.4.0 github.com/envoyproxy/go-control-plane v0.10.3 github.com/gin-gonic/gin v1.9.1 - github.com/go-co-op/gocron v1.17.0 + github.com/go-co-op/gocron/v2 v2.14.0 github.com/go-ldap/ldap/v3 v3.3.0 github.com/go-redsync/redsync/v4 v4.11.0 github.com/go-resty/resty/v2 v2.7.0 @@ -81,7 +81,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.8 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.8.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/swaggo/files v0.0.0-20220728132757-551d4a08d97a github.com/swaggo/gin-swagger v1.5.3 github.com/swaggo/swag v1.16.3 @@ -90,16 +90,17 @@ require ( github.com/xanzy/go-gitlab v0.73.1 go.mongodb.org/mongo-driver v1.10.2 go.uber.org/zap v1.25.0 - golang.org/x/crypto v0.23.0 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 - golang.org/x/net v0.25.0 + golang.org/x/crypto v0.24.0 + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 + golang.org/x/net v0.26.0 golang.org/x/oauth2 v0.5.0 - golang.org/x/sync v0.6.0 - golang.org/x/text v0.15.0 + golang.org/x/sync v0.7.0 + golang.org/x/text v0.16.0 google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.30.0 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df + gopkg.in/ini.v1 v1.67.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -213,7 +214,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/flatbuffers v1.12.1 // indirect - github.com/google/go-cmp v0.5.9 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-github/v29 v29.0.2 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -232,6 +233,7 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect + github.com/jonboulle/clockwork v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kennygrant/sanitize v1.2.4 // indirect @@ -319,17 +321,16 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/image v0.0.0-20190802002840-cff245a6509b // indirect - golang.org/x/mod v0.16.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/term v0.20.0 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/tools v0.22.0 // indirect gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect google.golang.org/api v0.110.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect k8s.io/apiserver v0.27.7 // indirect k8s.io/component-base v0.27.7 // indirect diff --git a/go.sum b/go.sum index 5ea75c7e3c..d228732592 100644 --- a/go.sum +++ b/go.sum @@ -393,8 +393,8 @@ github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SU github.com/go-asn1-ber/asn1-ber v1.5.1 h1:pDbRAunXzIUXfx4CB2QJFv5IuPiuoW+sWvr/Us009o8= github.com/go-asn1-ber/asn1-ber v1.5.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= github.com/go-bindata/go-bindata v1.0.1-0.20190711162640-ee3c2418e368/go.mod h1:7xCgX1lzlrXPHkfvn3EhumqHkmSlzt8at9q7v0ax19c= -github.com/go-co-op/gocron v1.17.0 h1:IixLXsti+Qo0wMvmn6Kmjp2csk2ykpkcL+EmHmST18w= -github.com/go-co-op/gocron v1.17.0/go.mod h1:IpDBSaJOVfFw7hXZuTag3SCSkqazXBBUkbQ1m1aesBs= +github.com/go-co-op/gocron/v2 v2.14.0 h1:bWPJeIdd4ioqiEpLLD1BVSTrtae7WABhX/WaVJbKVqg= +github.com/go-co-op/gocron/v2 v2.14.0/go.mod h1:ZF70ZwEqz0OO4RBXE1sNxnANy/zvwLcattWEFsqpKig= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -560,8 +560,9 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github/v29 v29.0.2 h1:opYN6Wc7DOz7Ku3Oh4l7prmkOMwEcQxpFtxdU8N8Pts= github.com/google/go-github/v29 v29.0.2/go.mod h1:CHKiKKPHJ0REzfwc14QMklvtHwCveD0PxlMjLlzAM5E= github.com/google/go-github/v35 v35.3.0 h1:fU+WBzuukn0VssbayTT+Zo3/ESKX9JYWjbZTLOTEyho= @@ -701,6 +702,8 @@ github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -1087,8 +1090,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= @@ -1207,8 +1210,8 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -1254,8 +1257,9 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1266,8 +1270,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190501045829-6d32002ffd75/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4= @@ -1298,8 +1302,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= -golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1364,8 +1368,9 @@ golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1396,8 +1401,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1491,8 +1496,9 @@ golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1506,8 +1512,9 @@ golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= -golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1523,8 +1530,9 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1595,8 +1603,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= +golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/cli/upgradeassistant/cmd/migrate/321.go b/pkg/cli/upgradeassistant/cmd/migrate/321.go new file mode 100644 index 0000000000..7bbe4db04a --- /dev/null +++ b/pkg/cli/upgradeassistant/cmd/migrate/321.go @@ -0,0 +1,92 @@ +/* + * Copyright 2024 The KodeRover Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package migrate + +import ( + "fmt" + "time" + + "github.com/koderover/zadig/v2/pkg/cli/upgradeassistant/internal/upgradepath" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + "github.com/koderover/zadig/v2/pkg/shared/handler" + "go.mongodb.org/mongo-driver/bson" +) + +func init() { + upgradepath.RegisterHandler("3.2.0", "3.2.1", V320ToV321) + upgradepath.RegisterHandler("3.2.1", "3.2.0", V321ToV320) +} + +func V320ToV321() error { + ctx := handler.NewBackgroupContext() + + ctx.Logger.Infof("-------- start migrate release plan cronjob --------") + err := migrateReleasePlanCron(ctx) + if err != nil { + err = fmt.Errorf("failed to migrate release plan cronjob, error: %w", err) + ctx.Logger.Error(err) + return err + } + + return nil +} + +func V321ToV320() error { + return nil +} + +func migrateReleasePlanCron(ctx *handler.Context) error { + // delete all release plan cronjob first + _, err := commonrepo.NewCronjobColl().DeleteMany(ctx, + bson.M{"type": "release_plan"}, + ) + if err != nil { + return fmt.Errorf("failed to delete release plan cronjobs, error: %w", err) + } + + releasePlans, _, err := commonrepo.NewReleasePlanColl().ListByOptions(&commonrepo.ListReleasePlanOption{}) + if err != nil { + return fmt.Errorf("failed to list release plans, error: %w", err) + } + + // create new cronjob for release plan if schedule time is after now and status is executing + for _, releasePlan := range releasePlans { + if releasePlan.ScheduleExecuteTime != 0 && releasePlan.Status == config.StatusExecuting { + if time.Unix(releasePlan.ScheduleExecuteTime, 0).After(time.Now()) { + cronjob := &commonmodels.Cronjob{ + Enabled: true, + Name: releasePlan.Name, + Type: "release_plan", + JobType: string(config.UnixstampSchedule), + UnixStamp: releasePlan.ScheduleExecuteTime, + ReleasePlanArgs: &commonmodels.ReleasePlanArgs{ + ID: releasePlan.ID.Hex(), + Name: releasePlan.Name, + Index: releasePlan.Index, + }, + } + if err := commonrepo.NewCronjobColl().Upsert(cronjob); err != nil { + return fmt.Errorf("failed to create new release plan schedule job, error: %w", err) + } + } + } + } + + return nil +} diff --git a/pkg/microservice/aslan/config/consts.go b/pkg/microservice/aslan/config/consts.go index be0570b6ff..c219e3a16b 100644 --- a/pkg/microservice/aslan/config/consts.go +++ b/pkg/microservice/aslan/config/consts.go @@ -58,6 +58,8 @@ const ( TimingSchedule ScheduleType = "timing" // GapSchedule 间隔循环 GapSchedule ScheduleType = "gap" + // UnixstampSchedule 时间戳 + UnixstampSchedule ScheduleType = "unixstamp" ) type SlackNotifyType string diff --git a/pkg/microservice/aslan/core/common/repository/models/cronjob.go b/pkg/microservice/aslan/core/common/repository/models/cronjob.go index 0485632778..e756dae479 100644 --- a/pkg/microservice/aslan/core/common/repository/models/cronjob.go +++ b/pkg/microservice/aslan/core/common/repository/models/cronjob.go @@ -25,6 +25,7 @@ type Cronjob struct { Name string `bson:"name" json:"name"` Type string `bson:"type" json:"type"` Number uint64 `bson:"number" json:"number"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Frequency string `bson:"frequency" json:"frequency"` Time string `bson:"time" json:"time"` Cron string `bson:"cron" json:"cron"` diff --git a/pkg/microservice/aslan/core/common/repository/models/workflow.go b/pkg/microservice/aslan/core/common/repository/models/workflow.go index 5e3375509c..c6d047a5de 100644 --- a/pkg/microservice/aslan/core/common/repository/models/workflow.go +++ b/pkg/microservice/aslan/core/common/repository/models/workflow.go @@ -121,6 +121,7 @@ type Schedule struct { ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` Number uint64 `bson:"number" json:"number"` Frequency string `bson:"frequency" json:"frequency"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Time string `bson:"time" json:"time"` MaxFailures int `bson:"max_failures,omitempty" json:"max_failures,omitempty"` TaskArgs *TaskArgs `bson:"task_args,omitempty" json:"task_args,omitempty"` diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go index c07e4094b3..beff221f23 100644 --- a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go +++ b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go @@ -71,6 +71,10 @@ func (c *ReleasePlanColl) EnsureIndex(ctx context.Context) error { Keys: bson.M{"success_time": 1}, Options: options.Index().SetUnique(false), }, + { + Keys: bson.M{"update_time": 1}, + Options: options.Index().SetUnique(false), + }, } _, err := c.Indexes().CreateMany(ctx, mod) @@ -124,6 +128,13 @@ func (c *ReleasePlanColl) DeleteByID(ctx context.Context, idString string) error return err } +type SortReleasePlanBy string + +const ( + SortReleasePlanByIndex SortReleasePlanBy = "index" + SortReleasePlanByUpdateTime SortReleasePlanBy = "update_time" +) + type ListReleasePlanOption struct { PageNum int64 PageSize int64 @@ -131,7 +142,10 @@ type ListReleasePlanOption struct { Manager string SuccessTimeStart int64 SuccessTimeEnd int64 + UpdateTimeStart int64 + UpdateTimeEnd int64 IsSort bool + SortBy SortReleasePlanBy ExcludedFields []string Status config.ReleasePlanStatus } @@ -147,8 +161,15 @@ func (c *ReleasePlanColl) ListByOptions(opt *ListReleasePlanOption) ([]*models.R ctx := context.Background() opts := options.Find() if opt.IsSort { - opts.SetSort(bson.D{{"index", -1}}) + if opt.SortBy == SortReleasePlanByIndex { + opts.SetSort(bson.D{{"index", -1}}) + } else if opt.SortBy == SortReleasePlanByUpdateTime { + opts.SetSort(bson.D{{"update_time", -1}}) + } else { + opts.SetSort(bson.D{{"index", -1}}) + } } + if opt.PageNum > 0 && opt.PageSize > 0 { opts.SetSkip((opt.PageNum - 1) * opt.PageSize) opts.SetLimit(opt.PageSize) @@ -162,6 +183,9 @@ func (c *ReleasePlanColl) ListByOptions(opt *ListReleasePlanOption) ([]*models.R if opt.SuccessTimeStart > 0 && opt.SuccessTimeEnd > 0 { query["success_time"] = bson.M{"$gte": opt.SuccessTimeStart, "$lte": opt.SuccessTimeEnd} } + if opt.UpdateTimeStart > 0 && opt.UpdateTimeEnd > 0 { + query["update_time"] = bson.M{"$gte": opt.UpdateTimeStart, "$lte": opt.UpdateTimeEnd} + } if opt.Status != "" { query["status"] = opt.Status } diff --git a/pkg/microservice/aslan/core/common/service/cronjob.go b/pkg/microservice/aslan/core/common/service/cronjob.go index fd32eaf120..3bdc6adfee 100644 --- a/pkg/microservice/aslan/core/common/service/cronjob.go +++ b/pkg/microservice/aslan/core/common/service/cronjob.go @@ -21,10 +21,11 @@ import ( ) type CronjobPayload struct { - Name string `json:"name"` - ProductName string `json:"product_name"` - Action string `json:"action"` - JobType string `json:"job_type"` - DeleteList []string `json:"delete_list,omitempty"` - JobList []*models.Schedule `json:"job_list,omitempty"` + Name string `json:"name"` + ProductName string `json:"product_name"` + Action string `json:"action"` + JobType string `json:"job_type"` + ScheduleType string `json:"schedule_type"` + DeleteList []string `json:"delete_list,omitempty"` + JobList []*models.Schedule `json:"job_list,omitempty"` } diff --git a/pkg/microservice/aslan/core/cron/handler/cron.go b/pkg/microservice/aslan/core/cron/handler/cron.go index 3853041c72..afb9be9607 100644 --- a/pkg/microservice/aslan/core/cron/handler/cron.go +++ b/pkg/microservice/aslan/core/cron/handler/cron.go @@ -64,6 +64,7 @@ type cronjobResp struct { Name string `json:"name"` Type string `json:"type"` Number uint64 `json:"number"` + UnixStamp int64 `json:"unix_stamp"` Frequency string `json:"frequency"` Time string `json:"time"` Cron string `json:"cron"` @@ -92,6 +93,7 @@ func ListActiveCronjobFailsafe(c *gin.Context) { Name: cronjob.Name, Type: cronjob.Type, Number: cronjob.Number, + UnixStamp: cronjob.UnixStamp, Frequency: cronjob.Frequency, Time: cronjob.Time, Cron: cronjob.Cron, @@ -124,6 +126,7 @@ func ListActiveCronjob(c *gin.Context) { Name: cronjob.Name, Type: cronjob.Type, Number: cronjob.Number, + UnixStamp: cronjob.UnixStamp, Frequency: cronjob.Frequency, Time: cronjob.Time, Cron: cronjob.Cron, diff --git a/pkg/microservice/aslan/core/environment/service/environment.go b/pkg/microservice/aslan/core/environment/service/environment.go index 542c8f474f..fb86c570ad 100644 --- a/pkg/microservice/aslan/core/environment/service/environment.go +++ b/pkg/microservice/aslan/core/environment/service/environment.go @@ -1493,7 +1493,7 @@ func GetAffectedServices(productName, envName string, arg *K8sRendersetArg, log func GeneEstimatedValues(productName, envName, serviceOrReleaseName, scene, format string, arg *EstimateValuesArg, isHelmChartDeploy bool, log *zap.SugaredLogger) (interface{}, error) { var ( productSvc *commonmodels.ProductService - tmplSvc *commonmodels.Service + tmplSvc *commonmodels.Service productInfo *commonmodels.Product err error ) @@ -3791,6 +3791,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { return &commonmodels.Schedule{ ID: input.ID, Number: input.Number, + UnixStamp: input.UnixStamp, Frequency: input.Frequency, Time: input.Time, MaxFailures: input.MaxFailure, diff --git a/pkg/microservice/aslan/core/release_plan/handler/release_plan.go b/pkg/microservice/aslan/core/release_plan/handler/release_plan.go index 7ef23436f7..c85dc2d5e7 100644 --- a/pkg/microservice/aslan/core/release_plan/handler/release_plan.go +++ b/pkg/microservice/aslan/core/release_plan/handler/release_plan.go @@ -204,7 +204,7 @@ func ScheduleExecuteReleasePlan(c *gin.Context) { return } - ctx.RespErr = service.ScheduleExecuteReleasePlan(ctx, c.Param("id")) + ctx.RespErr = service.ScheduleExecuteReleasePlan(ctx, c.Param("id"), c.Query("jobID")) } func SkipReleaseJob(c *gin.Context) { diff --git a/pkg/microservice/aslan/core/release_plan/service/release_plan.go b/pkg/microservice/aslan/core/release_plan/service/release_plan.go index e37015b22b..45bf16b908 100644 --- a/pkg/microservice/aslan/core/release_plan/service/release_plan.go +++ b/pkg/microservice/aslan/core/release_plan/service/release_plan.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" @@ -123,7 +124,7 @@ func CreateReleasePlan(c *handler.Context, args *models.ReleasePlan) error { return nil } -func upsertReleasePlanCron(id, name string, index int64, ScheduleExecuteTime int64) error { +func upsertReleasePlanCron(id, name string, index int64, status config.ReleasePlanStatus, ScheduleExecuteTime int64) error { var ( err error payload *commonservice.CronjobPayload @@ -146,67 +147,110 @@ func upsertReleasePlanCron(id, name string, index int64, ScheduleExecuteTime int found = true } - if found { - origEnabled := releasePlanCron.Enabled - releasePlanCron.Enabled = enable - releasePlanCron.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ - ID: id, - Name: name, - Index: index, - } - releasePlanCron.Cron = util.UnixStampToCronExpr(ScheduleExecuteTime) + if status != config.StatusExecuting { + // delete cron job if status is not executing + if found { + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete release plan schedule job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } - err = commonrepo.NewCronjobColl().Upsert(releasePlanCron) - if err != nil { - fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) - log.Error(fmtErr) - return err + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + DeleteList: []string{releasePlanCron.ID.Hex()}, + } } + } else { + // upsert cron job if status is executing + if found { + origEnabled := releasePlanCron.Enabled + releasePlanCron.Enabled = enable + releasePlanCron.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ + ID: id, + Name: name, + Index: index, + } + releasePlanCron.JobType = setting.UnixStampSchedule + releasePlanCron.UnixStamp = ScheduleExecuteTime + + if origEnabled && !enable { + // need to disable cronjob + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete cron job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } - if origEnabled && !enable { - // need to disable cronjob - payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - DeleteList: []string{releasePlanCron.ID.Hex()}, + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + DeleteList: []string{releasePlanCron.ID.Hex()}, + } + } else if !origEnabled && enable || origEnabled && enable { + err = commonrepo.NewCronjobColl().Upsert(releasePlanCron) + if err != nil { + fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) + log.Error(fmtErr) + return err + } + + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + JobList: []*commonmodels.Schedule{cronJobToSchedule(releasePlanCron)}, + } + } else { + // !origEnabled && !enable + return nil + } + } else { + if !enable { + return nil + } + + input := &commonmodels.Cronjob{ + Enabled: enable, + Name: releasePlanCronName, + Type: setting.ReleasePlanCronjob, + JobType: setting.UnixStampSchedule, + UnixStamp: ScheduleExecuteTime, + ReleasePlanArgs: &commonmodels.ReleasePlanArgs{ + ID: id, + Name: name, + Index: index, + }, + } + + err = commonrepo.NewCronjobColl().Upsert(input) + if err != nil { + fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) + log.Error(fmtErr) + return err } - } else if !origEnabled && enable || origEnabled && enable { payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - JobList: []*commonmodels.Schedule{cronJobToSchedule(releasePlanCron)}, + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + JobList: []*commonmodels.Schedule{cronJobToSchedule(input)}, } - } else { - // !origEnabled && !enable - return nil - } - } else { - input := &commonmodels.Cronjob{ - Name: releasePlanCronName, - Type: setting.ReleasePlanCronjob, - } - input.Enabled = true - input.Cron = util.UnixStampToCronExpr(ScheduleExecuteTime) - input.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ - ID: id, - Name: name, - Index: index, } + } - err = commonrepo.NewCronjobColl().Upsert(input) - if err != nil { - fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) - log.Error(fmtErr) - return err - } - payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - JobList: []*commonmodels.Schedule{cronJobToSchedule(input)}, - } + if payload == nil { + return nil } pl, _ := json.Marshal(payload) @@ -296,6 +340,23 @@ func DeleteReleasePlan(c *gin.Context, username, id string) error { return errors.Wrap(err, "get plan") } internalhandler.InsertOperationLog(c, username, "", "删除", "发布计划", info.Name, "", log.SugaredLogger()) + + releasePlanCronName := util.GetReleasePlanCronName(id, info.Name, info.Index) + releasePlanCron, err := commonrepo.NewCronjobColl().GetByName(releasePlanCronName, setting.ReleasePlanCronjob) + if err != nil { + if err != mongo.ErrNoDocuments && err != mongo.ErrNilDocument { + return e.ErrUpsertCronjob.AddErr(fmt.Errorf("failed to get release plan cron job, err: %w", err)) + } + } else { + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete release plan schedule job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } + } + return mongodb.NewReleasePlanColl().DeleteByID(context.Background(), id) } @@ -435,11 +496,37 @@ func ExecuteReleaseJob(c *handler.Context, planID string, args *ExecuteReleaseJo return nil } -func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { +func ScheduleExecuteReleasePlan(c *handler.Context, planID, jobID string) error { approveLock := getLock(planID) approveLock.Lock() defer approveLock.Unlock() + // check if the job is already executed + jobObjectID, err := primitive.ObjectIDFromHex(jobID) + if err != nil { + return errors.Wrap(err, "invalid job ID") + } + _, err = commonrepo.NewCronjobColl().GetByID(jobObjectID) + if err != nil { + if !mongodb.IsErrNoDocuments(err) { + err = fmt.Errorf("Failed to get release job schedule job %s, error: %v", jobID, err) + log.Error(err) + return err + } else { + err = fmt.Errorf("Release job schedule job %s not found", jobID) + log.Error(err) + return err + } + } + + // delete the schedule job after executed + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{jobID}, + }) + if err != nil { + log.Errorf("Failed to delete release job schedule job %s, error: %v", jobID, err) + } + ctx := context.Background() plan, err := mongodb.NewReleasePlanColl().GetByID(ctx, planID) if err != nil { @@ -449,7 +536,7 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { } if plan.Status != config.StatusExecuting { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, status is %s, can not execute", plan.ID, plan.Name, plan.Index, plan.Status) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, status is %s, can not execute", plan.ID.Hex(), plan.Name, plan.Index, plan.Status) log.Error(err) return err } @@ -457,19 +544,19 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { if !(plan.StartTime == 0 && plan.EndTime == 0) { now := time.Now().Unix() if now < plan.StartTime || now > plan.EndTime { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's not in the release time range", plan.ID, plan.Name, plan.Index) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's not in the release time range", plan.ID.Hex(), plan.Name, plan.Index) log.Error(err) return err } } if plan.Approval != nil && plan.Approval.Enabled == true && plan.Approval.Status != config.StatusPassed { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's approval status is %s, can not execute", plan.ID, plan.Name, plan.Index, plan.Approval.Status) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's approval status is %s, can not execute", plan.ID.Hex(), plan.Name, plan.Index, plan.Approval.Status) log.Error(err) return err } - log.Infof("schedule execute release plan, plan ID: %s, name: %s, index: %d", plan.ID, plan.Name, plan.Index) + log.Infof("schedule execute release plan, plan ID: %s, name: %s, index: %d", plan.ID.Hex(), plan.Name, plan.Index) for _, job := range plan.Jobs { if job.Type == config.JobWorkflow { @@ -508,7 +595,7 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { plan.Status = config.StatusSuccess } - log.Infof("schedule execute release job, plan ID: %s, name: %s, index: %d, job ID: %s, job name: %s", plan.ID, plan.Name, plan.Index, job.ID, job.Name) + log.Infof("schedule execute release job, plan ID: %s, name: %s, index: %d, job ID: %s, job name: %s", plan.ID.Hex(), plan.Name, plan.Index, job.ID, job.Name) if err = mongodb.NewReleasePlanColl().UpdateByID(ctx, planID, plan); err != nil { err = errors.Wrap(err, "update plan") @@ -652,10 +739,6 @@ func UpdateReleasePlanStatus(c *handler.Context, planID, status string, isSystem return errors.Errorf("approval status is %s, can not execute", plan.Approval.Status) } - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { - return errors.Wrap(err, "upsert release plan cron") - } - plan.ExecutingTime = time.Now().Unix() setReleaseJobsForExecuting(plan) case config.StatusWaitForApprove: @@ -685,6 +768,10 @@ func UpdateReleasePlanStatus(c *handler.Context, planID, status string, isSystem return errors.Wrap(err, "update plan") } + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { + return errors.Wrap(err, "upsert release plan cron") + } + go func() { if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ PlanID: planID, @@ -773,7 +860,7 @@ func ApproveReleasePlan(c *handler.Context, planID string, req *ApproveRequest) plan.Status = config.StatusExecuting plan.ApprovalTime = time.Now().Unix() - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { err = errors.Wrap(err, "upsert release plan cron") log.Error(err) } @@ -875,6 +962,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { return &commonmodels.Schedule{ ID: input.ID, Number: input.Number, + UnixStamp: input.UnixStamp, Frequency: input.Frequency, Time: input.Time, MaxFailures: input.MaxFailure, @@ -891,6 +979,7 @@ const ( ListReleasePlanTypeName ListReleasePlanType = "name" ListReleasePlanTypeManager ListReleasePlanType = "manager" ListReleasePlanTypeSuccessTime ListReleasePlanType = "success_time" + ListReleasePlanTypeUpdateTime ListReleasePlanType = "update_time" ListReleasePlanTypeStatus ListReleasePlanType = "status" ) @@ -950,10 +1039,37 @@ func ListReleasePlans(opt *ListReleasePlanOption) (*ListReleasePlanResp, error) SuccessTimeStart: timeStart, SuccessTimeEnd: timeEnd, IsSort: true, + SortBy: mongodb.SortReleasePlanByUpdateTime, PageNum: opt.PageNum, PageSize: opt.PageSize, ExcludedFields: []string{"jobs", "logs"}, }) + case ListReleasePlanTypeUpdateTime: + timeArr := strings.Split(opt.Keyword, "-") + if len(timeArr) != 2 { + return nil, errors.New("invalid update time range") + } + + timeStart := int64(0) + timeEnd := int64(0) + timeStart, err = strconv.ParseInt(timeArr[0], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "invalid update time start") + } + timeEnd, err = strconv.ParseInt(timeArr[1], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "invalid update time end") + } + + list, total, err = mongodb.NewReleasePlanColl().ListByOptions(&mongodb.ListReleasePlanOption{ + UpdateTimeStart: timeStart, + UpdateTimeEnd: timeEnd, + IsSort: true, + SortBy: mongodb.SortReleasePlanByUpdateTime, + PageNum: opt.PageNum, + PageSize: opt.PageSize, + ExcludedFields: []string{"jobs", "logs"}, + }) case ListReleasePlanTypeStatus: list, total, err = mongodb.NewReleasePlanColl().ListByOptions(&mongodb.ListReleasePlanOption{ Status: config.ReleasePlanStatus(opt.Keyword), diff --git a/pkg/microservice/aslan/core/release_plan/service/watcher.go b/pkg/microservice/aslan/core/release_plan/service/watcher.go index a3dba99232..8d4109ff43 100644 --- a/pkg/microservice/aslan/core/release_plan/service/watcher.go +++ b/pkg/microservice/aslan/core/release_plan/service/watcher.go @@ -200,7 +200,7 @@ func updatePlanApproval(plan *models.ReleasePlan) error { plan.Status = config.StatusExecuting plan.ApprovalTime = time.Now().Unix() - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { err = errors.Wrap(err, "upsert release plan cron") log.Error(err) } diff --git a/pkg/microservice/aslan/core/service.go b/pkg/microservice/aslan/core/service.go index 7f881797ee..7e83f99192 100644 --- a/pkg/microservice/aslan/core/service.go +++ b/pkg/microservice/aslan/core/service.go @@ -23,7 +23,7 @@ import ( "sync" "time" - newgoCron "github.com/go-co-op/gocron" + newgoCron "github.com/go-co-op/gocron/v2" _ "github.com/go-sql-driver/mysql" "github.com/hashicorp/go-multierror" "github.com/koderover/zadig/v2/pkg/tool/clientmanager" @@ -156,9 +156,13 @@ func Stop(ctx context.Context) { var Scheduler *newgoCron.Scheduler func initCron() { - Scheduler = newgoCron.NewScheduler(time.Local) + Scheduler, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + return + } - Scheduler.Every(5).Minutes().Do(func() { + Scheduler.NewJob(newgoCron.DurationJob(5*time.Minute), newgoCron.NewTask(func() { log.Infof("[CRONJOB] updating tokens for gitlab....") codehostList, err := mongodb2.NewCodehostColl().List(&mongodb2.ListArgs{ Source: "gitlab", @@ -175,11 +179,11 @@ func initCron() { } } log.Infof("[CRONJOB] gitlab token updated....") - }) + })) - Scheduler.Every(1).Days().At("04:00").Do(cleanCacheFiles) + Scheduler.NewJob(newgoCron.DailyJob(1, newgoCron.NewAtTimes(newgoCron.NewAtTime(4, 0, 0))), newgoCron.NewTask(cleanCacheFiles)) - Scheduler.StartAsync() + Scheduler.Start() } func initService() { diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go index d2bae88446..84581c04ca 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go @@ -2083,6 +2083,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { ID: input.ID, Number: input.Number, Frequency: input.Frequency, + UnixStamp: input.UnixStamp, Time: input.Time, MaxFailures: input.MaxFailure, WorkflowV4Args: input.WorkflowV4Args, diff --git a/pkg/microservice/cron/core/service/cronjob.go b/pkg/microservice/cron/core/service/cronjob.go index 91184d97b6..fab49e1117 100644 --- a/pkg/microservice/cron/core/service/cronjob.go +++ b/pkg/microservice/cron/core/service/cronjob.go @@ -17,12 +17,13 @@ limitations under the License. package service type CronjobPayload struct { - Name string `json:"name"` - ProductName string `json:"product_name"` - Action string `json:"action"` - JobType string `json:"job_type"` - DeleteList []string `json:"delete_list,omitempty"` - JobList []*Schedule `json:"job_list,omitempty"` + Name string `json:"name"` + ProductName string `json:"product_name"` + Action string `json:"action"` + JobType string `json:"job_type"` + ScheduleType string `json:"schedule_type"` + DeleteList []string `json:"delete_list,omitempty"` + JobList []*Schedule `json:"job_list,omitempty"` } type Cronjob struct { @@ -31,6 +32,7 @@ type Cronjob struct { Type string `json:"type"` Number uint64 `json:"number"` Frequency string `json:"frequency"` + UnixStamp int64 `json:"unix_stamp"` Time string `json:"time"` Cron string `json:"cron"` ProductName string `json:"product_name,omitempty"` diff --git a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go index 9fb85261f1..ce8d3aaed4 100644 --- a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go +++ b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go @@ -26,6 +26,7 @@ import ( "strings" "time" + newgoCron "github.com/go-co-op/gocron/v2" "github.com/rfyiamcool/cronlib" "github.com/koderover/zadig/v2/pkg/microservice/cron/core/service" @@ -42,20 +43,22 @@ const ( ) type CronjobHandler struct { - aslanCli *client.Client - Scheduler *cronlib.CronSchduler + aslanCli *client.Client + Scheduler *cronlib.CronSchduler + NewScheduler newgoCron.Scheduler } -func NewCronjobHandler(client *client.Client, scheduler *cronlib.CronSchduler) *CronjobHandler { - InitExistedCronjob(client, scheduler) +func NewCronjobHandler(client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) *CronjobHandler { + InitExistedCronjob(client, scheduler, newScheduler) return &CronjobHandler{ - aslanCli: client, - Scheduler: scheduler, + aslanCli: client, + Scheduler: scheduler, + NewScheduler: newScheduler, } } -func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) { +func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) { log.Infof("Initializing existing cronjob ....") initChan := make(chan []*service.Cronjob, 1) @@ -99,7 +102,7 @@ func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) select { case jobList = <-initChan: for _, job := range jobList { - err := registerCronjob(job, client, scheduler) + err := registerCronjob(job, client, scheduler, newScheduler) if err != nil { fmt.Printf("Failed to init job with id: %s, err: %s\n", job.ID, err) } @@ -111,7 +114,7 @@ func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) select { case jobList = <-failsafeChan: for _, job := range jobList { - err := registerCronjob(job, client, scheduler) + err := registerCronjob(job, client, scheduler, newScheduler) if err != nil { fmt.Printf("Failed to init job with id: %s, err: %s\n", job.ID, err) } @@ -128,77 +131,97 @@ func (h *CronjobHandler) HandleMessage(msgs []*service.CronjobPayload) error { for _, msg := range msgs { switch msg.Action { case setting.TypeEnableCronjob: - err := h.updateCronjob(msg.Name, msg.ProductName, msg.JobType, msg.JobList, msg.DeleteList) + err := h.updateCronjob(msg.Name, msg.ProductName, msg.ScheduleType, msg.JobType, msg.JobList, msg.DeleteList) if err != nil { log.Errorf("Failed to update cronjob, the error is: %v", err) return err } case setting.TypeDisableCronjob: - err := h.stopCronjob(msg.Name, msg.JobType) + err := h.stopCronjob(msg.Name, msg.JobType, msg.ScheduleType) if err != nil { log.Errorf("Failed to stop all cron job, the error is: %v", err) return err } default: - log.Errorf("unsupported cronjob action: NOT RECONSUMING") + log.Errorf("unsupported cronjob action: %+v", msg) } } return nil } -func (h *CronjobHandler) updateCronjob(name, productName, jobType string, jobList []*service.Schedule, deleteList []string) error { - //首先根据deleteList停止不需要的cronjob - for _, deleteID := range deleteList { - jobID := deleteID - log.Infof("stopping Job of ID: %s", jobID) - h.Scheduler.StopService(jobID) - } - // 根据job内容来在scheduler中新增cronjob - for _, job := range jobList { - var cron string - if job.Type == setting.FixedGapCronjob || job.Type == setting.FixedDayTimeCronjob { - cronString, err := convertFixedTimeToCron(job) - if err != nil { - return err - } - cron = cronString - } else { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) +func (h *CronjobHandler) updateCronjob(name, productName, scheduleType, jobType string, jobList []*service.Schedule, deleteList []string) error { + if scheduleType == setting.UnixStampSchedule { + //首先根据deleteList停止不需要的cronjob + for _, deleteID := range deleteList { + jobID := deleteID + log.Infof("stopping UnixStamp Schedule Job of ID: %s", jobID) + h.NewScheduler.RemoveByTags(jobID) } - switch jobType { - case setting.WorkflowCronjob: - err := h.registerWorkFlowJob(name, cron, job) - if err != nil { - return err - } - case setting.TestingCronjob: - err := h.registerTestJob(name, productName, cron, job) - if err != nil { - return err - } - case setting.WorkflowV4Cronjob: - err := h.registerWorkFlowV4Job(name, cron, job) - if err != nil { - return err - } - case setting.EnvAnalysisCronjob: - err := h.registerEnvAnalysisJob(name, cron, job) - if err != nil { - return err + + // 根据job内容来在scheduler中新增cronjob + for _, job := range jobList { + switch jobType { + case setting.ReleasePlanCronjob: + err := h.registerReleasePlanJob(name, job) + if err != nil { + return err + } + default: + log.Errorf("unrecognized cron job type for job id: %s", job.ID) } - case setting.EnvSleepCronjob: - err := h.registerEnvSleepJob(name, cron, job) - if err != nil { - return err + } + + } else { + //首先根据deleteList停止不需要的cronjob + for _, deleteID := range deleteList { + jobID := deleteID + log.Infof("stopping Job of ID: %s", jobID) + h.Scheduler.StopService(jobID) + } + + // 根据job内容来在scheduler中新增cronjob + for _, job := range jobList { + var cron string + if job.Type == setting.FixedGapCronjob || job.Type == setting.FixedDayTimeCronjob { + cronString, err := convertFixedTimeToCron(job) + if err != nil { + return err + } + cron = cronString + } else { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) } - case setting.ReleasePlanCronjob: - err := h.registerReleasePlanJob(name, cron, job) - if err != nil { - return err + + switch jobType { + case setting.WorkflowCronjob: + err := h.registerWorkFlowJob(name, cron, job) + if err != nil { + return err + } + case setting.TestingCronjob: + err := h.registerTestJob(name, productName, cron, job) + if err != nil { + return err + } + case setting.WorkflowV4Cronjob: + err := h.registerWorkFlowV4Job(name, cron, job) + if err != nil { + return err + } + case setting.EnvAnalysisCronjob: + err := h.registerEnvAnalysisJob(name, cron, job) + if err != nil { + return err + } + case setting.EnvSleepCronjob: + err := h.registerEnvSleepJob(name, cron, job) + if err != nil { + return err + } + default: + log.Errorf("unrecognized cron job type for job id: %s", job.ID) } - default: - log.Errorf("unrecognized cron job type for job id: %s", job.ID) } } return nil @@ -333,7 +356,7 @@ func (h *CronjobHandler) registerTestJob(name, productName, schedule string, job // FIXME // UNDER CURRENT SERVICE STRUCTURE, STOPPING CRONJOB SERVICE AND UPDATING DB RECORD // ARE NOT ATOMIC, THIS WILL CAUSE SERIOUS PROBLEM IF UPDATE FAILED -func (h *CronjobHandler) stopCronjob(name, ptype string) error { +func (h *CronjobHandler) stopCronjob(name, ptype, scheduleType string) error { var jobList []*service.Cronjob listAPI := fmt.Sprintf("%s/cron/cronjob/type/%s/name/%s", h.aslanCli.APIBase, ptype, name) header := http.Header{} @@ -348,9 +371,16 @@ func (h *CronjobHandler) stopCronjob(name, ptype string) error { return err } - for _, job := range jobList { - log.Infof("stopping cronjob of ID: %s", job.ID) - h.Scheduler.StopService(job.ID) + if scheduleType == setting.UnixStampSchedule { + for _, job := range jobList { + log.Infof("stopping unixstamp schedule job of ID: %s", job.ID) + h.NewScheduler.RemoveByTags(job.ID) + } + } else { + for _, job := range jobList { + log.Infof("stopping cronjob of ID: %s", job.ID) + h.Scheduler.StopService(job.ID) + } } disableAPI := fmt.Sprintf("%s/cron/cronjob/disable", h.aslanCli.APIBase) @@ -369,206 +399,193 @@ func (h *CronjobHandler) stopCronjob(name, ptype string) error { return nil } -func registerCronjob(job *service.Cronjob, client *client.Client, scheduler *cronlib.CronSchduler) error { - switch job.Type { - case setting.WorkflowCronjob: - args := &service.WorkflowTaskArgs{ - WorkflowName: job.Name, - WorklowTaskCreator: setting.CronTaskCreator, - } - if job.WorkflowArgs != nil { - args.Description = job.WorkflowArgs.Description - args.ProductTmplName = job.WorkflowArgs.ProductTmplName - args.Target = job.WorkflowArgs.Target - args.Namespace = job.WorkflowArgs.Namespace - args.Tests = job.WorkflowArgs.Tests - args.DistributeEnabled = job.WorkflowArgs.DistributeEnabled - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall(path.Join("workflow/workflowtask", job.WorkflowArgs.WorkflowName), args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.WorkflowV4Cronjob: - if job.WorkflowV4Args == nil { - return fmt.Errorf("workflow args is nil") - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall(fmt.Sprintf("workflow/v4/workflowtask/trigger?triggerName=%s", setting.CronTaskCreator), job.WorkflowV4Args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.TestingCronjob: - args := &service.TestTaskArgs{ - TestName: job.Name, - ProductName: job.ProductName, - TestTaskCreator: setting.CronTaskCreator, - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall("testing/testtask", args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) +func registerCronjob(job *service.Cronjob, client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) error { + if job.JobType == setting.UnixStampSchedule { + switch job.Type { + case setting.ReleasePlanCronjob: + if job.ReleasePlanArgs == nil { + log.Errorf("ReleasePlanArgs is nil, name: %v, jobID: %v", job.Name, job.ID) + return nil } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.EnvAnalysisCronjob: - if job.EnvAnalysisArgs == nil { - return nil - } - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } + executeReleasePlanFunc := func() { + base := "release_plan/v1" + url := base + fmt.Sprintf("/%s/schedule_execute?jobID=%s", job.ReleasePlanArgs.ID, job.ID) + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s] RunScheduledTask err: %v", job.Name, err) + } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "environment/environments/" - production := "false" - if job.EnvAnalysisArgs.Production { - production = "true" + log.Infof("schedule executed release plan, jobID: %v, schdule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, time.Unix(job.UnixStamp, 0), job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) } - url := base + fmt.Sprintf("%s/analysis?projectName=%s&triggerName=%s&userName=%s&production=%s", job.EnvAnalysisArgs.EnvName, job.EnvAnalysisArgs.ProductName, setting.CronTaskCreator, setting.CronTaskCreator, production) - - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + // delete old schedule job first + tag := job.ID + newScheduler.RemoveByTags(tag) + + scheduleTime := time.Unix(job.UnixStamp, 0) + jobName := fmt.Sprintf("release_plan:%s:%d:%s", job.ReleasePlanArgs.Name, job.ReleasePlanArgs.Index, scheduleTime) + if scheduleTime.Before(time.Now()) { + if time.Now().Sub(scheduleTime) <= time.Second*30 { + // now - schedule time <= 30s + // maybe missed the schedule time because of cron service restart + // so start this immediately + log.Infof("found an release plan outdated <= 30s, start it immediately, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + _, err := newScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartImmediately()), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) + if err != nil { + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) + return err + } + } else { + // now - schedule time > 30s + // schedule time is too old + log.Errorf("found an release plan outdate > 30s, drop it, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + return nil + } + } else { + // schedule time correct + _, err := newScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartDateTime(scheduleTime)), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) + if err != nil { + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) + return err + } } - }) - if err != nil { - log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.EnvSleepCronjob: - if job.EnvArgs == nil { + log.Infof("registering jobID: %s with name: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) return nil } - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "environment/environments/" - production := "false" - if job.EnvArgs.Production { - production = "true" + } else { + switch job.Type { + case setting.WorkflowV4Cronjob: + if job.WorkflowV4Args == nil { + return fmt.Errorf("workflow args is nil") } - - url := "" - if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, true) { - url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=enable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) - } else if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, false) { - url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=disable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + var cron string + if job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + if err := client.ScheduleCall(fmt.Sprintf("workflow/v4/workflowtask/trigger?triggerName=%s", setting.CronTaskCreator), job.WorkflowV4Args, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.TestingCronjob: + args := &service.TestTaskArgs{ + TestName: job.Name, + ProductName: job.ProductName, + TestTaskCreator: setting.CronTaskCreator, + } + var cron string + if job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + if err := client.ScheduleCall("testing/testtask", args, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.EnvAnalysisCronjob: + if job.EnvAnalysisArgs == nil { + return nil } - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + var cron string + if job.JobType == "" || job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) } - }) - if err != nil { - log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.ReleasePlanCronjob: - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + base := "environment/environments/" + production := "false" + if job.EnvAnalysisArgs.Production { + production = "true" + } - if job.ReleasePlanArgs == nil { - log.Errorf("ReleasePlanArgs is nil, name: %v, schedule: %v, jobID: %v", job.Name, cron, job.ID) - return nil - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "release_plan/v1" - url := base + fmt.Sprintf("/%s/schedule_execute", job.ReleasePlanArgs.ID) - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } + url := base + fmt.Sprintf("%s/analysis?projectName=%s&triggerName=%s&userName=%s&production=%s", job.EnvAnalysisArgs.EnvName, job.EnvAnalysisArgs.ProductName, setting.CronTaskCreator, setting.CronTaskCreator, production) - scheduler.StopService(job.ID) + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) + return err + } - log.Infof("schedule executed release plan, jobID: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - }) - if err != nil { - log.Errorf("Failed to create jobID: %s, jobName: %v, cron: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) - return err - } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.EnvSleepCronjob: + if job.EnvArgs == nil { + return nil + } + var cron string + if job.JobType == "" || job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + base := "environment/environments/" + production := "false" + if job.EnvArgs.Production { + production = "true" + } + + url := "" + if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, true) { + url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=enable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + } else if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, false) { + url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=disable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + } + + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) + return err + } - log.Infof("registering jobID: %s with name: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Name, cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + default: + fmt.Printf("Not supported type of service: %s\n", job.Type) + return errors.New("not supported service type") } - default: - fmt.Printf("Not supported type of service: %s\n", job.Type) - return errors.New("not supported service type") } return nil } @@ -640,32 +657,39 @@ func (h *CronjobHandler) registerEnvSleepJob(name, schedule string, job *service return nil } -func (h *CronjobHandler) registerReleasePlanJob(name, schedule string, job *service.Schedule) error { +func (h *CronjobHandler) registerReleasePlanJob(name string, job *service.Schedule) error { if job.ReleasePlanArgs == nil { - log.Errorf("ReleasePlanArgs is nil, name: %v, schedule: %v, jobID: %v", name, schedule, job.ID.Hex()) + log.Errorf("ReleasePlanArgs is nil, name: %v, jobID: %v", name, job.ID.Hex()) return nil } - scheduleJob, err := cronlib.NewJobModel(schedule, func() { + + executeReleasePlanFunc := func() { base := "release_plan/v1" - url := base + fmt.Sprintf("/%s/schedule_execute", job.ReleasePlanArgs.ID) + url := base + fmt.Sprintf("/%s/schedule_execute?jobID=%s", job.ReleasePlanArgs.ID, job.ID.Hex()) if err := h.aslanCli.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { log.Errorf("[%s]RunScheduledTask err: %v", name, err) } - h.Scheduler.StopService(job.ID.Hex()) + log.Infof("schedule executed release plan, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), time.Unix(job.UnixStamp, 0), job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + } - log.Infof("schedule executed release plan, jobID: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), job.Cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - }) - if err != nil { - log.Errorf("Failed to create jobID: %s, jobName: %v, cron: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID.Hex(), name, schedule, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) - return err + // delete old schedule job first + tag := job.ID.Hex() + h.NewScheduler.RemoveByTags(tag) + + scheduleTime := time.Unix(job.UnixStamp, 0) + if scheduleTime.Before(time.Now()) { + log.Errorf("release plan schedule time is before now, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + return nil } - log.Infof("registering jobID: %s with name: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), name, schedule, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - err = h.Scheduler.UpdateJobModel(job.ID.Hex(), scheduleJob) + jobName := fmt.Sprintf("release_plan:%s:%d:%s", job.ReleasePlanArgs.Name, job.ReleasePlanArgs.Index, scheduleTime) + _, err := h.NewScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartDateTime(scheduleTime)), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID.Hex(), name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) return err } + + log.Infof("registering jobID: %s with name: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) return nil } diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_env.go b/pkg/microservice/cron/core/service/scheduler/schedule_env.go index 9075f3be9c..506cb558ee 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_env.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_env.go @@ -91,7 +91,7 @@ func (c *CronClient) UpsertEnvServiceScheduler(log *zap.SugaredLogger) { } } c.lastSchedulersRWMutex.Unlock() - log.Infof("[vm] [%s] deleted service scheduler..", key) + // log.Infof("[vm] [%s] deleted service scheduler..", key) continue } @@ -136,7 +136,7 @@ func (c *CronClient) UpsertEnvServiceScheduler(log *zap.SugaredLogger) { c.SchedulerController[key] = c.Schedulers[key].Start() c.SchedulerControllerRWMutex.Unlock() - log.Infof("[vm] [%s] added service scheduler..", key) + // log.Infof("[vm] [%s] added service scheduler..", key) } } break diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go b/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go index 3ff80f79bb..5ee5f75a4c 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go @@ -56,7 +56,7 @@ func (c *CronClient) UpsertEnvValueSyncScheduler(log *zap.SugaredLogger) { log.Infof("start init env values sync scheduler... env count: %v", len(envs)) for _, env := range envs { - log.Infof("schedule_env_update handle single helm env: %s/%s", env.ProductName, env.EnvName) + // log.Debugf("schedule_env_update handle single helm env: %s/%s", env.ProductName, env.EnvName) envObj, err := c.AslanCli.GetEnvService(env.ProductName, env.EnvName, log) if err != nil { log.Errorf("failed to get env data, productName:%s envName:%s err:%v", env.ProductName, env.EnvName, err) diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go b/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go index 2dc906852a..d83c76b43d 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go @@ -93,66 +93,6 @@ func (c *CronClient) UpsertWorkflowScheduler(log *zap.SugaredLogger) { c.SchedulerControllerRWMutex.Unlock() } - pipelines, err := c.AslanCli.ListPipelines(log) - if err != nil { - log.Error(err) - return - } - - log.Info("start init pipeline scheduler..") - for _, pipeline := range pipelines { - key := "pipeline-" + pipeline.Name - taskMap[key] = true - - c.enabledMapRWMutex.Lock() - c.lastSchedulersRWMutex.Lock() - if _, ok := c.lastSchedulers[key]; ok && reflect.DeepEqual(pipeline.Schedules.Items, c.lastSchedulers[key]) { - // 增加判断:enabled的值未被更新时才能跳过 - if enabled, ok := c.enabledMap[key]; ok && enabled == pipeline.Schedules.Enabled { - c.lastSchedulersRWMutex.Unlock() - c.enabledMapRWMutex.Unlock() - continue - } - } - c.enabledMap[key] = pipeline.Schedules.Enabled - c.lastSchedulers[key] = pipeline.Schedules.Items - c.lastSchedulersRWMutex.Unlock() - c.enabledMapRWMutex.Unlock() - - newScheduler := gocron.NewScheduler() - for _, schedule := range pipeline.Schedules.Items { - if schedule != nil { - if err := schedule.Validate(); err != nil { - log.Errorf("[%s] invalid schedule: %v", key, err) - continue - } - BuildScheduledPipelineJob(newScheduler, schedule).Do(c.RunScheduledPipelineTask, pipeline, schedule.TaskArgs, log) - } - } - // 所有scheduler总开关 - if !pipeline.Schedules.Enabled { - newScheduler.Clear() - } - - c.SchedulersRWMutex.Lock() - c.Schedulers[key] = newScheduler - c.SchedulersRWMutex.Unlock() - - log.Infof("[%s] building schedulers..", key) - // 停掉旧的scheduler - c.SchedulerControllerRWMutex.Lock() - sc, ok := c.SchedulerController[key] - c.SchedulerControllerRWMutex.Unlock() - if ok { - sc <- true - } - - log.Infof("[%s]lens of scheduler: %d", key, c.Schedulers[key].Len()) - c.SchedulerControllerRWMutex.Lock() - c.SchedulerController[key] = c.Schedulers[key].Start() - c.SchedulerControllerRWMutex.Unlock() - } - ScheduleNames := sets.NewString( CleanJobScheduler, UpsertWorkflowScheduler, UpsertTestScheduler, InitStatScheduler, InitOperationStatScheduler, diff --git a/pkg/microservice/cron/core/service/scheduler/scheduler.go b/pkg/microservice/cron/core/service/scheduler/scheduler.go index 1ceff56e50..c063d912e3 100644 --- a/pkg/microservice/cron/core/service/scheduler/scheduler.go +++ b/pkg/microservice/cron/core/service/scheduler/scheduler.go @@ -23,12 +23,11 @@ import ( "sync" "time" + newgoCron "github.com/go-co-op/gocron/v2" "github.com/jasonlvhit/gocron" "github.com/rfyiamcool/cronlib" "go.uber.org/zap" - newgoCron "github.com/go-co-op/gocron" - configbase "github.com/koderover/zadig/v2/pkg/config" "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" "github.com/koderover/zadig/v2/pkg/microservice/cron/core/service" @@ -66,13 +65,17 @@ type CronClient struct { } type CronV3Client struct { - Scheduler *newgoCron.Scheduler + Scheduler newgoCron.Scheduler AslanCli *aslan.Client } func NewCronV3() *CronV3Client { + scheduler, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + } return &CronV3Client{ - Scheduler: newgoCron.NewScheduler(time.Local), + Scheduler: scheduler, AslanCli: aslan.New(configbase.AslanServiceAddress()), } } @@ -80,7 +83,7 @@ func NewCronV3() *CronV3Client { func (c *CronV3Client) Start() { var lastConfig *aslan.CleanConfig - c.Scheduler.Every(5).Seconds().Do(func() { + c.Scheduler.NewJob(newgoCron.DurationJob(5*time.Second), newgoCron.NewTask(func() { // get the docker clean config config, err := c.AslanCli.GetDockerCleanConfig() if err != nil { @@ -92,26 +95,26 @@ func (c *CronV3Client) Start() { lastConfig = config log.Infof("config changed to %v", config) if config.CronEnabled { - c.Scheduler.RemoveByTag(string(types.CleanDockerTag)) - _, err = c.Scheduler.Cron(config.Cron).Tag(string(types.CleanDockerTag)).Do(func() { + c.Scheduler.RemoveByTags(string(types.CleanDockerTag)) + + _, err := c.Scheduler.NewJob(newgoCron.CronJob(config.Cron, false), newgoCron.NewTask(func() { log.Infof("trigger aslan docker clean,reg: %v", config.Cron) // call docker clean if err := c.AslanCli.DockerClean(); err != nil { log.Errorf("fail to clean docker cache , err:%s", err) } - }) + })) if err != nil { log.Errorf("fail to add docker_cache clean cron job:reg: %v,err:%s", config.Cron, err) } } else { log.Infof("remove docker_cache clean job , job tag: %v", types.CleanDockerTag) - c.Scheduler.RemoveByTag(string(types.CleanDockerTag)) - + c.Scheduler.RemoveByTags(string(types.CleanDockerTag)) } } - }) + })) - c.Scheduler.StartAsync() + c.Scheduler.Start() } const ( @@ -146,14 +149,19 @@ const ( ) // NewCronClient ... -// 服务初始化 +// 注意初始化失败会panic func NewCronClient() *CronClient { aslanCli := client.NewAslanClient(fmt.Sprintf("%s/api", configbase.AslanServiceAddress())) cronjobScheduler := cronlib.New() cronjobScheduler.Start() + newgoCronSchedule, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + } + newgoCronSchedule.Start() - cronjobHandler := NewCronjobHandler(aslanCli, cronjobScheduler) + cronjobHandler := NewCronjobHandler(aslanCli, cronjobScheduler, newgoCronSchedule) go func() { for { diff --git a/pkg/microservice/cron/core/service/types.go b/pkg/microservice/cron/core/service/types.go index 3795d62dfe..591ba7e37e 100644 --- a/pkg/microservice/cron/core/service/types.go +++ b/pkg/microservice/cron/core/service/types.go @@ -37,6 +37,8 @@ const ( TimingSchedule ScheduleType = "timing" // GapSchedule 间隔循环 GapSchedule ScheduleType = "gap" + // UnixstampSchedule 时间戳定时 + UnixstampSchedule ScheduleType = "unix_stamp" ) type PipelineResource struct { @@ -80,6 +82,7 @@ type PipelineSpec struct { type Schedule struct { ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` Number uint64 `bson:"number" json:"number"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Frequency string `bson:"frequency" json:"frequency"` Time string `bson:"time" json:"time"` MaxFailures int `bson:"max_failures,omitempty" json:"max_failures,omitempty"` diff --git a/pkg/setting/consts.go b/pkg/setting/consts.go index b1319b780b..fa33940faa 100644 --- a/pkg/setting/consts.go +++ b/pkg/setting/consts.go @@ -559,6 +559,7 @@ const ( FixedDayTimeCronjob = "timing" FixedGapCronjob = "gap" CrontabCronjob = "crontab" + UnixStampSchedule = "unix_stamp" // 定时器的所属job类型 WorkflowCronjob = "workflow" From ed1e9689b587b1231b682c04147d89ce1a377958 Mon Sep 17 00:00:00 2001 From: Petrus Date: Thu, 9 Jan 2025 10:17:08 +0800 Subject: [PATCH 5/8] create log before execute release plan job (#3952) Signed-off-by: Patrick Zhao --- .../core/release_plan/service/release_plan.go | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pkg/microservice/aslan/core/release_plan/service/release_plan.go b/pkg/microservice/aslan/core/release_plan/service/release_plan.go index 45bf16b908..1f916d00eb 100644 --- a/pkg/microservice/aslan/core/release_plan/service/release_plan.go +++ b/pkg/microservice/aslan/core/release_plan/service/release_plan.go @@ -569,6 +569,21 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID, jobID string) error Name: job.Name, Type: string(job.Type), } + + go func() { + if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ + PlanID: planID, + Username: "系统", + Account: "", + Verb: VerbExecute, + TargetName: args.Name, + TargetType: TargetTypeReleaseJob, + CreatedAt: time.Now().Unix(), + }); err != nil { + log.Errorf("create release plan log error: %v", err) + } + }() + executor, err := NewReleaseJobExecutor(&ExecuteReleaseJobContext{ AuthResources: c.Resources, UserID: c.UserID, @@ -602,20 +617,6 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID, jobID string) error log.Error(err) return err } - - go func() { - if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ - PlanID: planID, - Username: "系统", - Account: "", - Verb: VerbExecute, - TargetName: args.Name, - TargetType: TargetTypeReleaseJob, - CreatedAt: time.Now().Unix(), - }); err != nil { - log.Errorf("create release plan log error: %v", err) - } - }() } } From 7ed56bb9d6709ac4d8d946326c1775a7aa00ace1 Mon Sep 17 00:00:00 2001 From: Min Min <37430175+jamsman94@users.noreply.github.com> Date: Thu, 9 Jan 2025 13:39:27 +0800 Subject: [PATCH 6/8] fix gitee list project with chinese name (#3947) Signed-off-by: Min Min --- pkg/microservice/aslan/core/code/client/gitee/gitee.go | 2 +- pkg/tool/gitee/repositories.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/microservice/aslan/core/code/client/gitee/gitee.go b/pkg/microservice/aslan/core/code/client/gitee/gitee.go index 3642375421..32bec35781 100644 --- a/pkg/microservice/aslan/core/code/client/gitee/gitee.go +++ b/pkg/microservice/aslan/core/code/client/gitee/gitee.go @@ -152,7 +152,7 @@ func (c *Client) ListProjects(opt client.ListOpt) ([]*client.Project, error) { for _, project := range projects { res = append(res, &client.Project{ ID: project.ID, - Name: project.Name, + Name: project.Path, DefaultBranch: project.DefaultBranch, }) } diff --git a/pkg/tool/gitee/repositories.go b/pkg/tool/gitee/repositories.go index 07655fbe99..1f89cb185d 100644 --- a/pkg/tool/gitee/repositories.go +++ b/pkg/tool/gitee/repositories.go @@ -32,6 +32,7 @@ import ( type Project struct { ID int `json:"id"` Name string `json:"name"` + Path string `json:"path"` DefaultBranch string `json:"default_branch,omitempty"` Namespace *NamespaceInfo `json:"namespace,omitempty"` } From 8c7f2588b6afeec5df099a11fe73b880dd4b5c8b Mon Sep 17 00:00:00 2001 From: Petrus Date: Thu, 9 Jan 2025 18:11:43 +0800 Subject: [PATCH 7/8] fix 321 ua (#3955) Signed-off-by: Patrick Zhao --- pkg/cli/upgradeassistant/cmd/migrate/321.go | 4 +++- pkg/microservice/aslan/config/consts.go | 2 +- .../cron/core/service/scheduler/cronjob_handler.go | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/cli/upgradeassistant/cmd/migrate/321.go b/pkg/cli/upgradeassistant/cmd/migrate/321.go index 7bbe4db04a..845c919f24 100644 --- a/pkg/cli/upgradeassistant/cmd/migrate/321.go +++ b/pkg/cli/upgradeassistant/cmd/migrate/321.go @@ -25,6 +25,7 @@ import ( commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" "github.com/koderover/zadig/v2/pkg/shared/handler" + "github.com/koderover/zadig/v2/pkg/util" "go.mongodb.org/mongo-driver/bson" ) @@ -69,9 +70,10 @@ func migrateReleasePlanCron(ctx *handler.Context) error { for _, releasePlan := range releasePlans { if releasePlan.ScheduleExecuteTime != 0 && releasePlan.Status == config.StatusExecuting { if time.Unix(releasePlan.ScheduleExecuteTime, 0).After(time.Now()) { + releasePlanCronName := util.GetReleasePlanCronName(releasePlan.ID.Hex(), releasePlan.Name, releasePlan.Index) cronjob := &commonmodels.Cronjob{ Enabled: true, - Name: releasePlan.Name, + Name: releasePlanCronName, Type: "release_plan", JobType: string(config.UnixstampSchedule), UnixStamp: releasePlan.ScheduleExecuteTime, diff --git a/pkg/microservice/aslan/config/consts.go b/pkg/microservice/aslan/config/consts.go index c219e3a16b..f3f70f9263 100644 --- a/pkg/microservice/aslan/config/consts.go +++ b/pkg/microservice/aslan/config/consts.go @@ -59,7 +59,7 @@ const ( // GapSchedule 间隔循环 GapSchedule ScheduleType = "gap" // UnixstampSchedule 时间戳 - UnixstampSchedule ScheduleType = "unixstamp" + UnixstampSchedule ScheduleType = "unix_stamp" ) type SlackNotifyType string diff --git a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go index ce8d3aaed4..6d78bfefa2 100644 --- a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go +++ b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go @@ -455,6 +455,8 @@ func registerCronjob(job *service.Cronjob, client *client.Client, scheduler *cro } } else { switch job.Type { + case setting.WorkflowCronjob: + return nil case setting.WorkflowV4Cronjob: if job.WorkflowV4Args == nil { return fmt.Errorf("workflow args is nil") From a7ef527b512678d91341c04ff0dd4e9d2c5e7df0 Mon Sep 17 00:00:00 2001 From: Petrus Date: Fri, 10 Jan 2025 15:21:58 +0800 Subject: [PATCH 8/8] improve 321 ua performance (#3957) Signed-off-by: Patrick Zhao --- pkg/cli/upgradeassistant/cmd/migrate/321.go | 11 ++++++++--- .../core/common/repository/mongodb/release_plan.go | 5 +++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/cli/upgradeassistant/cmd/migrate/321.go b/pkg/cli/upgradeassistant/cmd/migrate/321.go index 845c919f24..c711961e97 100644 --- a/pkg/cli/upgradeassistant/cmd/migrate/321.go +++ b/pkg/cli/upgradeassistant/cmd/migrate/321.go @@ -17,11 +17,13 @@ package migrate import ( + "context" "fmt" "time" "github.com/koderover/zadig/v2/pkg/cli/upgradeassistant/internal/upgradepath" "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" "github.com/koderover/zadig/v2/pkg/shared/handler" @@ -61,13 +63,16 @@ func migrateReleasePlanCron(ctx *handler.Context) error { return fmt.Errorf("failed to delete release plan cronjobs, error: %w", err) } - releasePlans, _, err := commonrepo.NewReleasePlanColl().ListByOptions(&commonrepo.ListReleasePlanOption{}) + cursor, err := commonrepo.NewReleasePlanColl().ListByCursor() if err != nil { return fmt.Errorf("failed to list release plans, error: %w", err) } + for cursor.Next(context.Background()) { + var releasePlan models.ReleasePlan + if err := cursor.Decode(&releasePlan); err != nil { + return err + } - // create new cronjob for release plan if schedule time is after now and status is executing - for _, releasePlan := range releasePlans { if releasePlan.ScheduleExecuteTime != 0 && releasePlan.Status == config.StatusExecuting { if time.Unix(releasePlan.ScheduleExecuteTime, 0).After(time.Now()) { releasePlanCronName := util.GetReleasePlanCronName(releasePlan.ID.Hex(), releasePlan.Name, releasePlan.Index) diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go index beff221f23..52e161eed6 100644 --- a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go +++ b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go @@ -254,3 +254,8 @@ func (c *ReleasePlanColl) ListFinishedReleasePlan(startTime, endTime int64) ([]* return resp, nil } + +func (c *ReleasePlanColl) ListByCursor() (*mongo.Cursor, error) { + query := bson.M{} + return c.Collection.Find(context.TODO(), query) +}