From 3ba1cb2565259dc26cf9d38ea50f7fe757fe4174 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=87=8C=E7=83=A8?= Date: Tue, 1 Mar 2022 17:43:41 +0800 Subject: [PATCH] update cronjob --- core/api/v1/cron_job_api.go | 114 +++++++++++----------- core/api/v1/request/cron_job_request.go | 1 + core/api/v1/response/cron_job_response.go | 12 +++ core/dao/cron_job_dao.go | 25 ++--- core/dao/index.go | 18 ---- core/entity/sys_cron_job.go | 13 ++- core/job/index.go | 2 +- core/router/cron_job_router.go | 8 +- pkg/common/{util.go => index.go} | 24 +++++ pkg/common/log.go | 4 + pkg/cronjob/index.go | 41 ++++---- pkg/util/rest_util.go | 12 +++ pkg/util/struct_util.go | 27 +++++ 13 files changed, 186 insertions(+), 115 deletions(-) create mode 100644 core/api/v1/response/cron_job_response.go rename pkg/common/{util.go => index.go} (89%) create mode 100644 pkg/util/rest_util.go create mode 100644 pkg/util/struct_util.go diff --git a/core/api/v1/cron_job_api.go b/core/api/v1/cron_job_api.go index daadc05..9089478 100644 --- a/core/api/v1/cron_job_api.go +++ b/core/api/v1/cron_job_api.go @@ -2,14 +2,14 @@ package v1 import ( "cutego/core/api/v1/request" + "cutego/core/api/v1/response" "cutego/core/entity" "cutego/core/service" - "cutego/pkg/cache" - "cutego/pkg/page" + "cutego/pkg/common" + "cutego/pkg/cronjob" "cutego/pkg/resp" + "cutego/pkg/util" "github.com/gin-gonic/gin" - "strconv" - "strings" ) type CronJobApi struct { @@ -24,27 +24,30 @@ func (a CronJobApi) List(c *gin.Context) { return } find, i := a.cronJobService.FindPage(query) - resp.OK(c, page.Page{ - List: find, - Total: i, - Size: query.PageSize, - }) + + var result []interface{} + for _, item := range find { + // 领域对象转换 + result = append(result, util.DeepCopy(item, &response.CronJobResponse{})) + } + resp.OK(c, util.NewPage(result, i, query.PageSize)) +} + +// GetById 根据id获取任务详情 +func (a CronJobApi) GetById(c *gin.Context) { + jobId := c.Param("jobId") + info := a.cronJobService.GetInfo(common.StringToInt64(jobId)) + resp.OK(c, util.DeepCopy(info, &response.CronJobResponse{})) } // Edit 修改定时任务 func (a CronJobApi) Edit(c *gin.Context) { - dictType := entity.SysDictType{} - if c.Bind(&dictType) != nil { + record := entity.SysCronJob{} + if c.Bind(&record) != nil { resp.ParamError(c) return } - //检验定时任务是否存在 - if a.dictTypeService.CheckDictTypeUnique(dictType) { - resp.Error(c, "修改字典'"+dictType.DictName+"'失败, 定时任务已存在") - return - } - //修改数据 - if a.dictTypeService.Edit(dictType) { + if a.cronJobService.Edit(record) > 0 { resp.OK(c) } else { resp.Error(c) @@ -53,54 +56,49 @@ func (a CronJobApi) Edit(c *gin.Context) { // Add 新增定时任务 func (a CronJobApi) Add(c *gin.Context) { - dictType := entity.SysDictType{} - if c.Bind(&dictType) != nil { + dto := entity.SysCronJob{} + if c.Bind(&dto) != nil { resp.ParamError(c) return } - //检验定时任务是否存在 - if a.dictTypeService.CheckDictTypeUnique(dictType) { - resp.Error(c, "新增字典'"+dictType.DictName+"'失败, 定时任务已存在") + record := a.cronJobService.GetInfoByAlias(dto.FuncAlias) + if record.JobId > 0 { + resp.Error(c, "任务已存在!") + } else { + a.cronJobService.Save(dto) + cronjob.AppendCronFunc(dto.JobCron, dto.FuncAlias, dto.Status) + resp.OK(c) + } +} + +// Remove 删除定时任务 +func (a CronJobApi) Remove(c *gin.Context) { + jobId := common.StringToInt64(c.Param("jobId")) + funcAlias := c.Param("funcAlias") + + if a.cronJobService.GetInfo(jobId).Status == "1" { + resp.Error(c, "任务运行中, 无法删除!") return } - //新增定时任务 - if a.dictTypeService.Save(dictType) { - resp.OK(c) - } else { - resp.Error(c) - } + + cronjob.RemoveCronFunc(funcAlias) + a.cronJobService.Remove([]int64{jobId}) + resp.OK(c) } -// Remove 批量删除定时任务 -func (a CronJobApi) Remove(c *gin.Context) { - param := c.Param("dictId") - split := strings.Split(param, ",") - ids := make([]int64, 0) - types := make([]string, 0) - for _, s := range split { - parseInt, _ := strconv.ParseInt(s, 10, 64) - ids = append(ids, parseInt) +// ChangeStatus 改变任务状态 +func (a CronJobApi) ChangeStatus(c *gin.Context) { + record := entity.SysCronJob{} + if c.Bind(&record) != nil { + resp.ParamError(c) + return } - //校验定时任务是否使用 - for _, id := range ids { - dictType := a.dictTypeService.GetById(id) - if len(a.dictDataService.FindByDictType(dictType.DictType)) > 0 { - resp.Error(c, dictType.DictName+"已分配,不能删除") - return + if a.cronJobService.Edit(record) > 0 { + if record.Status == "1" { + cronjob.StartCronFunc(record.FuncAlias) + } else { + cronjob.StopCronFunc(record.FuncAlias) } - types = append(types, dictType.DictType) - } - //批量删除 - if a.dictTypeService.Remove(ids) { - //从缓存中删除数据 - cache.RemoveList(types) - resp.OK(c) - } else { - resp.Error(c) } -} - -// 改变任务状态 -func (a CronJobApi) ChangeStatus(context *gin.Context) { - + resp.OK(c) } diff --git a/core/api/v1/request/cron_job_request.go b/core/api/v1/request/cron_job_request.go index a507b07..e8194e7 100644 --- a/core/api/v1/request/cron_job_request.go +++ b/core/api/v1/request/cron_job_request.go @@ -5,4 +5,5 @@ import "cutego/pkg/base" type CronJobQuery struct { base.GlobalQuery JobName string `form:"jobName"` + Status string `form:"Status"` } diff --git a/core/api/v1/response/cron_job_response.go b/core/api/v1/response/cron_job_response.go new file mode 100644 index 0000000..cd8129d --- /dev/null +++ b/core/api/v1/response/cron_job_response.go @@ -0,0 +1,12 @@ +package response + +type CronJobResponse struct { + JobId int `json:"jobId"` // 任务主键 + JobName string `json:"jobName"` // 任务名称 + JobCron string `json:"jobCron"` // cron表达式 + FuncAlias string `json:"funcAlias"` // 方法别名(程序内注册的别名) + FuncParam string `json:"funcParam"` // 方法参数 + Status string `json:"status"` // 状态(1、Running 0、Stop) + Level int `json:"level"` // 任务级别(0、普通 1、一般 2、重要 3、强保) + Remark string `json:"remark"` // 备注 +} diff --git a/core/dao/cron_job_dao.go b/core/dao/cron_job_dao.go index 07d4d6b..2fc3bae 100644 --- a/core/dao/cron_job_dao.go +++ b/core/dao/cron_job_dao.go @@ -23,6 +23,9 @@ func (d CronJobDao) SelectPage(query request.CronJobQuery) ([]entity.SysCronJob, if gotool.StrUtils.HasNotEmpty(query.JobName) { session.And("job_name like concat('%', ?, '%')", query.JobName) } + if gotool.StrUtils.HasNotEmpty(query.Status) { + session.And("status = ?", query.Status) + } total, _ := page.GetTotal(session.Clone()) err := session.Limit(query.PageSize, page.StartSize(query.PageNum, query.PageSize)).Find(&configs) if err != nil { @@ -60,23 +63,23 @@ func (d CronJobDao) SelectById(id int64) *entity.SysCronJob { // Update 修改数据 func (d CronJobDao) Update(config entity.SysCronJob) int64 { - //session := SqlDB.NewSession() - //session.Begin() - //update, err := session.Where("job_id = ?", config.JobId).Update(&config) - //if err != nil { - // common.ErrorLog(err) - // session.Rollback() - // return 0 - //} - //session.Commit() - return CustomUpdateById("job_id", config.JobId, config) + session := SqlDB.NewSession() + session.Begin() + update, err := session.Where("job_id = ?", config.JobId).Update(&config) + if err != nil { + common.ErrorLog(err) + session.Rollback() + return 0 + } + session.Commit() + return update } // Remove 删除数据 func (d CronJobDao) Delete(list []int64) bool { session := SqlDB.NewSession() session.Begin() - _, err := session.In("config_id", list).Delete(&entity.SysCronJob{}) + _, err := session.In("job_id", list).Delete(&entity.SysCronJob{}) if err != nil { common.ErrorLog(err) session.Rollback() diff --git a/core/dao/index.go b/core/dao/index.go index 2017da6..b765ce4 100644 --- a/core/dao/index.go +++ b/core/dao/index.go @@ -113,21 +113,3 @@ func initConfig() { })) } } - -// 通用: 根据ID更新 -// @Param idColumnName 字段名称 -// @Param idColumnValue 字段值 -// @Param bean 更新的bean内容 -// @Return -func CustomUpdateById(idColumnName string, idColumnValue interface{}, bean interface{}) int64 { - session := SqlDB.NewSession() - session.Begin() - update, err := session.Where(idColumnName+" = ?", idColumnValue).Update(&bean) - if err != nil { - common.ErrorLog(err) - session.Rollback() - return 0 - } - session.Commit() - return update -} diff --git a/core/entity/sys_cron_job.go b/core/entity/sys_cron_job.go index 20b825e..cd21c7e 100644 --- a/core/entity/sys_cron_job.go +++ b/core/entity/sys_cron_job.go @@ -5,17 +5,16 @@ import ( ) type SysCronJob struct { - JobId int `xorm:"pk autoincr" json:"jobId"` // 任务主键 + JobId int64 `xorm:"pk autoincr" json:"jobId"` // 任务主键 JobName string `xorm:"varchar(100)" json:"jobName"` // 任务名称 - JobCron string `xorm:"varchar(255)" json:"job_cron"` // cron表达式 + JobCron string `xorm:"varchar(255)" json:"jobCron"` // cron表达式 FuncAlias string `xorm:"varchar(100)" json:"funcAlias"` // 方法别名(程序内注册的别名) - FuncParam string `xorm:"varchar(1)" json:"funcParam"` // 方法参数 Status string `xorm:"char(1)" json:"status"` // 状态(1、Running 0、Stop) Level int `xorm:"int(1)" json:"level"` // 任务级别(0、普通 1、一般 2、重要 3、强保) - CreateBy string `xorm:"varchar(64)" json:"createBy"` // 创建人 - CreateTime time.Time `xorm:"created" json:"createTime"` // 创建时间 - UpdateBy string `xorm:"varchar(64)" json:"updateBy"` // 更新人 - UpdateTime time.Time `json:"updateTime"` // 更新时间 + CreateBy string `xorm:"varchar(64)"` // 创建人 + CreateTime time.Time `xorm:"created"` // 创建时间 + UpdateBy string `xorm:"varchar(64)"` // 更新人 + UpdateTime time.Time `xorm:"datetime"` // 更新时间 Remark string `xorm:"varchar(500)" json:"remark"` // 备注 } diff --git a/core/job/index.go b/core/job/index.go index 36ca716..33f526e 100644 --- a/core/job/index.go +++ b/core/job/index.go @@ -17,5 +17,5 @@ func RegisterFunc(aliasName string, f func()) { // 注册方法 func init() { - //RegisterFunc("test1", TestJob) + RegisterFunc("test1", TestJob) } diff --git a/core/router/cron_job_router.go b/core/router/cron_job_router.go index 5d75a45..652ed99 100644 --- a/core/router/cron_job_router.go +++ b/core/router/cron_job_router.go @@ -8,17 +8,19 @@ import ( // 初始化定时任务路由 func initCronJobRouter(router *gin.RouterGroup) { v := new(v1.CronJobApi) - group := router.Group("/cronJob") + group := router.Group("/monitor/cronJob") { // 查询定时任务分页数据 group.GET("/list", v.List) + // 查询定时任务分页数据 + group.GET("/:jobId", v.GetById) // 修改定时任务 group.PUT("/modify", v.Edit) // 新增定时任务 group.POST("/create", v.Add) // 删除定时任务 - group.DELETE("/:jobId", v.Remove) + group.DELETE("/:jobId/:funcAlias", v.Remove) // 改变定时任务状态 - group.DELETE("/changeStatus", v.ChangeStatus) + group.PUT("/changeStatus", v.ChangeStatus) } } diff --git a/pkg/common/util.go b/pkg/common/index.go similarity index 89% rename from pkg/common/util.go rename to pkg/common/index.go index 3a62508..35a9ee5 100644 --- a/pkg/common/util.go +++ b/pkg/common/index.go @@ -13,6 +13,30 @@ import ( func IntToString(n int) string { return strconv.Itoa(n) } + +// StringToInt string转int +func StringToInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + panic(err) + } + return i +} + +// StringToInt64 string转int64 +func StringToInt64(s string) int64 { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + panic(err) + } + return i +} + +// Int64ToString int64转string +func Int64ToString(n int64) string { + return strconv.FormatInt(n, 10) +} + func mapToBytes(data map[string]interface{}) []byte { bytes, _ := json.Marshal(data) return bytes diff --git a/pkg/common/log.go b/pkg/common/log.go index 29cc4f0..cf4ad40 100644 --- a/pkg/common/log.go +++ b/pkg/common/log.go @@ -6,6 +6,10 @@ func ErrorLog(v ...interface{}) { gotool.Logs.ErrorLog().Println(v) } +func ErrorLogf(format string, v ...interface{}) { + gotool.Logs.ErrorLog().Printf(format+"\n", v) +} + func FatalfLog(format string, v ...interface{}) { gotool.Logs.ErrorLog().Fatalf(format, v) } diff --git a/pkg/cronjob/index.go b/pkg/cronjob/index.go index 2ef0db2..984897f 100644 --- a/pkg/cronjob/index.go +++ b/pkg/cronjob/index.go @@ -6,7 +6,6 @@ import ( "cutego/core/service" "cutego/pkg/common" "github.com/robfig/cron" - "time" ) // Cron表达式参考 @@ -23,20 +22,41 @@ import ( // 定时任务: 别名与调度器的映射 var AliasCronMap = make(map[string]*cron.Cron) -// 停止任务, 不会停止已开始的任务 +// StopCronFunc 停止任务, 不会停止已开始的任务 func StopCronFunc(aliasName string) { common.InfoLogf("停止任务 %s ---> Start", aliasName) AliasCronMap[aliasName].Stop() common.InfoLogf("停止任务 %s ---> Finish", aliasName) } -// 开始任务 +// StartCronFunc 开始任务 func StartCronFunc(aliasName string) { common.InfoLogf("唤起任务 %s ---> Start", aliasName) AliasCronMap[aliasName].Start() common.InfoLogf("唤起任务 %s ---> Finish", aliasName) } +// RemoveCronFunc 移除任务 +func RemoveCronFunc(aliasName string) { + common.InfoLogf("移除任务 %s ---> Start", aliasName) + StopCronFunc(aliasName) + delete(AliasCronMap, aliasName) + common.InfoLogf("移除任务 %s ---> Finish", aliasName) +} + +// AppendCronFunc 新增任务 +func AppendCronFunc(jobCron string, aliasName string, status string) { + common.InfoLogf("新增任务 %s ---> Start", aliasName) + c := cron.New() + c.AddFunc(jobCron, job.AliasFuncMap[aliasName]) + if status == "1" { + c.Start() + common.InfoLogf("调度定时任务 --- %s ---> Success", aliasName) + } + AliasCronMap[aliasName] = c + common.InfoLogf("新增任务 %s ---> Finish", aliasName) +} + func init() { if len(job.AliasFuncMap) > 0 { //go test() @@ -49,22 +69,9 @@ func init() { break } for _, datum := range data { - c := cron.New() - c.AddFunc(datum.JobCron, job.AliasFuncMap[datum.FuncAlias]) - c.Start() - - AliasCronMap[datum.FuncAlias] = c - common.InfoLogf("调度定时任务 --- %s ---> Success", datum.JobName) + AppendCronFunc(datum.JobCron, datum.FuncAlias, datum.Status) } index += 1 } } } - -// 测试通过 -func test() { - time.Sleep(time.Second * 10) - StopCronFunc("test1") - time.Sleep(time.Second * 10) - StartCronFunc("test1") -} diff --git a/pkg/util/rest_util.go b/pkg/util/rest_util.go new file mode 100644 index 0000000..6189804 --- /dev/null +++ b/pkg/util/rest_util.go @@ -0,0 +1,12 @@ +package util + +import "cutego/pkg/page" + +func NewPage(list []interface{}, total int64, pageSize int) page.Page { + // 值对象, 不可变 + return page.Page{ + List: list, + Total: total, + Size: pageSize, + } +} diff --git a/pkg/util/struct_util.go b/pkg/util/struct_util.go new file mode 100644 index 0000000..580bf01 --- /dev/null +++ b/pkg/util/struct_util.go @@ -0,0 +1,27 @@ +package util + +import ( + "bytes" + "cutego/pkg/common" + "encoding/gob" +) + +// DeepCopy 深度拷贝对象 +// @Param src 原对象 +// @Param dst 目标对象 +// @Return 目标对象 +// @Usage util.DeepCopy(item, &response.CronJobPageResponse{}) +// +// @Author tianjun@odboy.cn +// @Date 2022-03-01 +func DeepCopy(src, dst interface{}) interface{} { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(src); err != nil { + return err + } + err := gob.NewDecoder(bytes.NewBuffer(buf.Bytes())).Decode(dst) + if err != nil { + common.ErrorLogf("src(%v)---> dst(%v), error", src, dst) + } + return dst +}