diff --git a/go.mod b/go.mod index 01dd0fe2c2..a963260ae6 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/docker/go-connections v0.4.0 github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f github.com/gin-gonic/gin v1.9.1 - github.com/go-co-op/gocron v1.17.0 + github.com/go-co-op/gocron/v2 v2.14.0 github.com/go-ldap/ldap/v3 v3.3.0 github.com/go-redsync/redsync/v4 v4.11.0 github.com/go-resty/resty/v2 v2.7.0 @@ -82,7 +82,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.8 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.8.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/swaggo/files v0.0.0-20220728132757-551d4a08d97a github.com/swaggo/gin-swagger v1.5.3 github.com/tidwall/gjson v1.14.3 @@ -234,6 +234,7 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect + github.com/jonboulle/clockwork v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kennygrant/sanitize v1.2.4 // indirect diff --git a/pkg/cli/upgradeassistant/cmd/migrate/321.go b/pkg/cli/upgradeassistant/cmd/migrate/321.go new file mode 100644 index 0000000000..c711961e97 --- /dev/null +++ b/pkg/cli/upgradeassistant/cmd/migrate/321.go @@ -0,0 +1,99 @@ +/* + * Copyright 2024 The KodeRover Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package migrate + +import ( + "context" + "fmt" + "time" + + "github.com/koderover/zadig/v2/pkg/cli/upgradeassistant/internal/upgradepath" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + "github.com/koderover/zadig/v2/pkg/shared/handler" + "github.com/koderover/zadig/v2/pkg/util" + "go.mongodb.org/mongo-driver/bson" +) + +func init() { + upgradepath.RegisterHandler("3.2.0", "3.2.1", V320ToV321) + upgradepath.RegisterHandler("3.2.1", "3.2.0", V321ToV320) +} + +func V320ToV321() error { + ctx := handler.NewBackgroupContext() + + ctx.Logger.Infof("-------- start migrate release plan cronjob --------") + err := migrateReleasePlanCron(ctx) + if err != nil { + err = fmt.Errorf("failed to migrate release plan cronjob, error: %w", err) + ctx.Logger.Error(err) + return err + } + + return nil +} + +func V321ToV320() error { + return nil +} + +func migrateReleasePlanCron(ctx *handler.Context) error { + // delete all release plan cronjob first + _, err := commonrepo.NewCronjobColl().DeleteMany(ctx, + bson.M{"type": "release_plan"}, + ) + if err != nil { + return fmt.Errorf("failed to delete release plan cronjobs, error: %w", err) + } + + cursor, err := commonrepo.NewReleasePlanColl().ListByCursor() + if err != nil { + return fmt.Errorf("failed to list release plans, error: %w", err) + } + for cursor.Next(context.Background()) { + var releasePlan models.ReleasePlan + if err := cursor.Decode(&releasePlan); err != nil { + return err + } + + if releasePlan.ScheduleExecuteTime != 0 && releasePlan.Status == config.StatusExecuting { + if time.Unix(releasePlan.ScheduleExecuteTime, 0).After(time.Now()) { + releasePlanCronName := util.GetReleasePlanCronName(releasePlan.ID.Hex(), releasePlan.Name, releasePlan.Index) + cronjob := &commonmodels.Cronjob{ + Enabled: true, + Name: releasePlanCronName, + Type: "release_plan", + JobType: string(config.UnixstampSchedule), + UnixStamp: releasePlan.ScheduleExecuteTime, + ReleasePlanArgs: &commonmodels.ReleasePlanArgs{ + ID: releasePlan.ID.Hex(), + Name: releasePlan.Name, + Index: releasePlan.Index, + }, + } + if err := commonrepo.NewCronjobColl().Upsert(cronjob); err != nil { + return fmt.Errorf("failed to create new release plan schedule job, error: %w", err) + } + } + } + } + + return nil +} diff --git a/pkg/microservice/aslan/config/consts.go b/pkg/microservice/aslan/config/consts.go index be0570b6ff..f3f70f9263 100644 --- a/pkg/microservice/aslan/config/consts.go +++ b/pkg/microservice/aslan/config/consts.go @@ -58,6 +58,8 @@ const ( TimingSchedule ScheduleType = "timing" // GapSchedule 间隔循环 GapSchedule ScheduleType = "gap" + // UnixstampSchedule 时间戳 + UnixstampSchedule ScheduleType = "unix_stamp" ) type SlackNotifyType string diff --git a/pkg/microservice/aslan/core/build/handler/openapi.go b/pkg/microservice/aslan/core/build/handler/openapi.go index eadd7eeccd..e005a67d46 100644 --- a/pkg/microservice/aslan/core/build/handler/openapi.go +++ b/pkg/microservice/aslan/core/build/handler/openapi.go @@ -217,6 +217,9 @@ func OpenAPIGetBuildModule(c *gin.Context) { return } + serviceName := c.Query("serviceName") + serviceModule := c.Query("serviceModule") + // authorization checks if !ctx.Resources.IsSystemAdmin { if _, ok := ctx.Resources.ProjectAuthInfo[projectKey]; !ok { @@ -230,5 +233,5 @@ func OpenAPIGetBuildModule(c *gin.Context) { } } - ctx.Resp, ctx.RespErr = buildservice.OpenAPIGetBuildModule(name, projectKey, ctx.Logger) + ctx.Resp, ctx.RespErr = buildservice.OpenAPIGetBuildModule(name, serviceName, serviceModule, projectKey, ctx.Logger) } diff --git a/pkg/microservice/aslan/core/build/service/openapi.go b/pkg/microservice/aslan/core/build/service/openapi.go index a6f411154c..7912d816e5 100644 --- a/pkg/microservice/aslan/core/build/service/openapi.go +++ b/pkg/microservice/aslan/core/build/service/openapi.go @@ -316,7 +316,7 @@ func OpenAPIListBuildModules(projectName string, pageNum, pageSize int64, logger return resp, nil } -func OpenAPIGetBuildModule(name, projectName string, logger *zap.SugaredLogger) (*OpenAPIBuildDetailResp, error) { +func OpenAPIGetBuildModule(name, serviceName, serviceModule, projectName string, logger *zap.SugaredLogger) (*OpenAPIBuildDetailResp, error) { opt := &commonrepo.BuildFindOption{ Name: name, ProductName: projectName, @@ -348,18 +348,39 @@ func OpenAPIGetBuildModule(name, projectName string, logger *zap.SugaredLogger) } resp.Repos = make([]*OpenAPIRepo, 0) - for _, rp := range build.Repos { - repo := &OpenAPIRepo{ - RepoName: rp.RepoName, - Branch: rp.Branch, - Source: rp.Source, - RepoOwner: rp.RepoOwner, - RemoteName: rp.RemoteName, - CheckoutPath: rp.CheckoutPath, - Submodules: rp.SubModules, - Hidden: rp.Hidden, + if serviceName == "" || serviceModule == "" || build.TemplateID == "" { + for _, rp := range build.Repos { + repo := &OpenAPIRepo{ + RepoName: rp.RepoName, + Branch: rp.Branch, + Source: rp.Source, + RepoOwner: rp.RepoOwner, + RemoteName: rp.RemoteName, + CheckoutPath: rp.CheckoutPath, + Submodules: rp.SubModules, + Hidden: rp.Hidden, + } + resp.Repos = append(resp.Repos, repo) + } + } else { + for _, svcBuild := range build.Targets { + if svcBuild.ServiceName == serviceName && svcBuild.ServiceModule == serviceModule { + for _, rp := range svcBuild.Repos { + repo := &OpenAPIRepo{ + RepoName: rp.RepoName, + Branch: rp.Branch, + Source: rp.Source, + RepoOwner: rp.RepoOwner, + RemoteName: rp.RemoteName, + CheckoutPath: rp.CheckoutPath, + Submodules: rp.SubModules, + Hidden: rp.Hidden, + } + resp.Repos = append(resp.Repos, repo) + } + break + } } - resp.Repos = append(resp.Repos, repo) } resp.TargetServices = make([]*commonmodels.ServiceWithModule, 0) diff --git a/pkg/microservice/aslan/core/code/client/gitee/gitee.go b/pkg/microservice/aslan/core/code/client/gitee/gitee.go index 3642375421..32bec35781 100644 --- a/pkg/microservice/aslan/core/code/client/gitee/gitee.go +++ b/pkg/microservice/aslan/core/code/client/gitee/gitee.go @@ -152,7 +152,7 @@ func (c *Client) ListProjects(opt client.ListOpt) ([]*client.Project, error) { for _, project := range projects { res = append(res, &client.Project{ ID: project.ID, - Name: project.Name, + Name: project.Path, DefaultBranch: project.DefaultBranch, }) } diff --git a/pkg/microservice/aslan/core/common/repository/models/cronjob.go b/pkg/microservice/aslan/core/common/repository/models/cronjob.go index 0485632778..e756dae479 100644 --- a/pkg/microservice/aslan/core/common/repository/models/cronjob.go +++ b/pkg/microservice/aslan/core/common/repository/models/cronjob.go @@ -25,6 +25,7 @@ type Cronjob struct { Name string `bson:"name" json:"name"` Type string `bson:"type" json:"type"` Number uint64 `bson:"number" json:"number"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Frequency string `bson:"frequency" json:"frequency"` Time string `bson:"time" json:"time"` Cron string `bson:"cron" json:"cron"` diff --git a/pkg/microservice/aslan/core/common/repository/models/workflow.go b/pkg/microservice/aslan/core/common/repository/models/workflow.go index 5e3375509c..c6d047a5de 100644 --- a/pkg/microservice/aslan/core/common/repository/models/workflow.go +++ b/pkg/microservice/aslan/core/common/repository/models/workflow.go @@ -121,6 +121,7 @@ type Schedule struct { ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` Number uint64 `bson:"number" json:"number"` Frequency string `bson:"frequency" json:"frequency"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Time string `bson:"time" json:"time"` MaxFailures int `bson:"max_failures,omitempty" json:"max_failures,omitempty"` TaskArgs *TaskArgs `bson:"task_args,omitempty" json:"task_args,omitempty"` diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go index c07e4094b3..52e161eed6 100644 --- a/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go +++ b/pkg/microservice/aslan/core/common/repository/mongodb/release_plan.go @@ -71,6 +71,10 @@ func (c *ReleasePlanColl) EnsureIndex(ctx context.Context) error { Keys: bson.M{"success_time": 1}, Options: options.Index().SetUnique(false), }, + { + Keys: bson.M{"update_time": 1}, + Options: options.Index().SetUnique(false), + }, } _, err := c.Indexes().CreateMany(ctx, mod) @@ -124,6 +128,13 @@ func (c *ReleasePlanColl) DeleteByID(ctx context.Context, idString string) error return err } +type SortReleasePlanBy string + +const ( + SortReleasePlanByIndex SortReleasePlanBy = "index" + SortReleasePlanByUpdateTime SortReleasePlanBy = "update_time" +) + type ListReleasePlanOption struct { PageNum int64 PageSize int64 @@ -131,7 +142,10 @@ type ListReleasePlanOption struct { Manager string SuccessTimeStart int64 SuccessTimeEnd int64 + UpdateTimeStart int64 + UpdateTimeEnd int64 IsSort bool + SortBy SortReleasePlanBy ExcludedFields []string Status config.ReleasePlanStatus } @@ -147,8 +161,15 @@ func (c *ReleasePlanColl) ListByOptions(opt *ListReleasePlanOption) ([]*models.R ctx := context.Background() opts := options.Find() if opt.IsSort { - opts.SetSort(bson.D{{"index", -1}}) + if opt.SortBy == SortReleasePlanByIndex { + opts.SetSort(bson.D{{"index", -1}}) + } else if opt.SortBy == SortReleasePlanByUpdateTime { + opts.SetSort(bson.D{{"update_time", -1}}) + } else { + opts.SetSort(bson.D{{"index", -1}}) + } } + if opt.PageNum > 0 && opt.PageSize > 0 { opts.SetSkip((opt.PageNum - 1) * opt.PageSize) opts.SetLimit(opt.PageSize) @@ -162,6 +183,9 @@ func (c *ReleasePlanColl) ListByOptions(opt *ListReleasePlanOption) ([]*models.R if opt.SuccessTimeStart > 0 && opt.SuccessTimeEnd > 0 { query["success_time"] = bson.M{"$gte": opt.SuccessTimeStart, "$lte": opt.SuccessTimeEnd} } + if opt.UpdateTimeStart > 0 && opt.UpdateTimeEnd > 0 { + query["update_time"] = bson.M{"$gte": opt.UpdateTimeStart, "$lte": opt.UpdateTimeEnd} + } if opt.Status != "" { query["status"] = opt.Status } @@ -230,3 +254,8 @@ func (c *ReleasePlanColl) ListFinishedReleasePlan(startTime, endTime int64) ([]* return resp, nil } + +func (c *ReleasePlanColl) ListByCursor() (*mongo.Cursor, error) { + query := bson.M{} + return c.Collection.Find(context.TODO(), query) +} diff --git a/pkg/microservice/aslan/core/common/service/cronjob.go b/pkg/microservice/aslan/core/common/service/cronjob.go index fd32eaf120..3bdc6adfee 100644 --- a/pkg/microservice/aslan/core/common/service/cronjob.go +++ b/pkg/microservice/aslan/core/common/service/cronjob.go @@ -21,10 +21,11 @@ import ( ) type CronjobPayload struct { - Name string `json:"name"` - ProductName string `json:"product_name"` - Action string `json:"action"` - JobType string `json:"job_type"` - DeleteList []string `json:"delete_list,omitempty"` - JobList []*models.Schedule `json:"job_list,omitempty"` + Name string `json:"name"` + ProductName string `json:"product_name"` + Action string `json:"action"` + JobType string `json:"job_type"` + ScheduleType string `json:"schedule_type"` + DeleteList []string `json:"delete_list,omitempty"` + JobList []*models.Schedule `json:"job_list,omitempty"` } diff --git a/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go b/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go index 6333db2150..ada45f078e 100644 --- a/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go +++ b/pkg/microservice/aslan/core/common/service/instantmessage/workflow_task.go @@ -566,7 +566,7 @@ func (w *Service) getNotificationContent(notify *models.NotifyCtl, task *models. } } } - if image != "" { + if image != "" && !strings.HasPrefix(image, "{{.") && !strings.Contains(image, "}}") { jobTplcontent += fmt.Sprintf("{{if eq .WebHookType \"dingding\"}}##### {{end}}**镜像信息**:%s \n", image) mailJobTplcontent += fmt.Sprintf("镜像信息:%s \n", image) workflowNotifyJobTaskSpec.Image = image diff --git a/pkg/microservice/aslan/core/cron/handler/cron.go b/pkg/microservice/aslan/core/cron/handler/cron.go index 3853041c72..afb9be9607 100644 --- a/pkg/microservice/aslan/core/cron/handler/cron.go +++ b/pkg/microservice/aslan/core/cron/handler/cron.go @@ -64,6 +64,7 @@ type cronjobResp struct { Name string `json:"name"` Type string `json:"type"` Number uint64 `json:"number"` + UnixStamp int64 `json:"unix_stamp"` Frequency string `json:"frequency"` Time string `json:"time"` Cron string `json:"cron"` @@ -92,6 +93,7 @@ func ListActiveCronjobFailsafe(c *gin.Context) { Name: cronjob.Name, Type: cronjob.Type, Number: cronjob.Number, + UnixStamp: cronjob.UnixStamp, Frequency: cronjob.Frequency, Time: cronjob.Time, Cron: cronjob.Cron, @@ -124,6 +126,7 @@ func ListActiveCronjob(c *gin.Context) { Name: cronjob.Name, Type: cronjob.Type, Number: cronjob.Number, + UnixStamp: cronjob.UnixStamp, Frequency: cronjob.Frequency, Time: cronjob.Time, Cron: cronjob.Cron, diff --git a/pkg/microservice/aslan/core/environment/service/environment.go b/pkg/microservice/aslan/core/environment/service/environment.go index 542c8f474f..fb86c570ad 100644 --- a/pkg/microservice/aslan/core/environment/service/environment.go +++ b/pkg/microservice/aslan/core/environment/service/environment.go @@ -1493,7 +1493,7 @@ func GetAffectedServices(productName, envName string, arg *K8sRendersetArg, log func GeneEstimatedValues(productName, envName, serviceOrReleaseName, scene, format string, arg *EstimateValuesArg, isHelmChartDeploy bool, log *zap.SugaredLogger) (interface{}, error) { var ( productSvc *commonmodels.ProductService - tmplSvc *commonmodels.Service + tmplSvc *commonmodels.Service productInfo *commonmodels.Product err error ) @@ -3791,6 +3791,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { return &commonmodels.Schedule{ ID: input.ID, Number: input.Number, + UnixStamp: input.UnixStamp, Frequency: input.Frequency, Time: input.Time, MaxFailures: input.MaxFailure, diff --git a/pkg/microservice/aslan/core/log/handler/router.go b/pkg/microservice/aslan/core/log/handler/router.go index 8fa82633d4..5a517430a3 100644 --- a/pkg/microservice/aslan/core/log/handler/router.go +++ b/pkg/microservice/aslan/core/log/handler/router.go @@ -38,7 +38,6 @@ func (*Router) Inject(router *gin.RouterGroup) { sse := router.Group("sse") { sse.GET("/pods/:podName/containers/:containerName", GetContainerLogsSSE) - sse.GET("/production/pods/:podName/containers/:containerName", GetProductionEnvContainerLogsSSE) sse.GET("/testing/:test_name/tasks/:task_id", GetTestingContainerLogsSSE) sse.GET("/scanning/:id/task/:scan_id", GetScanningContainerLogsSSE) sse.GET("/v4/workflow/:workflowName/:taskID/:jobName/:lines", GetWorkflowJobContainerLogsSSE) diff --git a/pkg/microservice/aslan/core/log/handler/sse.go b/pkg/microservice/aslan/core/log/handler/sse.go index 5123e08af1..b4e5a6f9ed 100644 --- a/pkg/microservice/aslan/core/log/handler/sse.go +++ b/pkg/microservice/aslan/core/log/handler/sse.go @@ -55,69 +55,54 @@ func GetContainerLogsSSE(c *gin.Context) { envName := c.Query("envName") productName := c.Query("projectName") + isProduction := c.Query("production") == "true" - // authorization checks - if !ctx.Resources.IsSystemAdmin { - if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - if !ctx.Resources.ProjectAuthInfo[productName].Env.View && - !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { - permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.EnvActionView) - if err != nil || !permitted { + if !isProduction { + // authorization checks + if !ctx.Resources.IsSystemAdmin { + if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { ctx.UnAuthorized = true internalhandler.JSONResponse(c, ctx) return } + if !ctx.Resources.ProjectAuthInfo[productName].Env.View && + !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { + permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.EnvActionView) + if err != nil || !permitted { + ctx.UnAuthorized = true + internalhandler.JSONResponse(c, ctx) + return + } + } } - } - - internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { - logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) - }, logger) -} -func GetProductionEnvContainerLogsSSE(c *gin.Context) { - logger := ginzap.WithContext(c).Sugar() - ctx, err := internalhandler.NewContextWithAuthorization(c) - if err != nil { - ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - - tails, err := strconv.ParseInt(c.Query("tails"), 10, 64) - if err != nil { - tails = int64(10) - } - - envName := c.Query("envName") - productName := c.Query("projectName") - - // authorization checks - if !ctx.Resources.IsSystemAdmin { - if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { - ctx.UnAuthorized = true - internalhandler.JSONResponse(c, ctx) - return - } - if !ctx.Resources.ProjectAuthInfo[productName].ProductionEnv.View && - !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { - permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.ProductionEnvActionView) - if err != nil || !permitted { + internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { + logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) + }, logger) + } else { + // authorization checks + if !ctx.Resources.IsSystemAdmin { + if _, ok := ctx.Resources.ProjectAuthInfo[productName]; !ok { ctx.UnAuthorized = true internalhandler.JSONResponse(c, ctx) return } + if !ctx.Resources.ProjectAuthInfo[productName].ProductionEnv.View && + !ctx.Resources.ProjectAuthInfo[productName].IsProjectAdmin { + permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, productName, types.ResourceTypeEnvironment, envName, types.ProductionEnvActionView) + if err != nil || !permitted { + ctx.UnAuthorized = true + internalhandler.JSONResponse(c, ctx) + return + } + } } + + internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { + logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) + }, logger) } - internalhandler.Stream(c, func(ctx context.Context, streamChan chan interface{}) { - logservice.ContainerLogStream(ctx, streamChan, envName, productName, c.Param("podName"), c.Param("containerName"), true, tails, logger) - }, logger) } func GetWorkflowJobContainerLogsSSE(c *gin.Context) { diff --git a/pkg/microservice/aslan/core/release_plan/handler/release_plan.go b/pkg/microservice/aslan/core/release_plan/handler/release_plan.go index 7ef23436f7..c85dc2d5e7 100644 --- a/pkg/microservice/aslan/core/release_plan/handler/release_plan.go +++ b/pkg/microservice/aslan/core/release_plan/handler/release_plan.go @@ -204,7 +204,7 @@ func ScheduleExecuteReleasePlan(c *gin.Context) { return } - ctx.RespErr = service.ScheduleExecuteReleasePlan(ctx, c.Param("id")) + ctx.RespErr = service.ScheduleExecuteReleasePlan(ctx, c.Param("id"), c.Query("jobID")) } func SkipReleaseJob(c *gin.Context) { diff --git a/pkg/microservice/aslan/core/release_plan/service/release_plan.go b/pkg/microservice/aslan/core/release_plan/service/release_plan.go index e37015b22b..1f916d00eb 100644 --- a/pkg/microservice/aslan/core/release_plan/service/release_plan.go +++ b/pkg/microservice/aslan/core/release_plan/service/release_plan.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" @@ -123,7 +124,7 @@ func CreateReleasePlan(c *handler.Context, args *models.ReleasePlan) error { return nil } -func upsertReleasePlanCron(id, name string, index int64, ScheduleExecuteTime int64) error { +func upsertReleasePlanCron(id, name string, index int64, status config.ReleasePlanStatus, ScheduleExecuteTime int64) error { var ( err error payload *commonservice.CronjobPayload @@ -146,67 +147,110 @@ func upsertReleasePlanCron(id, name string, index int64, ScheduleExecuteTime int found = true } - if found { - origEnabled := releasePlanCron.Enabled - releasePlanCron.Enabled = enable - releasePlanCron.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ - ID: id, - Name: name, - Index: index, - } - releasePlanCron.Cron = util.UnixStampToCronExpr(ScheduleExecuteTime) + if status != config.StatusExecuting { + // delete cron job if status is not executing + if found { + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete release plan schedule job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } - err = commonrepo.NewCronjobColl().Upsert(releasePlanCron) - if err != nil { - fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) - log.Error(fmtErr) - return err + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + DeleteList: []string{releasePlanCron.ID.Hex()}, + } } + } else { + // upsert cron job if status is executing + if found { + origEnabled := releasePlanCron.Enabled + releasePlanCron.Enabled = enable + releasePlanCron.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ + ID: id, + Name: name, + Index: index, + } + releasePlanCron.JobType = setting.UnixStampSchedule + releasePlanCron.UnixStamp = ScheduleExecuteTime + + if origEnabled && !enable { + // need to disable cronjob + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete cron job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } - if origEnabled && !enable { - // need to disable cronjob - payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - DeleteList: []string{releasePlanCron.ID.Hex()}, + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + DeleteList: []string{releasePlanCron.ID.Hex()}, + } + } else if !origEnabled && enable || origEnabled && enable { + err = commonrepo.NewCronjobColl().Upsert(releasePlanCron) + if err != nil { + fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) + log.Error(fmtErr) + return err + } + + payload = &commonservice.CronjobPayload{ + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + JobList: []*commonmodels.Schedule{cronJobToSchedule(releasePlanCron)}, + } + } else { + // !origEnabled && !enable + return nil + } + } else { + if !enable { + return nil + } + + input := &commonmodels.Cronjob{ + Enabled: enable, + Name: releasePlanCronName, + Type: setting.ReleasePlanCronjob, + JobType: setting.UnixStampSchedule, + UnixStamp: ScheduleExecuteTime, + ReleasePlanArgs: &commonmodels.ReleasePlanArgs{ + ID: id, + Name: name, + Index: index, + }, + } + + err = commonrepo.NewCronjobColl().Upsert(input) + if err != nil { + fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) + log.Error(fmtErr) + return err } - } else if !origEnabled && enable || origEnabled && enable { payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - JobList: []*commonmodels.Schedule{cronJobToSchedule(releasePlanCron)}, + Name: releasePlanCronName, + JobType: setting.ReleasePlanCronjob, + Action: setting.TypeEnableCronjob, + ScheduleType: setting.UnixStampSchedule, + JobList: []*commonmodels.Schedule{cronJobToSchedule(input)}, } - } else { - // !origEnabled && !enable - return nil - } - } else { - input := &commonmodels.Cronjob{ - Name: releasePlanCronName, - Type: setting.ReleasePlanCronjob, - } - input.Enabled = true - input.Cron = util.UnixStampToCronExpr(ScheduleExecuteTime) - input.ReleasePlanArgs = &commonmodels.ReleasePlanArgs{ - ID: id, - Name: name, - Index: index, } + } - err = commonrepo.NewCronjobColl().Upsert(input) - if err != nil { - fmtErr := fmt.Errorf("Failed to upsert cron job, error: %w", err) - log.Error(fmtErr) - return err - } - payload = &commonservice.CronjobPayload{ - Name: releasePlanCronName, - JobType: setting.ReleasePlanCronjob, - Action: setting.TypeEnableCronjob, - JobList: []*commonmodels.Schedule{cronJobToSchedule(input)}, - } + if payload == nil { + return nil } pl, _ := json.Marshal(payload) @@ -296,6 +340,23 @@ func DeleteReleasePlan(c *gin.Context, username, id string) error { return errors.Wrap(err, "get plan") } internalhandler.InsertOperationLog(c, username, "", "删除", "发布计划", info.Name, "", log.SugaredLogger()) + + releasePlanCronName := util.GetReleasePlanCronName(id, info.Name, info.Index) + releasePlanCron, err := commonrepo.NewCronjobColl().GetByName(releasePlanCronName, setting.ReleasePlanCronjob) + if err != nil { + if err != mongo.ErrNoDocuments && err != mongo.ErrNilDocument { + return e.ErrUpsertCronjob.AddErr(fmt.Errorf("failed to get release plan cron job, err: %w", err)) + } + } else { + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{releasePlanCron.ID.Hex()}, + }) + if err != nil { + fmtErr := fmt.Errorf("Failed to delete release plan schedule job %s, error: %w", releasePlanCron.ID.Hex(), err) + log.Error(fmtErr) + } + } + return mongodb.NewReleasePlanColl().DeleteByID(context.Background(), id) } @@ -435,11 +496,37 @@ func ExecuteReleaseJob(c *handler.Context, planID string, args *ExecuteReleaseJo return nil } -func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { +func ScheduleExecuteReleasePlan(c *handler.Context, planID, jobID string) error { approveLock := getLock(planID) approveLock.Lock() defer approveLock.Unlock() + // check if the job is already executed + jobObjectID, err := primitive.ObjectIDFromHex(jobID) + if err != nil { + return errors.Wrap(err, "invalid job ID") + } + _, err = commonrepo.NewCronjobColl().GetByID(jobObjectID) + if err != nil { + if !mongodb.IsErrNoDocuments(err) { + err = fmt.Errorf("Failed to get release job schedule job %s, error: %v", jobID, err) + log.Error(err) + return err + } else { + err = fmt.Errorf("Release job schedule job %s not found", jobID) + log.Error(err) + return err + } + } + + // delete the schedule job after executed + err = commonrepo.NewCronjobColl().Delete(&commonrepo.CronjobDeleteOption{ + IDList: []string{jobID}, + }) + if err != nil { + log.Errorf("Failed to delete release job schedule job %s, error: %v", jobID, err) + } + ctx := context.Background() plan, err := mongodb.NewReleasePlanColl().GetByID(ctx, planID) if err != nil { @@ -449,7 +536,7 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { } if plan.Status != config.StatusExecuting { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, status is %s, can not execute", plan.ID, plan.Name, plan.Index, plan.Status) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, status is %s, can not execute", plan.ID.Hex(), plan.Name, plan.Index, plan.Status) log.Error(err) return err } @@ -457,19 +544,19 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { if !(plan.StartTime == 0 && plan.EndTime == 0) { now := time.Now().Unix() if now < plan.StartTime || now > plan.EndTime { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's not in the release time range", plan.ID, plan.Name, plan.Index) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's not in the release time range", plan.ID.Hex(), plan.Name, plan.Index) log.Error(err) return err } } if plan.Approval != nil && plan.Approval.Enabled == true && plan.Approval.Status != config.StatusPassed { - err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's approval status is %s, can not execute", plan.ID, plan.Name, plan.Index, plan.Approval.Status) + err = errors.Errorf("plan ID is %s, name is %s, index is %d, it's approval status is %s, can not execute", plan.ID.Hex(), plan.Name, plan.Index, plan.Approval.Status) log.Error(err) return err } - log.Infof("schedule execute release plan, plan ID: %s, name: %s, index: %d", plan.ID, plan.Name, plan.Index) + log.Infof("schedule execute release plan, plan ID: %s, name: %s, index: %d", plan.ID.Hex(), plan.Name, plan.Index) for _, job := range plan.Jobs { if job.Type == config.JobWorkflow { @@ -482,6 +569,21 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { Name: job.Name, Type: string(job.Type), } + + go func() { + if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ + PlanID: planID, + Username: "系统", + Account: "", + Verb: VerbExecute, + TargetName: args.Name, + TargetType: TargetTypeReleaseJob, + CreatedAt: time.Now().Unix(), + }); err != nil { + log.Errorf("create release plan log error: %v", err) + } + }() + executor, err := NewReleaseJobExecutor(&ExecuteReleaseJobContext{ AuthResources: c.Resources, UserID: c.UserID, @@ -508,27 +610,13 @@ func ScheduleExecuteReleasePlan(c *handler.Context, planID string) error { plan.Status = config.StatusSuccess } - log.Infof("schedule execute release job, plan ID: %s, name: %s, index: %d, job ID: %s, job name: %s", plan.ID, plan.Name, plan.Index, job.ID, job.Name) + log.Infof("schedule execute release job, plan ID: %s, name: %s, index: %d, job ID: %s, job name: %s", plan.ID.Hex(), plan.Name, plan.Index, job.ID, job.Name) if err = mongodb.NewReleasePlanColl().UpdateByID(ctx, planID, plan); err != nil { err = errors.Wrap(err, "update plan") log.Error(err) return err } - - go func() { - if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ - PlanID: planID, - Username: "系统", - Account: "", - Verb: VerbExecute, - TargetName: args.Name, - TargetType: TargetTypeReleaseJob, - CreatedAt: time.Now().Unix(), - }); err != nil { - log.Errorf("create release plan log error: %v", err) - } - }() } } @@ -652,10 +740,6 @@ func UpdateReleasePlanStatus(c *handler.Context, planID, status string, isSystem return errors.Errorf("approval status is %s, can not execute", plan.Approval.Status) } - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { - return errors.Wrap(err, "upsert release plan cron") - } - plan.ExecutingTime = time.Now().Unix() setReleaseJobsForExecuting(plan) case config.StatusWaitForApprove: @@ -685,6 +769,10 @@ func UpdateReleasePlanStatus(c *handler.Context, planID, status string, isSystem return errors.Wrap(err, "update plan") } + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { + return errors.Wrap(err, "upsert release plan cron") + } + go func() { if err := mongodb.NewReleasePlanLogColl().Create(&models.ReleasePlanLog{ PlanID: planID, @@ -773,7 +861,7 @@ func ApproveReleasePlan(c *handler.Context, planID string, req *ApproveRequest) plan.Status = config.StatusExecuting plan.ApprovalTime = time.Now().Unix() - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { err = errors.Wrap(err, "upsert release plan cron") log.Error(err) } @@ -875,6 +963,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { return &commonmodels.Schedule{ ID: input.ID, Number: input.Number, + UnixStamp: input.UnixStamp, Frequency: input.Frequency, Time: input.Time, MaxFailures: input.MaxFailure, @@ -891,6 +980,7 @@ const ( ListReleasePlanTypeName ListReleasePlanType = "name" ListReleasePlanTypeManager ListReleasePlanType = "manager" ListReleasePlanTypeSuccessTime ListReleasePlanType = "success_time" + ListReleasePlanTypeUpdateTime ListReleasePlanType = "update_time" ListReleasePlanTypeStatus ListReleasePlanType = "status" ) @@ -950,10 +1040,37 @@ func ListReleasePlans(opt *ListReleasePlanOption) (*ListReleasePlanResp, error) SuccessTimeStart: timeStart, SuccessTimeEnd: timeEnd, IsSort: true, + SortBy: mongodb.SortReleasePlanByUpdateTime, PageNum: opt.PageNum, PageSize: opt.PageSize, ExcludedFields: []string{"jobs", "logs"}, }) + case ListReleasePlanTypeUpdateTime: + timeArr := strings.Split(opt.Keyword, "-") + if len(timeArr) != 2 { + return nil, errors.New("invalid update time range") + } + + timeStart := int64(0) + timeEnd := int64(0) + timeStart, err = strconv.ParseInt(timeArr[0], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "invalid update time start") + } + timeEnd, err = strconv.ParseInt(timeArr[1], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "invalid update time end") + } + + list, total, err = mongodb.NewReleasePlanColl().ListByOptions(&mongodb.ListReleasePlanOption{ + UpdateTimeStart: timeStart, + UpdateTimeEnd: timeEnd, + IsSort: true, + SortBy: mongodb.SortReleasePlanByUpdateTime, + PageNum: opt.PageNum, + PageSize: opt.PageSize, + ExcludedFields: []string{"jobs", "logs"}, + }) case ListReleasePlanTypeStatus: list, total, err = mongodb.NewReleasePlanColl().ListByOptions(&mongodb.ListReleasePlanOption{ Status: config.ReleasePlanStatus(opt.Keyword), diff --git a/pkg/microservice/aslan/core/release_plan/service/watcher.go b/pkg/microservice/aslan/core/release_plan/service/watcher.go index a3dba99232..8d4109ff43 100644 --- a/pkg/microservice/aslan/core/release_plan/service/watcher.go +++ b/pkg/microservice/aslan/core/release_plan/service/watcher.go @@ -200,7 +200,7 @@ func updatePlanApproval(plan *models.ReleasePlan) error { plan.Status = config.StatusExecuting plan.ApprovalTime = time.Now().Unix() - if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.ScheduleExecuteTime); err != nil { + if err := upsertReleasePlanCron(plan.ID.Hex(), plan.Name, plan.Index, plan.Status, plan.ScheduleExecuteTime); err != nil { err = errors.Wrap(err, "upsert release plan cron") log.Error(err) } diff --git a/pkg/microservice/aslan/core/service.go b/pkg/microservice/aslan/core/service.go index 7f881797ee..7e83f99192 100644 --- a/pkg/microservice/aslan/core/service.go +++ b/pkg/microservice/aslan/core/service.go @@ -23,7 +23,7 @@ import ( "sync" "time" - newgoCron "github.com/go-co-op/gocron" + newgoCron "github.com/go-co-op/gocron/v2" _ "github.com/go-sql-driver/mysql" "github.com/hashicorp/go-multierror" "github.com/koderover/zadig/v2/pkg/tool/clientmanager" @@ -156,9 +156,13 @@ func Stop(ctx context.Context) { var Scheduler *newgoCron.Scheduler func initCron() { - Scheduler = newgoCron.NewScheduler(time.Local) + Scheduler, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + return + } - Scheduler.Every(5).Minutes().Do(func() { + Scheduler.NewJob(newgoCron.DurationJob(5*time.Minute), newgoCron.NewTask(func() { log.Infof("[CRONJOB] updating tokens for gitlab....") codehostList, err := mongodb2.NewCodehostColl().List(&mongodb2.ListArgs{ Source: "gitlab", @@ -175,11 +179,11 @@ func initCron() { } } log.Infof("[CRONJOB] gitlab token updated....") - }) + })) - Scheduler.Every(1).Days().At("04:00").Do(cleanCacheFiles) + Scheduler.NewJob(newgoCron.DailyJob(1, newgoCron.NewAtTimes(newgoCron.NewAtTime(4, 0, 0))), newgoCron.NewTask(cleanCacheFiles)) - Scheduler.StartAsync() + Scheduler.Start() } func initService() { diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go index d2bae88446..84581c04ca 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_v4.go @@ -2083,6 +2083,7 @@ func cronJobToSchedule(input *commonmodels.Cronjob) *commonmodels.Schedule { ID: input.ID, Number: input.Number, Frequency: input.Frequency, + UnixStamp: input.UnixStamp, Time: input.Time, MaxFailures: input.MaxFailure, WorkflowV4Args: input.WorkflowV4Args, diff --git a/pkg/microservice/cron/core/service/cronjob.go b/pkg/microservice/cron/core/service/cronjob.go index 91184d97b6..fab49e1117 100644 --- a/pkg/microservice/cron/core/service/cronjob.go +++ b/pkg/microservice/cron/core/service/cronjob.go @@ -17,12 +17,13 @@ limitations under the License. package service type CronjobPayload struct { - Name string `json:"name"` - ProductName string `json:"product_name"` - Action string `json:"action"` - JobType string `json:"job_type"` - DeleteList []string `json:"delete_list,omitempty"` - JobList []*Schedule `json:"job_list,omitempty"` + Name string `json:"name"` + ProductName string `json:"product_name"` + Action string `json:"action"` + JobType string `json:"job_type"` + ScheduleType string `json:"schedule_type"` + DeleteList []string `json:"delete_list,omitempty"` + JobList []*Schedule `json:"job_list,omitempty"` } type Cronjob struct { @@ -31,6 +32,7 @@ type Cronjob struct { Type string `json:"type"` Number uint64 `json:"number"` Frequency string `json:"frequency"` + UnixStamp int64 `json:"unix_stamp"` Time string `json:"time"` Cron string `json:"cron"` ProductName string `json:"product_name,omitempty"` diff --git a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go index 9fb85261f1..6d78bfefa2 100644 --- a/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go +++ b/pkg/microservice/cron/core/service/scheduler/cronjob_handler.go @@ -26,6 +26,7 @@ import ( "strings" "time" + newgoCron "github.com/go-co-op/gocron/v2" "github.com/rfyiamcool/cronlib" "github.com/koderover/zadig/v2/pkg/microservice/cron/core/service" @@ -42,20 +43,22 @@ const ( ) type CronjobHandler struct { - aslanCli *client.Client - Scheduler *cronlib.CronSchduler + aslanCli *client.Client + Scheduler *cronlib.CronSchduler + NewScheduler newgoCron.Scheduler } -func NewCronjobHandler(client *client.Client, scheduler *cronlib.CronSchduler) *CronjobHandler { - InitExistedCronjob(client, scheduler) +func NewCronjobHandler(client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) *CronjobHandler { + InitExistedCronjob(client, scheduler, newScheduler) return &CronjobHandler{ - aslanCli: client, - Scheduler: scheduler, + aslanCli: client, + Scheduler: scheduler, + NewScheduler: newScheduler, } } -func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) { +func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) { log.Infof("Initializing existing cronjob ....") initChan := make(chan []*service.Cronjob, 1) @@ -99,7 +102,7 @@ func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) select { case jobList = <-initChan: for _, job := range jobList { - err := registerCronjob(job, client, scheduler) + err := registerCronjob(job, client, scheduler, newScheduler) if err != nil { fmt.Printf("Failed to init job with id: %s, err: %s\n", job.ID, err) } @@ -111,7 +114,7 @@ func InitExistedCronjob(client *client.Client, scheduler *cronlib.CronSchduler) select { case jobList = <-failsafeChan: for _, job := range jobList { - err := registerCronjob(job, client, scheduler) + err := registerCronjob(job, client, scheduler, newScheduler) if err != nil { fmt.Printf("Failed to init job with id: %s, err: %s\n", job.ID, err) } @@ -128,77 +131,97 @@ func (h *CronjobHandler) HandleMessage(msgs []*service.CronjobPayload) error { for _, msg := range msgs { switch msg.Action { case setting.TypeEnableCronjob: - err := h.updateCronjob(msg.Name, msg.ProductName, msg.JobType, msg.JobList, msg.DeleteList) + err := h.updateCronjob(msg.Name, msg.ProductName, msg.ScheduleType, msg.JobType, msg.JobList, msg.DeleteList) if err != nil { log.Errorf("Failed to update cronjob, the error is: %v", err) return err } case setting.TypeDisableCronjob: - err := h.stopCronjob(msg.Name, msg.JobType) + err := h.stopCronjob(msg.Name, msg.JobType, msg.ScheduleType) if err != nil { log.Errorf("Failed to stop all cron job, the error is: %v", err) return err } default: - log.Errorf("unsupported cronjob action: NOT RECONSUMING") + log.Errorf("unsupported cronjob action: %+v", msg) } } return nil } -func (h *CronjobHandler) updateCronjob(name, productName, jobType string, jobList []*service.Schedule, deleteList []string) error { - //首先根据deleteList停止不需要的cronjob - for _, deleteID := range deleteList { - jobID := deleteID - log.Infof("stopping Job of ID: %s", jobID) - h.Scheduler.StopService(jobID) - } - // 根据job内容来在scheduler中新增cronjob - for _, job := range jobList { - var cron string - if job.Type == setting.FixedGapCronjob || job.Type == setting.FixedDayTimeCronjob { - cronString, err := convertFixedTimeToCron(job) - if err != nil { - return err - } - cron = cronString - } else { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) +func (h *CronjobHandler) updateCronjob(name, productName, scheduleType, jobType string, jobList []*service.Schedule, deleteList []string) error { + if scheduleType == setting.UnixStampSchedule { + //首先根据deleteList停止不需要的cronjob + for _, deleteID := range deleteList { + jobID := deleteID + log.Infof("stopping UnixStamp Schedule Job of ID: %s", jobID) + h.NewScheduler.RemoveByTags(jobID) } - switch jobType { - case setting.WorkflowCronjob: - err := h.registerWorkFlowJob(name, cron, job) - if err != nil { - return err - } - case setting.TestingCronjob: - err := h.registerTestJob(name, productName, cron, job) - if err != nil { - return err - } - case setting.WorkflowV4Cronjob: - err := h.registerWorkFlowV4Job(name, cron, job) - if err != nil { - return err - } - case setting.EnvAnalysisCronjob: - err := h.registerEnvAnalysisJob(name, cron, job) - if err != nil { - return err + + // 根据job内容来在scheduler中新增cronjob + for _, job := range jobList { + switch jobType { + case setting.ReleasePlanCronjob: + err := h.registerReleasePlanJob(name, job) + if err != nil { + return err + } + default: + log.Errorf("unrecognized cron job type for job id: %s", job.ID) } - case setting.EnvSleepCronjob: - err := h.registerEnvSleepJob(name, cron, job) - if err != nil { - return err + } + + } else { + //首先根据deleteList停止不需要的cronjob + for _, deleteID := range deleteList { + jobID := deleteID + log.Infof("stopping Job of ID: %s", jobID) + h.Scheduler.StopService(jobID) + } + + // 根据job内容来在scheduler中新增cronjob + for _, job := range jobList { + var cron string + if job.Type == setting.FixedGapCronjob || job.Type == setting.FixedDayTimeCronjob { + cronString, err := convertFixedTimeToCron(job) + if err != nil { + return err + } + cron = cronString + } else { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) } - case setting.ReleasePlanCronjob: - err := h.registerReleasePlanJob(name, cron, job) - if err != nil { - return err + + switch jobType { + case setting.WorkflowCronjob: + err := h.registerWorkFlowJob(name, cron, job) + if err != nil { + return err + } + case setting.TestingCronjob: + err := h.registerTestJob(name, productName, cron, job) + if err != nil { + return err + } + case setting.WorkflowV4Cronjob: + err := h.registerWorkFlowV4Job(name, cron, job) + if err != nil { + return err + } + case setting.EnvAnalysisCronjob: + err := h.registerEnvAnalysisJob(name, cron, job) + if err != nil { + return err + } + case setting.EnvSleepCronjob: + err := h.registerEnvSleepJob(name, cron, job) + if err != nil { + return err + } + default: + log.Errorf("unrecognized cron job type for job id: %s", job.ID) } - default: - log.Errorf("unrecognized cron job type for job id: %s", job.ID) } } return nil @@ -333,7 +356,7 @@ func (h *CronjobHandler) registerTestJob(name, productName, schedule string, job // FIXME // UNDER CURRENT SERVICE STRUCTURE, STOPPING CRONJOB SERVICE AND UPDATING DB RECORD // ARE NOT ATOMIC, THIS WILL CAUSE SERIOUS PROBLEM IF UPDATE FAILED -func (h *CronjobHandler) stopCronjob(name, ptype string) error { +func (h *CronjobHandler) stopCronjob(name, ptype, scheduleType string) error { var jobList []*service.Cronjob listAPI := fmt.Sprintf("%s/cron/cronjob/type/%s/name/%s", h.aslanCli.APIBase, ptype, name) header := http.Header{} @@ -348,9 +371,16 @@ func (h *CronjobHandler) stopCronjob(name, ptype string) error { return err } - for _, job := range jobList { - log.Infof("stopping cronjob of ID: %s", job.ID) - h.Scheduler.StopService(job.ID) + if scheduleType == setting.UnixStampSchedule { + for _, job := range jobList { + log.Infof("stopping unixstamp schedule job of ID: %s", job.ID) + h.NewScheduler.RemoveByTags(job.ID) + } + } else { + for _, job := range jobList { + log.Infof("stopping cronjob of ID: %s", job.ID) + h.Scheduler.StopService(job.ID) + } } disableAPI := fmt.Sprintf("%s/cron/cronjob/disable", h.aslanCli.APIBase) @@ -369,206 +399,195 @@ func (h *CronjobHandler) stopCronjob(name, ptype string) error { return nil } -func registerCronjob(job *service.Cronjob, client *client.Client, scheduler *cronlib.CronSchduler) error { - switch job.Type { - case setting.WorkflowCronjob: - args := &service.WorkflowTaskArgs{ - WorkflowName: job.Name, - WorklowTaskCreator: setting.CronTaskCreator, - } - if job.WorkflowArgs != nil { - args.Description = job.WorkflowArgs.Description - args.ProductTmplName = job.WorkflowArgs.ProductTmplName - args.Target = job.WorkflowArgs.Target - args.Namespace = job.WorkflowArgs.Namespace - args.Tests = job.WorkflowArgs.Tests - args.DistributeEnabled = job.WorkflowArgs.DistributeEnabled - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall(path.Join("workflow/workflowtask", job.WorkflowArgs.WorkflowName), args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.WorkflowV4Cronjob: - if job.WorkflowV4Args == nil { - return fmt.Errorf("workflow args is nil") - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall(fmt.Sprintf("workflow/v4/workflowtask/trigger?triggerName=%s", setting.CronTaskCreator), job.WorkflowV4Args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.TestingCronjob: - args := &service.TestTaskArgs{ - TestName: job.Name, - ProductName: job.ProductName, - TestTaskCreator: setting.CronTaskCreator, - } - var cron string - if job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - if err := client.ScheduleCall("testing/testtask", args, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) +func registerCronjob(job *service.Cronjob, client *client.Client, scheduler *cronlib.CronSchduler, newScheduler newgoCron.Scheduler) error { + if job.JobType == setting.UnixStampSchedule { + switch job.Type { + case setting.ReleasePlanCronjob: + if job.ReleasePlanArgs == nil { + log.Errorf("ReleasePlanArgs is nil, name: %v, jobID: %v", job.Name, job.ID) + return nil } - }) - if err != nil { - log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.EnvAnalysisCronjob: - if job.EnvAnalysisArgs == nil { - return nil - } - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } + executeReleasePlanFunc := func() { + base := "release_plan/v1" + url := base + fmt.Sprintf("/%s/schedule_execute?jobID=%s", job.ReleasePlanArgs.ID, job.ID) + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s] RunScheduledTask err: %v", job.Name, err) + } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "environment/environments/" - production := "false" - if job.EnvAnalysisArgs.Production { - production = "true" + log.Infof("schedule executed release plan, jobID: %v, schdule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, time.Unix(job.UnixStamp, 0), job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) } - url := base + fmt.Sprintf("%s/analysis?projectName=%s&triggerName=%s&userName=%s&production=%s", job.EnvAnalysisArgs.EnvName, job.EnvAnalysisArgs.ProductName, setting.CronTaskCreator, setting.CronTaskCreator, production) - - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + // delete old schedule job first + tag := job.ID + newScheduler.RemoveByTags(tag) + + scheduleTime := time.Unix(job.UnixStamp, 0) + jobName := fmt.Sprintf("release_plan:%s:%d:%s", job.ReleasePlanArgs.Name, job.ReleasePlanArgs.Index, scheduleTime) + if scheduleTime.Before(time.Now()) { + if time.Now().Sub(scheduleTime) <= time.Second*30 { + // now - schedule time <= 30s + // maybe missed the schedule time because of cron service restart + // so start this immediately + log.Infof("found an release plan outdated <= 30s, start it immediately, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + _, err := newScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartImmediately()), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) + if err != nil { + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) + return err + } + } else { + // now - schedule time > 30s + // schedule time is too old + log.Errorf("found an release plan outdate > 30s, drop it, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + return nil + } + } else { + // schedule time correct + _, err := newScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartDateTime(scheduleTime)), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) + if err != nil { + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) + return err + } } - }) - if err != nil { - log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.EnvSleepCronjob: - if job.EnvArgs == nil { + log.Infof("registering jobID: %s with name: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) return nil } - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "environment/environments/" - production := "false" - if job.EnvArgs.Production { - production = "true" + } else { + switch job.Type { + case setting.WorkflowCronjob: + return nil + case setting.WorkflowV4Cronjob: + if job.WorkflowV4Args == nil { + return fmt.Errorf("workflow args is nil") } - - url := "" - if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, true) { - url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=enable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) - } else if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, false) { - url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=disable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + var cron string + if job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + if err := client.ScheduleCall(fmt.Sprintf("workflow/v4/workflowtask/trigger?triggerName=%s", setting.CronTaskCreator), job.WorkflowV4Args, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.TestingCronjob: + args := &service.TestTaskArgs{ + TestName: job.Name, + ProductName: job.ProductName, + TestTaskCreator: setting.CronTaskCreator, + } + var cron string + if job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + if err := client.ScheduleCall("testing/testtask", args, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to generate job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.EnvAnalysisCronjob: + if job.EnvAnalysisArgs == nil { + return nil } - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + var cron string + if job.JobType == "" || job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) } - }) - if err != nil { - log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) - return err - } - log.Infof("registering jobID: %s with cron: %s", job.ID, cron) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err - } - case setting.ReleasePlanCronjob: - var cron string - if job.JobType == "" || job.JobType == setting.CrontabCronjob { - cron = fmt.Sprintf("%s%s", "0 ", job.Cron) - } else { - cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) - } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + base := "environment/environments/" + production := "false" + if job.EnvAnalysisArgs.Production { + production = "true" + } - if job.ReleasePlanArgs == nil { - log.Errorf("ReleasePlanArgs is nil, name: %v, schedule: %v, jobID: %v", job.Name, cron, job.ID) - return nil - } - scheduleJob, err := cronlib.NewJobModel(cron, func() { - base := "release_plan/v1" - url := base + fmt.Sprintf("/%s/schedule_execute", job.ReleasePlanArgs.ID) - if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { - log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) - } + url := base + fmt.Sprintf("%s/analysis?projectName=%s&triggerName=%s&userName=%s&production=%s", job.EnvAnalysisArgs.EnvName, job.EnvAnalysisArgs.ProductName, setting.CronTaskCreator, setting.CronTaskCreator, production) - scheduler.StopService(job.ID) + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) + return err + } - log.Infof("schedule executed release plan, jobID: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - }) - if err != nil { - log.Errorf("Failed to create jobID: %s, jobName: %v, cron: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID, job.Name, cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) - return err - } + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + case setting.EnvSleepCronjob: + if job.EnvArgs == nil { + return nil + } + var cron string + if job.JobType == "" || job.JobType == setting.CrontabCronjob { + cron = fmt.Sprintf("%s%s", "0 ", job.Cron) + } else { + cron, _ = convertCronString(job.JobType, job.Time, job.Frequency, job.Number) + } + scheduleJob, err := cronlib.NewJobModel(cron, func() { + base := "environment/environments/" + production := "false" + if job.EnvArgs.Production { + production = "true" + } + + url := "" + if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, true) { + url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=enable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + } else if job.EnvArgs.Name == util.GetEnvSleepCronName(job.EnvArgs.ProductName, job.EnvArgs.EnvName, false) { + url = base + fmt.Sprintf("%s/sleep?projectName=%s&action=disable&production=%s", job.EnvArgs.EnvName, job.EnvArgs.ProductName, production) + } + + if err := client.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { + log.Errorf("[%s]RunScheduledTask err: %v", job.Name, err) + } + }) + if err != nil { + log.Errorf("Failed to create job of ID: %s, the error is: %v", job.ID, err) + return err + } - log.Infof("registering jobID: %s with name: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID, job.Name, cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - err = scheduler.UpdateJobModel(job.ID, scheduleJob) - if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) - return err + log.Infof("registering jobID: %s with cron: %s", job.ID, cron) + err = scheduler.UpdateJobModel(job.ID, scheduleJob) + if err != nil { + log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + return err + } + default: + fmt.Printf("Not supported type of service: %s\n", job.Type) + return errors.New("not supported service type") } - default: - fmt.Printf("Not supported type of service: %s\n", job.Type) - return errors.New("not supported service type") } return nil } @@ -640,32 +659,39 @@ func (h *CronjobHandler) registerEnvSleepJob(name, schedule string, job *service return nil } -func (h *CronjobHandler) registerReleasePlanJob(name, schedule string, job *service.Schedule) error { +func (h *CronjobHandler) registerReleasePlanJob(name string, job *service.Schedule) error { if job.ReleasePlanArgs == nil { - log.Errorf("ReleasePlanArgs is nil, name: %v, schedule: %v, jobID: %v", name, schedule, job.ID.Hex()) + log.Errorf("ReleasePlanArgs is nil, name: %v, jobID: %v", name, job.ID.Hex()) return nil } - scheduleJob, err := cronlib.NewJobModel(schedule, func() { + + executeReleasePlanFunc := func() { base := "release_plan/v1" - url := base + fmt.Sprintf("/%s/schedule_execute", job.ReleasePlanArgs.ID) + url := base + fmt.Sprintf("/%s/schedule_execute?jobID=%s", job.ReleasePlanArgs.ID, job.ID.Hex()) if err := h.aslanCli.ScheduleCall(url, nil, log.SugaredLogger()); err != nil { log.Errorf("[%s]RunScheduledTask err: %v", name, err) } - h.Scheduler.StopService(job.ID.Hex()) + log.Infof("schedule executed release plan, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), time.Unix(job.UnixStamp, 0), job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + } - log.Infof("schedule executed release plan, jobID: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), job.Cron, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - }) - if err != nil { - log.Errorf("Failed to create jobID: %s, jobName: %v, cron: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID.Hex(), name, schedule, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) - return err + // delete old schedule job first + tag := job.ID.Hex() + h.NewScheduler.RemoveByTags(tag) + + scheduleTime := time.Unix(job.UnixStamp, 0) + if scheduleTime.Before(time.Now()) { + log.Errorf("release plan schedule time is before now, jobID: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) + return nil } - log.Infof("registering jobID: %s with name: %v, cron: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), name, schedule, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) - err = h.Scheduler.UpdateJobModel(job.ID.Hex(), scheduleJob) + jobName := fmt.Sprintf("release_plan:%s:%d:%s", job.ReleasePlanArgs.Name, job.ReleasePlanArgs.Index, scheduleTime) + _, err := h.NewScheduler.NewJob(newgoCron.OneTimeJob(newgoCron.OneTimeJobStartDateTime(scheduleTime)), newgoCron.NewTask(executeReleasePlanFunc), newgoCron.WithTags(tag), newgoCron.WithName(jobName)) if err != nil { - log.Errorf("Failed to register job of ID: %s to scheduler, the error is: %v", job.ID, err) + log.Errorf("Failed to create jobID: %s, jobName: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v, error: %v", job.ID.Hex(), name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name, err) return err } + + log.Infof("registering jobID: %s with name: %v, schedule time: %v; release plan ID: %v, index: %v, name: %v", job.ID.Hex(), name, scheduleTime, job.ReleasePlanArgs.ID, job.ReleasePlanArgs.Index, job.ReleasePlanArgs.Name) return nil } diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_env.go b/pkg/microservice/cron/core/service/scheduler/schedule_env.go index 9075f3be9c..506cb558ee 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_env.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_env.go @@ -91,7 +91,7 @@ func (c *CronClient) UpsertEnvServiceScheduler(log *zap.SugaredLogger) { } } c.lastSchedulersRWMutex.Unlock() - log.Infof("[vm] [%s] deleted service scheduler..", key) + // log.Infof("[vm] [%s] deleted service scheduler..", key) continue } @@ -136,7 +136,7 @@ func (c *CronClient) UpsertEnvServiceScheduler(log *zap.SugaredLogger) { c.SchedulerController[key] = c.Schedulers[key].Start() c.SchedulerControllerRWMutex.Unlock() - log.Infof("[vm] [%s] added service scheduler..", key) + // log.Infof("[vm] [%s] added service scheduler..", key) } } break diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go b/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go index 3ff80f79bb..5ee5f75a4c 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_env_update.go @@ -56,7 +56,7 @@ func (c *CronClient) UpsertEnvValueSyncScheduler(log *zap.SugaredLogger) { log.Infof("start init env values sync scheduler... env count: %v", len(envs)) for _, env := range envs { - log.Infof("schedule_env_update handle single helm env: %s/%s", env.ProductName, env.EnvName) + // log.Debugf("schedule_env_update handle single helm env: %s/%s", env.ProductName, env.EnvName) envObj, err := c.AslanCli.GetEnvService(env.ProductName, env.EnvName, log) if err != nil { log.Errorf("failed to get env data, productName:%s envName:%s err:%v", env.ProductName, env.EnvName, err) diff --git a/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go b/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go index 2dc906852a..d83c76b43d 100644 --- a/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go +++ b/pkg/microservice/cron/core/service/scheduler/schedule_workflow.go @@ -93,66 +93,6 @@ func (c *CronClient) UpsertWorkflowScheduler(log *zap.SugaredLogger) { c.SchedulerControllerRWMutex.Unlock() } - pipelines, err := c.AslanCli.ListPipelines(log) - if err != nil { - log.Error(err) - return - } - - log.Info("start init pipeline scheduler..") - for _, pipeline := range pipelines { - key := "pipeline-" + pipeline.Name - taskMap[key] = true - - c.enabledMapRWMutex.Lock() - c.lastSchedulersRWMutex.Lock() - if _, ok := c.lastSchedulers[key]; ok && reflect.DeepEqual(pipeline.Schedules.Items, c.lastSchedulers[key]) { - // 增加判断:enabled的值未被更新时才能跳过 - if enabled, ok := c.enabledMap[key]; ok && enabled == pipeline.Schedules.Enabled { - c.lastSchedulersRWMutex.Unlock() - c.enabledMapRWMutex.Unlock() - continue - } - } - c.enabledMap[key] = pipeline.Schedules.Enabled - c.lastSchedulers[key] = pipeline.Schedules.Items - c.lastSchedulersRWMutex.Unlock() - c.enabledMapRWMutex.Unlock() - - newScheduler := gocron.NewScheduler() - for _, schedule := range pipeline.Schedules.Items { - if schedule != nil { - if err := schedule.Validate(); err != nil { - log.Errorf("[%s] invalid schedule: %v", key, err) - continue - } - BuildScheduledPipelineJob(newScheduler, schedule).Do(c.RunScheduledPipelineTask, pipeline, schedule.TaskArgs, log) - } - } - // 所有scheduler总开关 - if !pipeline.Schedules.Enabled { - newScheduler.Clear() - } - - c.SchedulersRWMutex.Lock() - c.Schedulers[key] = newScheduler - c.SchedulersRWMutex.Unlock() - - log.Infof("[%s] building schedulers..", key) - // 停掉旧的scheduler - c.SchedulerControllerRWMutex.Lock() - sc, ok := c.SchedulerController[key] - c.SchedulerControllerRWMutex.Unlock() - if ok { - sc <- true - } - - log.Infof("[%s]lens of scheduler: %d", key, c.Schedulers[key].Len()) - c.SchedulerControllerRWMutex.Lock() - c.SchedulerController[key] = c.Schedulers[key].Start() - c.SchedulerControllerRWMutex.Unlock() - } - ScheduleNames := sets.NewString( CleanJobScheduler, UpsertWorkflowScheduler, UpsertTestScheduler, InitStatScheduler, InitOperationStatScheduler, diff --git a/pkg/microservice/cron/core/service/scheduler/scheduler.go b/pkg/microservice/cron/core/service/scheduler/scheduler.go index 1ceff56e50..c063d912e3 100644 --- a/pkg/microservice/cron/core/service/scheduler/scheduler.go +++ b/pkg/microservice/cron/core/service/scheduler/scheduler.go @@ -23,12 +23,11 @@ import ( "sync" "time" + newgoCron "github.com/go-co-op/gocron/v2" "github.com/jasonlvhit/gocron" "github.com/rfyiamcool/cronlib" "go.uber.org/zap" - newgoCron "github.com/go-co-op/gocron" - configbase "github.com/koderover/zadig/v2/pkg/config" "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" "github.com/koderover/zadig/v2/pkg/microservice/cron/core/service" @@ -66,13 +65,17 @@ type CronClient struct { } type CronV3Client struct { - Scheduler *newgoCron.Scheduler + Scheduler newgoCron.Scheduler AslanCli *aslan.Client } func NewCronV3() *CronV3Client { + scheduler, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + } return &CronV3Client{ - Scheduler: newgoCron.NewScheduler(time.Local), + Scheduler: scheduler, AslanCli: aslan.New(configbase.AslanServiceAddress()), } } @@ -80,7 +83,7 @@ func NewCronV3() *CronV3Client { func (c *CronV3Client) Start() { var lastConfig *aslan.CleanConfig - c.Scheduler.Every(5).Seconds().Do(func() { + c.Scheduler.NewJob(newgoCron.DurationJob(5*time.Second), newgoCron.NewTask(func() { // get the docker clean config config, err := c.AslanCli.GetDockerCleanConfig() if err != nil { @@ -92,26 +95,26 @@ func (c *CronV3Client) Start() { lastConfig = config log.Infof("config changed to %v", config) if config.CronEnabled { - c.Scheduler.RemoveByTag(string(types.CleanDockerTag)) - _, err = c.Scheduler.Cron(config.Cron).Tag(string(types.CleanDockerTag)).Do(func() { + c.Scheduler.RemoveByTags(string(types.CleanDockerTag)) + + _, err := c.Scheduler.NewJob(newgoCron.CronJob(config.Cron, false), newgoCron.NewTask(func() { log.Infof("trigger aslan docker clean,reg: %v", config.Cron) // call docker clean if err := c.AslanCli.DockerClean(); err != nil { log.Errorf("fail to clean docker cache , err:%s", err) } - }) + })) if err != nil { log.Errorf("fail to add docker_cache clean cron job:reg: %v,err:%s", config.Cron, err) } } else { log.Infof("remove docker_cache clean job , job tag: %v", types.CleanDockerTag) - c.Scheduler.RemoveByTag(string(types.CleanDockerTag)) - + c.Scheduler.RemoveByTags(string(types.CleanDockerTag)) } } - }) + })) - c.Scheduler.StartAsync() + c.Scheduler.Start() } const ( @@ -146,14 +149,19 @@ const ( ) // NewCronClient ... -// 服务初始化 +// 注意初始化失败会panic func NewCronClient() *CronClient { aslanCli := client.NewAslanClient(fmt.Sprintf("%s/api", configbase.AslanServiceAddress())) cronjobScheduler := cronlib.New() cronjobScheduler.Start() + newgoCronSchedule, err := newgoCron.NewScheduler() + if err != nil { + log.Fatalf("failed to create scheduler: %v", err) + } + newgoCronSchedule.Start() - cronjobHandler := NewCronjobHandler(aslanCli, cronjobScheduler) + cronjobHandler := NewCronjobHandler(aslanCli, cronjobScheduler, newgoCronSchedule) go func() { for { diff --git a/pkg/microservice/cron/core/service/types.go b/pkg/microservice/cron/core/service/types.go index 3795d62dfe..591ba7e37e 100644 --- a/pkg/microservice/cron/core/service/types.go +++ b/pkg/microservice/cron/core/service/types.go @@ -37,6 +37,8 @@ const ( TimingSchedule ScheduleType = "timing" // GapSchedule 间隔循环 GapSchedule ScheduleType = "gap" + // UnixstampSchedule 时间戳定时 + UnixstampSchedule ScheduleType = "unix_stamp" ) type PipelineResource struct { @@ -80,6 +82,7 @@ type PipelineSpec struct { type Schedule struct { ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` Number uint64 `bson:"number" json:"number"` + UnixStamp int64 `bson:"unix_stamp" json:"unix_stamp"` Frequency string `bson:"frequency" json:"frequency"` Time string `bson:"time" json:"time"` MaxFailures int `bson:"max_failures,omitempty" json:"max_failures,omitempty"` diff --git a/pkg/setting/consts.go b/pkg/setting/consts.go index 8c41fe8401..bc1b3ebfb8 100644 --- a/pkg/setting/consts.go +++ b/pkg/setting/consts.go @@ -560,6 +560,7 @@ const ( FixedDayTimeCronjob = "timing" FixedGapCronjob = "gap" CrontabCronjob = "crontab" + UnixStampSchedule = "unix_stamp" // 定时器的所属job类型 WorkflowCronjob = "workflow" diff --git a/pkg/tool/gitee/repositories.go b/pkg/tool/gitee/repositories.go index 07655fbe99..1f89cb185d 100644 --- a/pkg/tool/gitee/repositories.go +++ b/pkg/tool/gitee/repositories.go @@ -32,6 +32,7 @@ import ( type Project struct { ID int `json:"id"` Name string `json:"name"` + Path string `json:"path"` DefaultBranch string `json:"default_branch,omitempty"` Namespace *NamespaceInfo `json:"namespace,omitempty"` }