diff --git a/worker.go b/worker.go index 05a2285..306018d 100644 --- a/worker.go +++ b/worker.go @@ -91,24 +91,34 @@ func (w *workerWrapper) run() { // NOTE: Blocking here will prevent the worker from closing down. w.worker.BlockUntilReady() select { - case w.reqChan <- workRequest{ - jobChan: jobChan, - retChan: retChan, - interruptFunc: w.interrupt, - }: + // give more priority to closeChan. Because select will randomly select one when multiple channel available, + // when many closeChan and reqChan come at the same time in extreme cases, giving high priority to closeChan + // to prevent closeChan to be ignored all the time + case <-w.closeChan: + return + default: + nextReq := workRequest{ + jobChan: jobChan, + retChan: retChan, + interruptFunc: w.interrupt, + } + select { - case payload := <-jobChan: - result := w.worker.Process(payload) + case w.reqChan <- nextReq: select { - case retChan <- result: + case payload := <-jobChan: + result := w.worker.Process(payload) + select { + case retChan <- result: + case <-w.interruptChan: + w.interruptChan = make(chan struct{}) + } case <-w.interruptChan: w.interruptChan = make(chan struct{}) } - case <-w.interruptChan: - w.interruptChan = make(chan struct{}) + case <-w.closeChan: + return } - case <-w.closeChan: - return } } }