网站建设流程和方法,找南昌网站开发公司电话,域名网站是什么,微网站免费制作go语言定时任务工具类#xff0c;支持crontab#xff08;精确到秒#xff09;和 timer 两种模式。
本文介绍了一个基于Go语言的定时任务工具类#xff0c;支持crontab#xff08;精确到秒#xff09;和timer两种模式。工具类使用github.com/robfig/cron/v3包实现#xf…go语言定时任务工具类支持crontab精确到秒和 timer 两种模式。本文介绍了一个基于Go语言的定时任务工具类支持crontab精确到秒和timer两种模式。工具类使用github.com/robfig/cron/v3包实现主要功能包括提供NamedCronJobTask接口定义定时任务支持通过cron表达式或时间间隔两种调度方式实现了任务添加AddTask、更新UpdateTask等功能内部使用map管理任务支持同名任务替换对cron和ticker两种调度方式进行了封装自动处理panic恢复提供了任务日志记录功能该工具类设计灵活可以方便地集成到各种需要定时任务调度的Go应用中。引入包github.com/robfig/cron/v3我的版本 v3.0.1直接见代码1、cron_job_conf.gopackagejobimport(xxx/utils// 自定义工具包包内容见下fmtlogsynctimegithub.com/google/uuidgithub.com/robfig/cron/v3)// 定时任务接口typeNamedCronJobTaskinterface{// 任务名称Name()string// returns a cron expression and/or a time interval.// Scheduling priority:// - If cronSpec is non-empty, it is used (interval is ignored).// - Else, if interval 0, a ticker-based scheduler runs every interval.// - Otherwise, the task is invalid.// Example:// return */5 * * * * *, 0 // cron every 5 seconds// return , 2*time.Hour // ticker every 2 hours// return 0 0 * * *, 10*time.Minute // cron used (every day at 00:00)SpecOrInterval()(cronSpecstring,interval time.Duration)// 执行方法Execute()}// taskType 任务类型typetaskTypeintconst(taskTypeCron taskTypeiotataskTypeTicker)// cronTask 内部任务元数据typecronTaskstruct{specstring// cron 表达式 或 [TICKER:...] 占位符仅用于日志fnfunc()id cron.EntryID// cron 用ticker*time.Ticker// ticker 用stopChanchanstruct{}// 用于安全停止 ticker goroutinetaskType taskType}var(c*cron.Cron cronTaskMapmake(map[string]*cronTask)cronTaskMapMutexsync.RWMutex{})// init 初始化 cron 调度器带秒支持funcinit(){ccron.New(cron.WithSeconds())c.Start()log.Println(cron scheduler started)}// AddTask 添加定时任务。如果之前已存在同名任务则会被覆盖。funcAddTask(named NamedCronJobTask)(string,error){ifnamednil{return,fmt.Errorf(namedCronJobTask is nil)}name:named.Name()ifname{nameanonymous:uuid.New().String()}returnAddTaskWithName(name,named.SpecOrInterval,named.Execute)}// AddTaskWithName 添加命名任务如果之前已存在同名任务则会被覆盖。// specOrInterval 返回 (cronSpec, interval)规则// - 若 cronSpec ! → 使用 cron忽略 interval// - 否则若 interval 0 → 使用 ticker// - 否则返回错误funcAddTaskWithName(namestring,specOrIntervalfunc()(specstring,interval time.Duration),fnfunc(),)(string,error){iffnnil{returnname,fmt.Errorf(task function cannot be nil)}ifspecOrIntervalnil{returnname,fmt.Errorf(specOrInterval function cannot be nil)}cronSpec,interval:specOrInterval()ifname{nameanonymous:uuid.New().String()}ifcronSpecinterval0{returnname,fmt.Errorf(invalid scheduling policy for task %q: must return non-empty cronSpec or interval 0,name,)}wrappedFn:func(){deferutils.RecoverPanic()fn()}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()// 替换已存在的同名任务ifold,ok:cronTaskMap[name];ok{removeTask(old)log.Printf(Replaced existing task: %s,name)}ifcronSpec!{// 使用 cronentryID,err:c.AddFunc(cronSpec,wrappedFn)iferr!nil{returnname,fmt.Errorf(invalid cron spec %q: %w,cronSpec,err)}cronTaskMap[name]cronTask{spec:cronSpec,fn:fn,id:entryID,taskType:taskTypeCron,}log.Printf(Added cron task: %s (spec: %s, ID: %d),name,cronSpec,int64(entryID))}elseifinterval0{// 使用 tickerstopChan:make(chanstruct{})ticker:time.NewTicker(interval)gofunc(){deferfunc(){ifr:recover();r!nil{log.Printf(Recovered panic in ticker task %s: %v,name,r)}ticker.Stop()}()for{select{case-ticker.C:wrappedFn()case-stopChan:return}}}()cronTaskMap[name]cronTask{spec:fmt.Sprintf([TICKER:%v],interval),fn:fn,ticker:ticker,stopChan:stopChan,taskType:taskTypeTicker,}log.Printf(Added ticker task: %s (every %v),name,interval)}returnname,nil}// AddTaskWithName 添加匿名任务通过函数动态获取调度策略。// specOrInterval 返回 (cronSpec, interval)规则// - 若 cronSpec ! → 使用 cron忽略 interval// - 否则若 interval 0 → 使用 ticker// - 否则返回错误funcAddTaskWithoutName(specOrIntervalfunc()(specstring,interval time.Duration),fnfunc())(string,error){name:anonymous:uuid.New().String()returnAddTaskWithName(name,specOrInterval,fn)}// UpdateTask 更新任务调度策略支持在 cron 和 ticker 之间切换。// newSpecOrInterval 应返回新的 (cronSpec, interval)。// 规则// - 若 cronSpec ! → 使用 cron忽略 interval// - 否则若 interval 0 → 使用 ticker// - 否则返回错误//// 要求任务必须已存在。funcUpdateTask(namestring,newSpecOrIntervalfunc()(newSpecstring,newInterval time.Duration))error{ifname{returnfmt.Errorf(task name cannot be empty)}ifnewSpecOrIntervalnil{returnfmt.Errorf(newSpecOrInterval function cannot be nil)}newSpec,newInterval:newSpecOrInterval()ifnewSpecnewInterval0{returnfmt.Errorf(newSpecOrInterval returned invalid spec (%q) and interval (%v): at least one must be valid,newSpec,newInterval)}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()oldTask,exists:cronTaskMap[name]if!exists{returnfmt.Errorf(task not found: %s,name)}ifoldTask.fnnil{returnfmt.Errorf(task function is nil for %s,name)}// 安全停止旧任务removeTask(oldTask)ifnewSpec!{// 切换为 cronwrappedFn:func(){deferutils.RecoverPanic()oldTask.fn()}entryID,e:c.AddFunc(newSpec,wrappedFn)ife!nil{delete(cronTaskMap,name)// 防止残留无效条目returnfmt.Errorf(failed to parse new cron spec %q: %w,newSpec,e)}cronTaskMap[name]cronTask{spec:newSpec,fn:oldTask.fn,id:entryID,taskType:taskTypeCron,}log.Printf(Updated task %s to cron (spec: %s),name,newSpec)}elseifnewInterval0{// 切换为 tickerstopChan:make(chanstruct{})ticker:time.NewTicker(newInterval)gofunc(){deferfunc(){ifr:recover();r!nil{log.Printf(Recovered panic in updated ticker task %s: %v,name,r)}ticker.Stop()}()for{select{case-ticker.C:func(){deferutils.RecoverPanic()oldTask.fn()}()case-stopChan:return}}}()cronTaskMap[name]cronTask{spec:fmt.Sprintf([TICKER:%v],newInterval),fn:oldTask.fn,ticker:ticker,stopChan:stopChan,taskType:taskTypeTicker,}log.Printf(Updated task %s to ticker (interval: %v),name,newInterval)}returnnil}// RemoveTaskByName 删除任务幂等任务不存在也返回 truefuncRemoveTaskByName(namestring)bool{ifname{returntrue}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()iftask,ok:cronTaskMap[name];ok{removeTask(task)delete(cronTaskMap,name)log.Printf(Removed task: %s,name)}returntrue}// Exists 检查任务是否存在funcExists(namestring)bool{cronTaskMapMutex.RLock()defercronTaskMapMutex.RUnlock()_,ok:cronTaskMap[name]returnok}// GetAllTaskNames 获取所有任务名funcGetAllTaskNames()[]string{cronTaskMapMutex.RLock()defercronTaskMapMutex.RUnlock()names:make([]string,0,len(cronTaskMap))forname:rangecronTaskMap{namesappend(names,name)}returnnames}// Stop 停止整个调度器funcStop(){log.Println(Stopping cron scheduler...)c.Stop()// 停止 cron 调度器不再触发新任务cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()// 复用 removeTask 清理所有任务资源for_,task:rangecronTaskMap{removeTask(task)}log.Println(All scheduled tasks stopped)}// 内部使用安全移除一个任务funcremoveTask(task*cronTask){switchtask.taskType{casetaskTypeCron:c.Remove(task.id)casetaskTypeTicker:iftask.ticker!nil{task.ticker.Stop()}iftask.stopChan!nil{close(task.stopChan)// 唯一关闭点task.stopChannil// 防止重复 close虽已加锁但更安全}}}2、panic_tookit.gopackageutilsimport(logruntime/debug)// 捕获 PanicfuncRecoverPanic(){ifr:recover();r!nil{log.Printf(panic recovered: %v\n%s,r,debug.Stack())// 后期这里还可以 SendMetrics、SendAlert、Sentry.Capture等指标}}// 在捕获 Panic 下运行某个方法funcRunWithRecoverPanic[T any](data T,fnfunc(T)){deferRecoverPanic()// 增加一层防火墙fn(data)}// 在捕获 Panic 下运行某个方法无入参funcRunWithRecoverPanic2(fnfunc()){deferRecoverPanic()// 增加一层防火墙fn()}// 在捕获 Panic 下 异步运行(goroutine) 某个方法funcAsyncRunWithRecoverPanic[T any](data T,fnfunc(T)){gofunc(d T){deferRecoverPanic()// 增加一层防火墙fn(d)}(data)}// 在捕获 Panic 下 异步运行(goroutine) 某个方法无入参funcAsyncRunWithRecoverPanic2(fnfunc()){gofunc(){deferRecoverPanic()// 增加一层防火墙fn()}()}