Go 協程池解析~通俗易懂

協程池主要是爲了減小 go 協程頻繁建立、銷燬帶來的性能損耗,雖然能夠忽略不計,可是網上說特殊狀況仍是有用的。app

那這個協程池通俗易懂來說,好比老闆給員工分配任務:性能

老闆領了一堆任務,得找工人幹活呀, 那領導就拿出一個任務,給一個空閒的員工 A,再把下一個任務,給另一個空閒的員工 B 。fetch

這時候 A 或者 B,指不定誰先忙完了spa

若是有人忙完了,領導就把下一個任務,給先忙完的人。A/B 就是協程池裏面的兩個協程code

下面這段代碼,完成了以下功能協程

  1. 協程池數量上限控制、最大空閒時間設置
  2. 定時清理空閒協程清理,釋放內存
  3. 任務分發
  4. 協程複用
package gopool
import (
   "context"
 "log" "sync" "time")
type Task func()
// boss 老闆
type GoPool struct {
   MaxWorkerIdleTime time.Duration // worker 最大空閒時間
 MaxWorkerNum int32 // 協程最大數量
 TaskEntryChan chan *Task // 任務入列
 Workers []*worker // 已建立worker
 FreeWorkerChan chan *worker // 空閒worker
 Lock sync.Mutex
}
const (
   WorkerStatusStop = 1
 WorkerStatusLive = 0
)
// 幹活的人
type worker struct {
   Pool *GoPool
 StartTime time.Time // 開始時間
 TaskChan chan *Task // 執行隊列
 LastWorkTime time.Time // 最後執行時間
 Ctx context.Context
 Cancel context.CancelFunc
 Status int32 // 被過時刪掉的標記
}
var defaultPool = func() *GoPool {
   return NewPool()
}()
// 初始化
func NewPool() *GoPool {
   g := &GoPool{
      MaxWorkerIdleTime: 10 * time.Second,
      MaxWorkerNum:      20,
      TaskEntryChan:     make(chan *Task, 2000),
      FreeWorkerChan:    make(chan *worker, 2000),
   }
   // 分發任務
 go g.dispatchTask()
   //清理空閒worker
 go g.fireWorker()
   return g
}
// 按期清理空閒worker
func (g *GoPool) fireWorker() {
   for {
      select {
      // 10秒執行一次
 case <-time.After(10 * time.Second):
         for k, w := range g.Workers {
            if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
               log.Printf("overtime %v %p", k, w)
               // 終止協程
 w.Cancel()
               // 清理Free
 w.Status = WorkerStatusStop
            }
         }
         g.Lock.Lock()
         g.Workers = g.cleanWorker(g.Workers)
         g.Lock.Unlock()
      }
   }
}
// 遞歸清理無用worker
func (g *GoPool) cleanWorker(workers []*worker) []*worker {
   for k, w := range workers {
      if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
         workers = append(workers[:k], workers[k+1:]...) // 刪除中間1個元素
 return g.cleanWorker(workers)
      }
   }
   return workers
}
// 分發任務
func (g *GoPool) dispatchTask() {
   for {
      select {
      case t := <-g.TaskEntryChan:
         log.Printf("dispatch task %p", t)
         // 獲取worker
 w := g.fetchWorker()
         // 將任務扔給worker
 w.accept(t)
      }
   }
}
// 獲取可用worker
func (g *GoPool) fetchWorker() *worker {
   for {
      select {
      // 獲取空閒worker
 case w := <-g.FreeWorkerChan:
         if w.Status == WorkerStatusLive {
            return w
         }
      default:
         // 建立新的worker
 if int32(len(g.Workers)) < g.MaxWorkerNum {
            w := &worker{
               Pool:         g,
               StartTime:    time.Now(),
               LastWorkTime: time.Now(),
               TaskChan:     make(chan *Task, 1),
               Ctx:          context.Background(),
               Status:       WorkerStatusLive,
            }
            ctx, cancel := context.WithCancel(w.Ctx)
            w.Cancel = cancel
            // 接到任務本身去執行吧
 go w.execute(ctx)
            g.Lock.Lock()
            g.Workers = append(g.Workers, w)
            g.Lock.Unlock()
            g.FreeWorkerChan <- w
            log.Printf("worker create %p", w)
         }
      }
   }
}
// 添加任務
func (g *GoPool) addTask(t Task) {
   // 將任務放到入口任務隊列
 g.TaskEntryChan <- &t
}
// 接受任務
func (w *worker) accept(t *Task) {
   // 每一個worker本身的工做隊列
 w.TaskChan <- t
}
// 執行任務
func (w *worker) execute(ctx context.Context) {
   for {
      select {
      case t := <-w.TaskChan:
         // 執行
 (*t)()
         // 記錄工做狀態
 w.LastWorkTime = time.Now()
         w.Pool.FreeWorkerChan <- w
      case <-ctx.Done():
         log.Printf("worker done %p", w)
         return
 }
   }
}
// 執行
func SafeGo(t Task) {
   defaultPool.addTask(t)
}