Idiomatic variable-size worker pool in Go

Issue

I’m trying to implement a pool of workers in Go. The go-wiki (and Effective Go in the Channels section) feature excellent examples of bounding resource use. Simply make a channel with a buffer that’s as large as the worker pool. Then fill that channel with workers, and send them back into the channel when they’re done. Receiving from the channel blocks until a worker is available. So the channel and a loop is the entire implementation — very cool!

Alternatively one could block on sending into the channel, but same idea.

My question is about changing the size of the worker pool while it’s running. I don’t believe there’s a way to change the size of a channel. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for “golang semaphore”.

Solution

I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and use a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}

It might be a good idea to create a separate WorkerPool type with some convenient methods. Also, instead of type Task string it is quite common to use a struct that also contains a done channel that is used to signal that the task had been executed successfully.

Edit: I’ve played around a bit more and came up with the following: http://play.golang.org/p/VlEirPRk8V. It’s basically the same example, with a nicer API.

Answered By – tux21b

Answer Checked By – Candace Johnson (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.