本文会介绍在 Go 语言中实现时间控制的方法和技巧,包括超时控制、资源回收、周期性任务执行以及如何使用 cron 包进行复杂的时间控制任务。
问题介绍
最近,我遇到了这样一个需求:我的程序需要定期安排一些任务,要求对时间进行精细控制。让我们看下程序的组件架构图开始:
具体要求是,管理程序需要定期触发任务分配,而工作程序也需要定期从数据库中获取任务并执行。此外,管理器和 Worker 的执行时间必须固定。
例如,让我们从时间的角度来考虑 manager 执行 20 分钟的任务和 worker 执行 60 分钟的任务:
显然,manager 和 worker 都需要定期执行任务。那么,我们应该如何在 Go 中控制时间呢?有哪些用例?我该如何实现这一特定需求?
Time Control
在总结的过程中,我发现时间控制大致可以分为两种情况:
超时和资源恢复场景(channel+context, channel + time.After/time.NewTimer) 周期性场景 (channel + time.After/time.NewTimer, cron package)
不同的场景需要控制不同的方面,让我们继续往下看。
channel + context
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
defer cancel()
done := make(chan bool)
go func(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Millisecond*100))
defer cancel()
// Call downstream interface
fmt.Println("Call downstream interface")
// if success
done <- true
}(ctx)
// do something
select {
case <-done:
fmt.Println("call successfully!!!")
// do something
return
case <-ctx.Done():
fmt.Println("timeout or do something else")
// retry or recycle resource
return
}
}
主要逻辑是利用上下文进行时间控制。主程序设置了 800 毫秒的超时时间,如果子程序没有完成执行,我们最多等待 800 毫秒。如果超时,我们可以考虑重试或回收资源。如果子程序请求成功,我们就可以继续主程序的业务逻辑。
可以看出,在这种情况下,时间控制的重点是请求超时和资源恢复。
channel + time.After/time.NewTimer
func main() {
ch := make(chan struct{}, 1)
go func() {
fmt.Println("do something...")
time.Sleep(4 * time.Second)
ch <- struct{}{}
}()
select {
case <-ch:
fmt.Println("thing done")
case <-time.After(3 * time.Second):
fmt.Println("timeout")
}
}
使用 time.After 也可用于超时控制。但是,每次调用 time.After,都会创建一个新的定时器和相应的通道,而且没有很好的资源恢复方法。我不建议使用这种方法。为避免潜在的内存泄漏,常见的做法是使用 time.NewTimer 而不是 time.After,如下所示:
func main() {
exitCh := make(chan struct{})
ch := make(chan struct{}, 1)
timer := time.NewTimer(3 * time.Second)
// Listen for specified signal
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
go func() {
fmt.Println("do something...")
time.Sleep(4 * time.Second)
ch <- struct{}{}
}()
// In a goroutine, listen to os.Interrupt and SIGTERM signals
go func() {
<-sig
fmt.Println("Received an interrupt, stopping services...")
close(exitCh) // Close exitCh when an interrupt signal is received
}()
for {
select {
case <-ch:
fmt.Println("done")
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
fmt.Println("timeout")
return
case <-exitCh:
fmt.Println("Service exiting")
if !timer.Stop() {
<-timer.C
}
// Execute your resource recovery code here ...
return
}
}
}
与 context 相比,我认为 context 是一种更优雅的方法,因为它可以结合上下文的特性进行更精细的控制(如主程序取消,子程序也取消)。然而,channel 方法是一种更简单的方法,两者在超时控制场景中有不同的应用。
当然,时间 channel 也可用于周期性工作。
// Define a task, use an empty struct as a simple example here
type Task struct{}
func worker(taskCh <-chan *Task) {
for task := range taskCh {
// Execute the task
// For demonstration, assume each task takes 1 second
time.Sleep(2 * time.Second)
_ = task
fmt.Println("run task")
}
}
func main() {
taskCh := make(chan *Task, 100)
// Start 3 worker goroutines to perform tasks
for i := 0; i < 3; i++ {
go worker(taskCh)
}
// Create a timer, execute once every second
timer := time.NewTicker(500 * time.Millisecond)
go func() {
for range timer.C {
// Every time the timer signal is received, print the current length of the queue
fmt.Printf("Current task queue length is: %d\n", len(taskCh))
}
}()
// Simulate continuous task adding
for {
taskCh <- &Task{}
time.Sleep(500 * time.Millisecond) // Simulate adding a task every 500 milliseconds
}
}
注:在实际代码中,可能需要更好的方法来停止计时器并退出程序。此外,本例中的任务是一个空结构。在实际案例中,您需要定义一个符合业务需求的任务结构。
此示例可用于观察服务的正常运行。它会定期检查任务通道的长度,如果长度长时间为零,则我们的任务很可能发生了异常。
cron package
定期观察任务通道的长度是一项轻量级任务,但如果是更复杂的情况呢?答案就是 cron package[1]。
调用者可以注册 Funcs,以便在给定的时间表上调用。Cron 将在自己的程序中运行它们。
c := cron.New()
c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("@hourly", func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
c.Start()
...
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
...
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
...
c.Stop() // Stop the scheduler (does not stop any jobs already running).
最终我们会选择使用 cron package 来实现。
结论
无论是 pacakage time 还是 cron pacakage,它们都依赖于 channel 的力量。我非常感谢 golang context 和 channel 的设计,它让时间控制相关代码的开发变得更加容易。
robfig/cron: https://pkg.go.dev/github.com/robfig/cron