Golang 中对时间控制的方法和技巧

文摘   2024-08-10 19:53   中国香港  

本文会介绍在 Go 语言中实现时间控制的方法和技巧,包括超时控制、资源回收、周期性任务执行以及如何使用 cron 包进行复杂的时间控制任务。

问题介绍

最近,我遇到了这样一个需求:我的程序需要定期安排一些任务,要求对时间进行精细控制。让我们看下程序的组件架构图开始:

具体要求是,管理程序需要定期触发任务分配,而工作程序也需要定期从数据库中获取任务并执行。此外,管理器和 Worker 的执行时间必须固定。

例如,让我们从时间的角度来考虑 manager 执行 20 分钟的任务和 worker 执行 60 分钟的任务:

显然,manager 和 worker 都需要定期执行任务。那么,我们应该如何在 Go 中控制时间呢?有哪些用例?我该如何实现这一特定需求?

Time Control

在总结的过程中,我发现时间控制大致可以分为两种情况:

  • 超时和资源恢复场景(channel+context, channel + time.After/time.NewTimer)
  • 周期性场景 (channel + time.After/time.NewTimer, cron package)

不同的场景需要控制不同的方面,让我们继续往下看。

channel + context

func main() {
 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
 defer cancel()
 done := make(chan bool)
 go func(ctx context.Context) {
  ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Millisecond*100))
  defer cancel()
        // Call downstream interface
  fmt.Println("Call downstream interface")
        // if success
  done <- true
 }(ctx)

    // do something
 select {
 case <-done:
  fmt.Println("call successfully!!!")
  // do something
  return
 case <-ctx.Done():
  fmt.Println("timeout or do something else")
  // retry or recycle resource
  return
 }
}

主要逻辑是利用上下文进行时间控制。主程序设置了 800 毫秒的超时时间,如果子程序没有完成执行,我们最多等待 800 毫秒。如果超时,我们可以考虑重试或回收资源。如果子程序请求成功,我们就可以继续主程序的业务逻辑。

可以看出,在这种情况下,时间控制的重点是请求超时和资源恢复。

channel + time.After/time.NewTimer

func main() {
 ch := make(chan struct{}, 1)
 go func() {
  fmt.Println("do something...")
  time.Sleep(4 * time.Second)
  ch <- struct{}{}
 }()

 select {
 case <-ch:
  fmt.Println("thing done")
 case <-time.After(3 * time.Second):
  fmt.Println("timeout")
 }
}

使用 time.After 也可用于超时控制。但是,每次调用 time.After,都会创建一个新的定时器和相应的通道,而且没有很好的资源恢复方法。我不建议使用这种方法。为避免潜在的内存泄漏,常见的做法是使用 time.NewTimer 而不是 time.After,如下所示:

func main() {
 exitCh := make(chan struct{})
 ch := make(chan struct{}, 1)
 timer := time.NewTimer(3 * time.Second)

 // Listen for specified signal
 sig := make(chan os.Signal, 1)
 signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

 go func() {
  fmt.Println("do something...")
  time.Sleep(4 * time.Second)
  ch <- struct{}{}
 }()

 // In a goroutine, listen to os.Interrupt and SIGTERM signals
 go func() {
  <-sig
  fmt.Println("Received an interrupt, stopping services...")
  close(exitCh) // Close exitCh when an interrupt signal is received
 }()

 for {
  select {
  case <-ch:
   fmt.Println("done")
   if !timer.Stop() {
    <-timer.C
   }
   return
  case <-timer.C:
   fmt.Println("timeout")
   return
  case <-exitCh:
   fmt.Println("Service exiting")
   if !timer.Stop() {
    <-timer.C
   }
   // Execute your resource recovery code here ...
   return
  }
 }
}

与 context 相比,我认为 context 是一种更优雅的方法,因为它可以结合上下文的特性进行更精细的控制(如主程序取消,子程序也取消)。然而,channel 方法是一种更简单的方法,两者在超时控制场景中有不同的应用。

当然,时间 channel 也可用于周期性工作。

// Define a task, use an empty struct as a simple example here
type Task struct{}

func worker(taskCh <-chan *Task) {
 for task := range taskCh {
  // Execute the task
  // For demonstration, assume each task takes 1 second
  time.Sleep(2 * time.Second)
  _ = task
  fmt.Println("run task")
 }
}

func main() {
 taskCh := make(chan *Task, 100)

 // Start 3 worker goroutines to perform tasks
 for i := 0; i < 3; i++ {
  go worker(taskCh)
 }

 // Create a timer, execute once every second
 timer := time.NewTicker(500 * time.Millisecond)

 go func() {
  for range timer.C {
   // Every time the timer signal is received, print the current length of the queue
   fmt.Printf("Current task queue length is: %d\n", len(taskCh))
  }
 }()

 // Simulate continuous task adding
 for {
  taskCh <- &Task{}
  time.Sleep(500 * time.Millisecond) // Simulate adding a task every 500 milliseconds
 }
}

注:在实际代码中,可能需要更好的方法来停止计时器并退出程序。此外,本例中的任务是一个空结构。在实际案例中,您需要定义一个符合业务需求的任务结构。

此示例可用于观察服务的正常运行。它会定期检查任务通道的长度,如果长度长时间为零,则我们的任务很可能发生了异常。

cron package

定期观察任务通道的长度是一项轻量级任务,但如果是更复杂的情况呢?答案就是 cron package[1]

调用者可以注册 Funcs,以便在给定的时间表上调用。Cron 将在自己的程序中运行它们。

c := cron.New()
c.AddFunc("0 30 * * * *"func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("@hourly"func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m"func() { fmt.Println("Every hour thirty") })
c.Start()
...
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily"func() { fmt.Println("Every day") })
...
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
...
c.Stop()  // Stop the scheduler (does not stop any jobs already running).

最终我们会选择使用 cron package 来实现。

结论

无论是 pacakage time 还是 cron pacakage,它们都依赖于 channel 的力量。我非常感谢 golang context 和 channel 的设计,它让时间控制相关代码的开发变得更加容易。

参考资料
[1]

robfig/cron: https://pkg.go.dev/github.com/robfig/cron


Go Official Blog
Golang官方博客的资讯翻译及独家解读
 最新文章