update cronjob

This commit is contained in:
骑着蜗牛追导弹 2022-03-01 17:43:41 +08:00
parent 093e44c642
commit 3ba1cb2565
13 changed files with 186 additions and 115 deletions

View File

@ -2,14 +2,14 @@ package v1
import ( import (
"cutego/core/api/v1/request" "cutego/core/api/v1/request"
"cutego/core/api/v1/response"
"cutego/core/entity" "cutego/core/entity"
"cutego/core/service" "cutego/core/service"
"cutego/pkg/cache" "cutego/pkg/common"
"cutego/pkg/page" "cutego/pkg/cronjob"
"cutego/pkg/resp" "cutego/pkg/resp"
"cutego/pkg/util"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"strconv"
"strings"
) )
type CronJobApi struct { type CronJobApi struct {
@ -24,27 +24,30 @@ func (a CronJobApi) List(c *gin.Context) {
return return
} }
find, i := a.cronJobService.FindPage(query) find, i := a.cronJobService.FindPage(query)
resp.OK(c, page.Page{
List: find, var result []interface{}
Total: i, for _, item := range find {
Size: query.PageSize, // 领域对象转换
}) result = append(result, util.DeepCopy(item, &response.CronJobResponse{}))
}
resp.OK(c, util.NewPage(result, i, query.PageSize))
}
// GetById 根据id获取任务详情
func (a CronJobApi) GetById(c *gin.Context) {
jobId := c.Param("jobId")
info := a.cronJobService.GetInfo(common.StringToInt64(jobId))
resp.OK(c, util.DeepCopy(info, &response.CronJobResponse{}))
} }
// Edit 修改定时任务 // Edit 修改定时任务
func (a CronJobApi) Edit(c *gin.Context) { func (a CronJobApi) Edit(c *gin.Context) {
dictType := entity.SysDictType{} record := entity.SysCronJob{}
if c.Bind(&dictType) != nil { if c.Bind(&record) != nil {
resp.ParamError(c) resp.ParamError(c)
return return
} }
//检验定时任务是否存在 if a.cronJobService.Edit(record) > 0 {
if a.dictTypeService.CheckDictTypeUnique(dictType) {
resp.Error(c, "修改字典'"+dictType.DictName+"'失败, 定时任务已存在")
return
}
//修改数据
if a.dictTypeService.Edit(dictType) {
resp.OK(c) resp.OK(c)
} else { } else {
resp.Error(c) resp.Error(c)
@ -53,54 +56,49 @@ func (a CronJobApi) Edit(c *gin.Context) {
// Add 新增定时任务 // Add 新增定时任务
func (a CronJobApi) Add(c *gin.Context) { func (a CronJobApi) Add(c *gin.Context) {
dictType := entity.SysDictType{} dto := entity.SysCronJob{}
if c.Bind(&dictType) != nil { if c.Bind(&dto) != nil {
resp.ParamError(c) resp.ParamError(c)
return return
} }
//检验定时任务是否存在 record := a.cronJobService.GetInfoByAlias(dto.FuncAlias)
if a.dictTypeService.CheckDictTypeUnique(dictType) { if record.JobId > 0 {
resp.Error(c, "新增字典'"+dictType.DictName+"'失败, 定时任务已存在") resp.Error(c, "任务已存在!")
} else {
a.cronJobService.Save(dto)
cronjob.AppendCronFunc(dto.JobCron, dto.FuncAlias, dto.Status)
resp.OK(c)
}
}
// Remove 删除定时任务
func (a CronJobApi) Remove(c *gin.Context) {
jobId := common.StringToInt64(c.Param("jobId"))
funcAlias := c.Param("funcAlias")
if a.cronJobService.GetInfo(jobId).Status == "1" {
resp.Error(c, "任务运行中, 无法删除!")
return return
} }
//新增定时任务
if a.dictTypeService.Save(dictType) { cronjob.RemoveCronFunc(funcAlias)
resp.OK(c) a.cronJobService.Remove([]int64{jobId})
} else { resp.OK(c)
resp.Error(c)
}
} }
// Remove 批量删除定时任务 // ChangeStatus 改变任务状态
func (a CronJobApi) Remove(c *gin.Context) { func (a CronJobApi) ChangeStatus(c *gin.Context) {
param := c.Param("dictId") record := entity.SysCronJob{}
split := strings.Split(param, ",") if c.Bind(&record) != nil {
ids := make([]int64, 0) resp.ParamError(c)
types := make([]string, 0) return
for _, s := range split {
parseInt, _ := strconv.ParseInt(s, 10, 64)
ids = append(ids, parseInt)
} }
//校验定时任务是否使用 if a.cronJobService.Edit(record) > 0 {
for _, id := range ids { if record.Status == "1" {
dictType := a.dictTypeService.GetById(id) cronjob.StartCronFunc(record.FuncAlias)
if len(a.dictDataService.FindByDictType(dictType.DictType)) > 0 { } else {
resp.Error(c, dictType.DictName+"已分配,不能删除") cronjob.StopCronFunc(record.FuncAlias)
return
} }
types = append(types, dictType.DictType)
}
//批量删除
if a.dictTypeService.Remove(ids) {
//从缓存中删除数据
cache.RemoveList(types)
resp.OK(c)
} else {
resp.Error(c)
} }
} resp.OK(c)
// 改变任务状态
func (a CronJobApi) ChangeStatus(context *gin.Context) {
} }

View File

@ -5,4 +5,5 @@ import "cutego/pkg/base"
type CronJobQuery struct { type CronJobQuery struct {
base.GlobalQuery base.GlobalQuery
JobName string `form:"jobName"` JobName string `form:"jobName"`
Status string `form:"Status"`
} }

View File

@ -0,0 +1,12 @@
package response
type CronJobResponse struct {
JobId int `json:"jobId"` // 任务主键
JobName string `json:"jobName"` // 任务名称
JobCron string `json:"jobCron"` // cron表达式
FuncAlias string `json:"funcAlias"` // 方法别名(程序内注册的别名)
FuncParam string `json:"funcParam"` // 方法参数
Status string `json:"status"` // 状态(1、Running 0、Stop)
Level int `json:"level"` // 任务级别(0、普通 1、一般 2、重要 3、强保)
Remark string `json:"remark"` // 备注
}

View File

@ -23,6 +23,9 @@ func (d CronJobDao) SelectPage(query request.CronJobQuery) ([]entity.SysCronJob,
if gotool.StrUtils.HasNotEmpty(query.JobName) { if gotool.StrUtils.HasNotEmpty(query.JobName) {
session.And("job_name like concat('%', ?, '%')", query.JobName) session.And("job_name like concat('%', ?, '%')", query.JobName)
} }
if gotool.StrUtils.HasNotEmpty(query.Status) {
session.And("status = ?", query.Status)
}
total, _ := page.GetTotal(session.Clone()) total, _ := page.GetTotal(session.Clone())
err := session.Limit(query.PageSize, page.StartSize(query.PageNum, query.PageSize)).Find(&configs) err := session.Limit(query.PageSize, page.StartSize(query.PageNum, query.PageSize)).Find(&configs)
if err != nil { if err != nil {
@ -60,23 +63,23 @@ func (d CronJobDao) SelectById(id int64) *entity.SysCronJob {
// Update 修改数据 // Update 修改数据
func (d CronJobDao) Update(config entity.SysCronJob) int64 { func (d CronJobDao) Update(config entity.SysCronJob) int64 {
//session := SqlDB.NewSession() session := SqlDB.NewSession()
//session.Begin() session.Begin()
//update, err := session.Where("job_id = ?", config.JobId).Update(&config) update, err := session.Where("job_id = ?", config.JobId).Update(&config)
//if err != nil { if err != nil {
// common.ErrorLog(err) common.ErrorLog(err)
// session.Rollback() session.Rollback()
// return 0 return 0
//} }
//session.Commit() session.Commit()
return CustomUpdateById("job_id", config.JobId, config) return update
} }
// Remove 删除数据 // Remove 删除数据
func (d CronJobDao) Delete(list []int64) bool { func (d CronJobDao) Delete(list []int64) bool {
session := SqlDB.NewSession() session := SqlDB.NewSession()
session.Begin() session.Begin()
_, err := session.In("config_id", list).Delete(&entity.SysCronJob{}) _, err := session.In("job_id", list).Delete(&entity.SysCronJob{})
if err != nil { if err != nil {
common.ErrorLog(err) common.ErrorLog(err)
session.Rollback() session.Rollback()

View File

@ -113,21 +113,3 @@ func initConfig() {
})) }))
} }
} }
// 通用: 根据ID更新
// @Param idColumnName 字段名称
// @Param idColumnValue 字段值
// @Param bean 更新的bean内容
// @Return
func CustomUpdateById(idColumnName string, idColumnValue interface{}, bean interface{}) int64 {
session := SqlDB.NewSession()
session.Begin()
update, err := session.Where(idColumnName+" = ?", idColumnValue).Update(&bean)
if err != nil {
common.ErrorLog(err)
session.Rollback()
return 0
}
session.Commit()
return update
}

View File

@ -5,17 +5,16 @@ import (
) )
type SysCronJob struct { type SysCronJob struct {
JobId int `xorm:"pk autoincr" json:"jobId"` // 任务主键 JobId int64 `xorm:"pk autoincr" json:"jobId"` // 任务主键
JobName string `xorm:"varchar(100)" json:"jobName"` // 任务名称 JobName string `xorm:"varchar(100)" json:"jobName"` // 任务名称
JobCron string `xorm:"varchar(255)" json:"job_cron"` // cron表达式 JobCron string `xorm:"varchar(255)" json:"jobCron"` // cron表达式
FuncAlias string `xorm:"varchar(100)" json:"funcAlias"` // 方法别名(程序内注册的别名) FuncAlias string `xorm:"varchar(100)" json:"funcAlias"` // 方法别名(程序内注册的别名)
FuncParam string `xorm:"varchar(1)" json:"funcParam"` // 方法参数
Status string `xorm:"char(1)" json:"status"` // 状态(1、Running 0、Stop) Status string `xorm:"char(1)" json:"status"` // 状态(1、Running 0、Stop)
Level int `xorm:"int(1)" json:"level"` // 任务级别(0、普通 1、一般 2、重要 3、强保) Level int `xorm:"int(1)" json:"level"` // 任务级别(0、普通 1、一般 2、重要 3、强保)
CreateBy string `xorm:"varchar(64)" json:"createBy"` // 创建人 CreateBy string `xorm:"varchar(64)"` // 创建人
CreateTime time.Time `xorm:"created" json:"createTime"` // 创建时间 CreateTime time.Time `xorm:"created"` // 创建时间
UpdateBy string `xorm:"varchar(64)" json:"updateBy"` // 更新人 UpdateBy string `xorm:"varchar(64)"` // 更新人
UpdateTime time.Time `json:"updateTime"` // 更新时间 UpdateTime time.Time `xorm:"datetime"` // 更新时间
Remark string `xorm:"varchar(500)" json:"remark"` // 备注 Remark string `xorm:"varchar(500)" json:"remark"` // 备注
} }

View File

@ -17,5 +17,5 @@ func RegisterFunc(aliasName string, f func()) {
// 注册方法 // 注册方法
func init() { func init() {
//RegisterFunc("test1", TestJob) RegisterFunc("test1", TestJob)
} }

View File

@ -8,17 +8,19 @@ import (
// 初始化定时任务路由 // 初始化定时任务路由
func initCronJobRouter(router *gin.RouterGroup) { func initCronJobRouter(router *gin.RouterGroup) {
v := new(v1.CronJobApi) v := new(v1.CronJobApi)
group := router.Group("/cronJob") group := router.Group("/monitor/cronJob")
{ {
// 查询定时任务分页数据 // 查询定时任务分页数据
group.GET("/list", v.List) group.GET("/list", v.List)
// 查询定时任务分页数据
group.GET("/:jobId", v.GetById)
// 修改定时任务 // 修改定时任务
group.PUT("/modify", v.Edit) group.PUT("/modify", v.Edit)
// 新增定时任务 // 新增定时任务
group.POST("/create", v.Add) group.POST("/create", v.Add)
// 删除定时任务 // 删除定时任务
group.DELETE("/:jobId", v.Remove) group.DELETE("/:jobId/:funcAlias", v.Remove)
// 改变定时任务状态 // 改变定时任务状态
group.DELETE("/changeStatus", v.ChangeStatus) group.PUT("/changeStatus", v.ChangeStatus)
} }
} }

View File

@ -13,6 +13,30 @@ import (
func IntToString(n int) string { func IntToString(n int) string {
return strconv.Itoa(n) return strconv.Itoa(n)
} }
// StringToInt string转int
func StringToInt(s string) int {
i, err := strconv.Atoi(s)
if err != nil {
panic(err)
}
return i
}
// StringToInt64 string转int64
func StringToInt64(s string) int64 {
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
panic(err)
}
return i
}
// Int64ToString int64转string
func Int64ToString(n int64) string {
return strconv.FormatInt(n, 10)
}
func mapToBytes(data map[string]interface{}) []byte { func mapToBytes(data map[string]interface{}) []byte {
bytes, _ := json.Marshal(data) bytes, _ := json.Marshal(data)
return bytes return bytes

View File

@ -6,6 +6,10 @@ func ErrorLog(v ...interface{}) {
gotool.Logs.ErrorLog().Println(v) gotool.Logs.ErrorLog().Println(v)
} }
func ErrorLogf(format string, v ...interface{}) {
gotool.Logs.ErrorLog().Printf(format+"\n", v)
}
func FatalfLog(format string, v ...interface{}) { func FatalfLog(format string, v ...interface{}) {
gotool.Logs.ErrorLog().Fatalf(format, v) gotool.Logs.ErrorLog().Fatalf(format, v)
} }

View File

@ -6,7 +6,6 @@ import (
"cutego/core/service" "cutego/core/service"
"cutego/pkg/common" "cutego/pkg/common"
"github.com/robfig/cron" "github.com/robfig/cron"
"time"
) )
// Cron表达式参考 // Cron表达式参考
@ -23,20 +22,41 @@ import (
// 定时任务: 别名与调度器的映射 // 定时任务: 别名与调度器的映射
var AliasCronMap = make(map[string]*cron.Cron) var AliasCronMap = make(map[string]*cron.Cron)
// 停止任务, 不会停止已开始的任务 // StopCronFunc 停止任务, 不会停止已开始的任务
func StopCronFunc(aliasName string) { func StopCronFunc(aliasName string) {
common.InfoLogf("停止任务 %s ---> Start", aliasName) common.InfoLogf("停止任务 %s ---> Start", aliasName)
AliasCronMap[aliasName].Stop() AliasCronMap[aliasName].Stop()
common.InfoLogf("停止任务 %s ---> Finish", aliasName) common.InfoLogf("停止任务 %s ---> Finish", aliasName)
} }
// 开始任务 // StartCronFunc 开始任务
func StartCronFunc(aliasName string) { func StartCronFunc(aliasName string) {
common.InfoLogf("唤起任务 %s ---> Start", aliasName) common.InfoLogf("唤起任务 %s ---> Start", aliasName)
AliasCronMap[aliasName].Start() AliasCronMap[aliasName].Start()
common.InfoLogf("唤起任务 %s ---> Finish", aliasName) common.InfoLogf("唤起任务 %s ---> Finish", aliasName)
} }
// RemoveCronFunc 移除任务
func RemoveCronFunc(aliasName string) {
common.InfoLogf("移除任务 %s ---> Start", aliasName)
StopCronFunc(aliasName)
delete(AliasCronMap, aliasName)
common.InfoLogf("移除任务 %s ---> Finish", aliasName)
}
// AppendCronFunc 新增任务
func AppendCronFunc(jobCron string, aliasName string, status string) {
common.InfoLogf("新增任务 %s ---> Start", aliasName)
c := cron.New()
c.AddFunc(jobCron, job.AliasFuncMap[aliasName])
if status == "1" {
c.Start()
common.InfoLogf("调度定时任务 --- %s ---> Success", aliasName)
}
AliasCronMap[aliasName] = c
common.InfoLogf("新增任务 %s ---> Finish", aliasName)
}
func init() { func init() {
if len(job.AliasFuncMap) > 0 { if len(job.AliasFuncMap) > 0 {
//go test() //go test()
@ -49,22 +69,9 @@ func init() {
break break
} }
for _, datum := range data { for _, datum := range data {
c := cron.New() AppendCronFunc(datum.JobCron, datum.FuncAlias, datum.Status)
c.AddFunc(datum.JobCron, job.AliasFuncMap[datum.FuncAlias])
c.Start()
AliasCronMap[datum.FuncAlias] = c
common.InfoLogf("调度定时任务 --- %s ---> Success", datum.JobName)
} }
index += 1 index += 1
} }
} }
} }
// 测试通过
func test() {
time.Sleep(time.Second * 10)
StopCronFunc("test1")
time.Sleep(time.Second * 10)
StartCronFunc("test1")
}

12
pkg/util/rest_util.go Normal file
View File

@ -0,0 +1,12 @@
package util
import "cutego/pkg/page"
func NewPage(list []interface{}, total int64, pageSize int) page.Page {
// 值对象, 不可变
return page.Page{
List: list,
Total: total,
Size: pageSize,
}
}

27
pkg/util/struct_util.go Normal file
View File

@ -0,0 +1,27 @@
package util
import (
"bytes"
"cutego/pkg/common"
"encoding/gob"
)
// DeepCopy 深度拷贝对象
// @Param src 原对象
// @Param dst 目标对象
// @Return 目标对象
// @Usage util.DeepCopy(item, &response.CronJobPageResponse{})
//
// @Author tianjun@odboy.cn
// @Date 2022-03-01
func DeepCopy(src, dst interface{}) interface{} {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(src); err != nil {
return err
}
err := gob.NewDecoder(bytes.NewBuffer(buf.Bytes())).Decode(dst)
if err != nil {
common.ErrorLogf("src(%v)---> dst(%v), error", src, dst)
}
return dst
}