diff --git a/README.md b/README.md index fa8ef45..8d72540 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,24 @@ if err == context.DeadlineExceeded { } ``` +## Submitting Async Tasks + +`pool.Process()` is a sync API, and it will not return until the task is finished. Sometimes we need to submit tasks asynchronously. For an async task, one simple way is (see issue #9): + +```go +go func() { + foo := pool.Process(MyTask) +}() +``` + +However, the above method cannot bound the number of goroutines created. If there are 4 workers but 10000 tasks, the above method will create 10000 goroutines. But we would prefer that only the 4 worker goroutines run these tasks. In this case, `pool.Submit()` is a better choice: +``` go +pool.Submit(func() { + // put your task here + MyTask() +}) +``` + ## Changing Pool Size The size of a Tunny pool can be changed at any time with `SetSize(int)`: diff --git a/tunny.go b/tunny.go index 5957983..c6efc39 100644 --- a/tunny.go +++ b/tunny.go @@ -60,6 +60,10 @@ type Worker interface { // Terminate is called when a Worker is removed from the processing pool // and is responsible for cleaning up any held resources. Terminate() + + // BindPool is called when a pool creates a worker. 
The worker will bind to + // that pool. + BindPool(p *Pool) Worker } //------------------------------------------------------------------------------ @@ -67,6 +71,7 @@ type Worker interface { // closureWorker is a minimal Worker implementation that simply wraps a // func(interface{}) interface{} type closureWorker struct { + pool *Pool processor func(interface{}) interface{} } @@ -77,13 +82,16 @@ func (w *closureWorker) Process(payload interface{}) interface{} { func (w *closureWorker) BlockUntilReady() {} func (w *closureWorker) Interrupt() {} func (w *closureWorker) Terminate() {} +func (w *closureWorker) BindPool(p *Pool) Worker {w.pool = p; return w} //------------------------------------------------------------------------------ // callbackWorker is a minimal Worker implementation that attempts to cast // each job into func() and either calls it if successful or returns // ErrJobNotFunc. -type callbackWorker struct{} +type callbackWorker struct{ + pool *Pool +} func (w *callbackWorker) Process(payload interface{}) interface{} { f, ok := payload.(func()) @@ -97,6 +105,7 @@ func (w *callbackWorker) Process(payload interface{}) interface{} { func (w *callbackWorker) BlockUntilReady() {} func (w *callbackWorker) Interrupt() {} func (w *callbackWorker) Terminate() {} +func (w *callbackWorker) BindPool(p *Pool) Worker {w.pool = p; return w} //------------------------------------------------------------------------------ @@ -255,6 +264,19 @@ func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{} return payload, nil } +// Submit sends a payload to the pool to be processed asynchronously; it blocks only until a worker accepts the job. +func (p *Pool) Submit(payload interface{}) bool { + atomic.AddInt64(&p.queuedJobs, 1) + + request, open := <-p.reqChan + if !open { + panic(ErrPoolNotRunning) + } + + request.asyncJobChan <- payload + return true +} + // QueueLength returns the current count of pending queued jobs. 
func (p *Pool) QueueLength() int64 { return atomic.LoadInt64(&p.queuedJobs) @@ -274,7 +296,7 @@ func (p *Pool) SetSize(n int) { // Add extra workers if N > len(workers) for i := lWorkers; i < n; i++ { - p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor())) + p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor().BindPool(p))) } // Asynchronously stop all workers > N diff --git a/tunny_test.go b/tunny_test.go index aca6a2b..fd6c45f 100644 --- a/tunny_test.go +++ b/tunny_test.go @@ -241,6 +241,74 @@ func TestParallelJobs(t *testing.T) { } } +func TestSubmitJob(t *testing.T) { + pool := NewCallback(10) + defer pool.Close() + + jobGroup := sync.WaitGroup{} + jobGroup.Add(10) + + var counter int32 + for i := 0; i < 10; i++ { + ok := pool.Submit(func() { + time.Sleep(time.Millisecond) + atomic.AddInt32(&counter, 1) + jobGroup.Done() + }) + if ok != true { + t.Error("Failed to submit callback") + } + } + if exp, act := int32(10), counter; exp != act { + t.Logf("Haven't finish, so result is wrong: %v != %v", act, exp) + } + jobGroup.Wait() + if exp, act := int32(10), counter; exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) + } +} + +func TestSubmitJobBlock(t *testing.T) { + pool := NewCallback(10) + defer pool.Close() + + jobGroup := sync.WaitGroup{} + jobGroup.Add(11) + + var counter int32 + t1 := time.Now() + for i := 0; i < 10; i++ { + ok := pool.Submit(func() { + time.Sleep(time.Millisecond) + atomic.AddInt32(&counter, 1) + jobGroup.Done() + }) + if ok != true { + t.Error("Failed to submit callback") + } + } + + t2 := time.Now() + + ok := pool.Submit(func() { + time.Sleep(time.Millisecond) + atomic.AddInt32(&counter, 1) + jobGroup.Done() + }) + if ok != true { + t.Error("Failed to submit callback") + } + t3 := time.Now() + + if t3.Sub(t2).Microseconds() - t2.Sub(t1).Microseconds() < 800 { + t.Error("Job submitting didn't cause block") + } + jobGroup.Wait() + if exp, act := int32(11), counter; exp != act { + t.Errorf("Wrong result: 
%v != %v", act, exp) } +} + //------------------------------------------------------------------------------ type mockWorker struct { @@ -270,6 +338,10 @@ func (m *mockWorker) Terminate() { m.terminated = true } +func (m *mockWorker) BindPool(p *Pool) Worker { + return m +} + func TestCustomWorker(t *testing.T) { pool := New(1, func() Worker { return &mockWorker{ diff --git a/worker.go b/worker.go index 05a2285..d36e657 100644 --- a/worker.go +++ b/worker.go @@ -20,6 +20,8 @@ package tunny +import "sync/atomic" + //------------------------------------------------------------------------------ // workRequest is a struct containing context representing a workers intention @@ -28,6 +30,9 @@ type workRequest struct { // jobChan is used to send the payload to this worker. jobChan chan<- interface{} + // asyncJobChan is used to send asynchronously submitted payloads to this worker. + asyncJobChan chan<- interface{} + // retChan is used to read the result from this worker. retChan <-chan interface{} @@ -80,7 +85,7 @@ func (w *workerWrapper) interrupt() { } func (w *workerWrapper) run() { - jobChan, retChan := make(chan interface{}), make(chan interface{}) + jobChan, retChan, asyncJobChan := make(chan interface{}), make(chan interface{}), make(chan interface{}) defer func() { w.worker.Terminate() close(retChan) @@ -94,6 +99,7 @@ func (w *workerWrapper) run() { case w.reqChan <- workRequest{ jobChan: jobChan, retChan: retChan, + asyncJobChan: asyncJobChan, interruptFunc: w.interrupt, }: select { @@ -104,6 +110,9 @@ func (w *workerWrapper) run() { case <-w.interruptChan: w.interruptChan = make(chan struct{}) } + case payload := <-asyncJobChan: + _ = w.worker.Process(payload) + atomic.AddInt64(&w.worker.(*callbackWorker).pool.queuedJobs, -1) case <-w.interruptChan: w.interruptChan = make(chan struct{}) }