简介
gron
是一个比较小巧、灵活的定时任务库,可以执行定时的、周期性的任务。gron
提供简洁的、并发安全的接口。我们先介绍gron
库的使用,然后简单分析一下源码。
快速使用
先安装:
1
|
$ go get github.com/roylee0704/gron
|
后使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import (
"fmt"
"sync"
"time"
"github.com/roylee0704/gron"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := gron.New()
c.AddFunc(gron.Every(5*time.Second), func() {
fmt.Println("runs every 5 seconds.")
})
c.Start()
wg.Wait()
}
|
gron
的使用比较简单:
- 首先调用
gron.New()
创建一个管理器,这是一个定时任务的管理器;
- 然后调用管理器的
AddFunc()
或Add()
方法向它添加任务,在启动时添加也是可以的,见下文分析;
- 最后调用管理器的
Start()
方法启动它。
gron
支持两种添加任务的方式,一种是使用无参数的函数,另一种是实现任务接口。上面例子中使用的是前一种方式,实现接口的方式我们后面会介绍。添加任务时通过gron.Every()
指定周期任务的间隔,上面添加了一个 5s 的周期任务,每隔 5s 输出一行文字。
需要注意的是,我们使用sync.WaitGroup
保证主 goroutine 不退出。因为c.Start()
中只是启动了一个 goroutine,如果主 goroutine 退出了,整个程序就停止了。
运行程序,每隔 5s 输出:
1
2
3
|
runs every 5 seconds.
runs every 5 seconds.
runs every 5 seconds.
|
该程序需要按下ctrl + c
停止!
时间格式
gron
接受time.Duration
类型的时间间隔,除了time
包中定义的基础Second/Minute/Hour
,gron
中的xtime
子包还提供了Day/Week
单位的时间。有一点需要注意,gron
支持的时间精度为 1s,小于 1s 的间隔是不支持的。除了单位时间间隔,我们还可以使用4m10s
这样的时间:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := gron.New()
c.AddFunc(gron.Every(1*time.Second), func() {
fmt.Println("runs every second.")
})
c.AddFunc(gron.Every(1*time.Minute), func() {
fmt.Println("runs every minute.")
})
c.AddFunc(gron.Every(1*time.Hour), func() {
fmt.Println("runs every hour.")
})
c.AddFunc(gron.Every(1*xtime.Day), func() {
fmt.Println("runs every day.")
})
c.AddFunc(gron.Every(1*xtime.Week), func() {
fmt.Println("runs every week.")
})
t, _ := time.ParseDuration("4m10s")
c.AddFunc(gron.Every(t), func() {
fmt.Println("runs every 4 minutes 10 seconds.")
})
c.Start()
wg.Wait()
}
|
通过gron.Every()
设置每隔多长时间执行一次任务。对于大于 1 天的时间间隔,我们还可以使用gron.Every().At()
指定其在某个时间点执行。例如下面的程序,从第二天的22:00
开始,每隔一天触发一次,即每天的22:00
触发:
1
2
3
4
5
6
7
8
9
10
11
12
|
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := gron.New()
c.AddFunc(gron.Every(1*xtime.Day).At("22:00"), func() {
fmt.Println("runs every second.")
})
c.Start()
wg.Wait()
}
|
自定义任务
实现自定义任务也很简单,只需要实现gron.Job
接口即可:
1
2
3
4
|
// src/github.com/roylee0704/gron/cron.go
type Job interface {
Run()
}
|
我们需要调用调度器的Add()
方法向管理器添加自定义任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
type GreetingJob struct {
Name string
}
func (g GreetingJob) Run() {
fmt.Println("Hello ", g.Name)
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
g1 := GreetingJob{Name: "dj"}
g2 := GreetingJob{Name: "dajun"}
c := gron.New()
c.Add(gron.Every(5*time.Second), g1)
c.Add(gron.Every(10*time.Second), g2)
c.Start()
wg.Wait()
}
|
上面我们编写了一个GreetingJob
结构,实现gron.Job
接口,然后创建两个对象g1/g2
,一个 5s 触发一次,一个 10s 触发一次。使用自定义任务的方式可以比较好地处理携带状态的任务,如上面的Name
字段。
实际上,AddFunc()
方法内部也是通过Add()
实现的:
1
2
3
4
5
6
7
8
9
10
|
// src/github.com/roylee0704/gron/cron.go
func (c *Cron) AddFunc(s Schedule, j func()) {
c.Add(s, JobFunc(j))
}
type JobFunc func()
func (j JobFunc) Run() {
j()
}
|
在AddFunc()
内部,将传入的函数转为JobFunc
类型,而gron
为JobFunc
实现了gron.Job
接口。是不是与net/http
包中的HandleFunc
和Handle
很像。如果注意观察的话,在很多 Go 语言的代码中都有此类模式。
一点源码
gron
的源码只有两个文件cron.go
和schedule.go
,cron.go
中实现添加任务和调度的方法,schedule.go
中是时间策略相关的代码。两个文件算上注释一共才 260 行!我们添加的任务在gron
内部都是以Entry
结构表示的:
1
2
3
4
5
6
|
type Entry struct {
Schedule Schedule
Job Job
Next time.Time
Prev time.Time
}
|
Next
为下次执行时间,Prev
为上次执行时间,Job
是要执行的任务,Schedule
为gron.Schedule
接口类型,调用其Next()
可计算出下次执行的时间点。
管理器使用gron.Cron
结构表示:
1
2
3
4
5
6
|
type Cron struct {
entries []*Entry
running bool
add chan *Entry
stop chan struct{}
}
|
任务的调度在另外一个 goroutine 中。如果调度未开始,添加任务可直接append
到entries
切片中;如果调度已开始(Start()
方法已调用),需要向通道add
发送待添加的任务。任务调度的核心逻辑在Run()
方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
func (c *Cron) run() {
var effective time.Time
now := time.Now().Local()
// to figure next trig time for entries, referenced from now
for _, e := range c.entries {
e.Next = e.Schedule.Next(now)
}
for {
sort.Sort(byTime(c.entries))
if len(c.entries) > 0 {
effective = c.entries[0].Next
} else {
effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
}
select {
case now = <-after(effective.Sub(now)):
// entries with same time gets run.
for _, entry := range c.entries {
if entry.Next != effective {
break
}
entry.Prev = now
entry.Next = entry.Schedule.Next(now)
go entry.Job.Run()
}
case e := <-c.add:
e.Next = e.Schedule.Next(time.Now())
c.entries = append(c.entries, e)
case <-c.stop:
return // terminate go-routine.
}
}
}
|
执行流程如下:
- 调度器刚启动时,先计算所有任务的下次执行时间;
- 然后在一个
for
循环中,按照执行时间从早到晚排序,取出最近需要执行任务的时间点;
- 在
select
语句中等待到这个时间点,启动新的 goroutine 执行到期的任务,每个任务一个新的 goroutine;
- 如果在等待的过程中,又添加了新的任务(通过通道
c.add
),计算这个新任务的首次执行时间。跳到步骤 2,因为新添加的任务可能最早执行。
有几个细节需要注意一下:
- 任务到期判断使用的是本地时间:
time.Now().Local()
;
- 如果没有任务,等待时间设置为
now.AddDate(15, 0, 0)
,即 15 年,防止 CPU 空转;
- 任务都是在独立的 goroutine 中执行的;
- 通过实现
sort.Interface
接口可以实现自定义排序(代码中的byTime
)。
最后,我们来看一下时间策略的代码。我们知道在Entry
结构中存储了一个gron.Schedule
类型的对象,调用该对象的Next()
方法返回下次执行的时间点:
1
2
3
4
|
// src/github.com/roylee0704/gron/schedule.go
type Schedule interface {
Next(t time.Time) time.Time
}
|
gron
内置实现了两种Schedule
,一种是periodicSchedule
,即周期触发,gron.Every()
函数返回的就是这个对象:
1
2
3
4
|
// src/github.com/roylee0704/gron/schedule.go
type periodicSchedule struct {
period time.Duration
}
|
一种是固定时刻的周期触发,它实际上也是周期触发,只是固定了时间点:
1
2
3
4
5
|
type atSchedule struct {
period time.Duration
hh int
mm int
}
|
他们的核心逻辑在Next()
方法中,periodicSchedule
只需要用当前时间加上周期即可得到下次触发时间。这里Truncate()
方法截掉了当前时间中小于 1s 的部分:
1
2
3
|
func (ps periodicSchedule) Next(t time.Time) time.Time {
return t.Truncate(time.Second).Add(ps.period)
}
|
atSchedule
的Next()
方法先计算当天该时间点,再加上周期就是下次触发的时间:
1
2
3
4
5
6
7
8
9
10
11
|
func (as atSchedule) reset(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
}
func (as atSchedule) Next(t time.Time) time.Time {
next := as.reset(t)
if t.After(next) {
return next.Add(as.period)
}
return next
}
|
periodicSchedule
提供了At()
方法可以转为atSchedule
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (ps periodicSchedule) At(t string) Schedule {
if ps.period < xtime.Day {
panic("period must be at least in days")
}
// parse t naively
h, m, err := parse(t)
if err != nil {
panic(err.Error())
}
return &atSchedule{
period: ps.period,
hh: h,
mm: m,
}
}
|
自定义时间策略
我们可以很轻松的实现一个自定义的时间策略。例如,我们要实现一个“指数退避”的时间序列,先等待 1s,然后 2s、4s…
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
type ExponentialBackOffSchedule struct {
last int
}
func (e *ExponentialBackOffSchedule) Next(t time.Time) time.Time {
interval := time.Duration(math.Pow(2.0, float64(e.last))) * time.Second
e.last += 1
return t.Truncate(time.Second).Add(interval)
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := gron.New()
c.AddFunc(&ExponentialBackOffSchedule{}, func() {
fmt.Println(time.Now().Local().Format("2006-01-02 15:04:05"), "hello")
})
c.Start()
wg.Wait()
}
|
运行结果如下:
1
2
3
4
|
2020-04-20 23:47:11 hello
2020-04-20 23:47:13 hello
2020-04-20 23:47:17 hello
2020-04-20 23:47:25 hello
|
第二次输出与第一次相差 2s,第三次与第二次相差 4s,第4次与第三次相差 8s,完美!
总结
本文介绍了gron
这个小巧的定时任务库,如何使用,如何自定义任务和时间策略,顺带分析了一下源码。gron
源码实现非常简洁,非常推荐阅读!
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
参考
- gron GitHub:https://github.com/roylee0704/gron
- Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~