Shut down "worker" goroutine after buffer is empty

Issue

I want my worker goroutine (ProcessToDo() in the code below) to wait until all “queued” work is processed before shutting down.

The worker goroutine has a buffered “to do” channel, through which work is sent to it, and a “done” channel that tells it to start shutting down. The documentation says that select picks one case pseudo-randomly when more than one case is ready, which means the shutdown (return) can be triggered before all the buffered work is completed.

In the code sample below, I want all 20 messages to print…

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}

Solution

The done channel is completely unnecessary in your case, because you can signal the shutdown by closing the todo channel itself.

Then use for range on the channel, which iterates until the channel is closed and its buffer is empty.
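The drain-after-close behaviour can be checked in isolation. The following is a minimal sketch, not part of the original answer; the helper name drain is made up for illustration. It fills a buffered channel, closes it, and only then ranges over it.

```go
package main

import "fmt"

// drain is a hypothetical helper: it pre-fills a buffered channel with
// 0..n-1, closes it, and returns every value a for-range loop receives.
func drain(n int) []int {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch) // closing does not discard values still sitting in the buffer

	var got []int
	for v := range ch { // the loop ends only once the buffer is empty
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain(5)) // prints [0 1 2 3 4]
}
```

All values sent before close() are still delivered, in order, which is why closing todo is a safe shutdown signal.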

You should have a done channel, but only so that the goroutine itself can signal that it has finished its work, so the main goroutine can continue or exit.

This variant is equivalent to yours, but much simpler, and it does not require time.Sleep() calls to wait for other goroutines (which would be error-prone and non-deterministic anyway). Try it on the Go Playground:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

Also note that worker goroutines should signal completion using defer, so the main goroutine won’t get stuck waiting if the worker returns in some unexpected way or panics. So the worker should rather start like this:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()
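To see the deferred signal at work, here is a small sketch under assumptions of its own: the lower-case processToDo, the handled counter, and the "empty string means stop early" rule are all invented for illustration and are not in the original code.

```go
package main

import "fmt"

// processToDo is a hypothetical variant of the worker that returns as soon
// as it sees an empty job, to show the deferred send firing on an early exit.
func processToDo(done chan<- struct{}, todo <-chan string, handled *int) {
	defer func() {
		done <- struct{}{} // runs on every return path
	}()
	for work := range todo {
		if work == "" {
			return // unexpected early exit; main is still signaled
		}
		*handled++
	}
}

// run is a hypothetical helper: it queues the jobs, waits for the worker,
// and reports how many jobs were handled before the worker stopped.
func run(jobs []string) int {
	done := make(chan struct{})
	todo := make(chan string, len(jobs)) // large enough that sends never block
	handled := 0
	go processToDo(done, todo, &handled)
	for _, j := range jobs {
		todo <- j
	}
	close(todo)
	<-done // safe to read handled after this receive
	return handled
}

func main() {
	fmt.Println(run([]string{"a", "b", "", "c"})) // prints 2
}
```

Without the defer, the early return would skip the send and main would block forever on <-done. Note that an unrecovered panic still terminates the whole program after deferred functions run, so surviving a panic would additionally need recover().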

You can also use sync.WaitGroup to make the main goroutine wait for the worker. In fact, if you plan to use multiple worker goroutines, that is cleaner than reading multiple values from the done channel. Signaling completion is also simpler with WaitGroup: Done() is a method you can defer directly, so you don’t need an anonymous function:

defer wg.Done()

See JimB’s answer for the complete example with WaitGroup.
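That answer is not reproduced on this page. As a rough sketch of what a WaitGroup variant might look like (not a copy of JimB’s code), assuming a package-level wg so the worker can take only the todo channel; the processed counter and the runOne helper are additions made here so the result can be checked:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	wg        sync.WaitGroup // package-level: an assumption of this sketch
	processed int            // written only by the single worker
)

func ProcessToDo(todo <-chan string) {
	defer wg.Done() // a plain method call, no anonymous function needed
	for work := range todo {
		fmt.Printf("todo: %q\n", work)
		processed++
	}
}

// runOne is a hypothetical helper: it queues n jobs for one worker and
// returns the number of jobs processed once the worker has finished.
func runOne(n int) int {
	processed = 0
	todo := make(chan string, 100)

	wg.Add(1) // call Add before starting the goroutine
	go ProcessToDo(todo)

	for i := 0; i < n; i++ {
		todo <- fmt.Sprintf("Message %02d", i)
	}
	close(todo)
	wg.Wait() // blocks until the worker has called Done
	return processed
}

func main() {
	fmt.Println("processed:", runOne(20))
}
```

Reading processed after wg.Wait() is safe here because there is only one worker; with several workers the counter would need its own synchronization.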

Using for range is also idiomatic if you want to use multiple worker goroutines: channels are synchronized, so you don’t need any extra code to synchronize access to the todo channel or to the jobs received from it. If you close the todo channel in main(), that properly signals all worker goroutines. And of course, every queued job will be received and processed exactly once.

Now take the variant that uses WaitGroup to make the main goroutine wait for the worker (JimB’s answer): what if you want more than one worker goroutine, to process your jobs concurrently (and most likely in parallel)?

The only thing you need to change in your code is to actually start several of them:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

Without changing anything else, you now have a correct, concurrent application which receives and processes your jobs using 10 concurrent goroutines. We haven’t used any “ugly” time.Sleep() to wait for other goroutines (the one remaining call only simulates slow processing), and you don’t need any extra synchronization.
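The “exactly once” property can be checked with a self-contained sketch. The processAll helper and the seen map are hypothetical names introduced here; the mutex guards only that bookkeeping map, not the channel, which needs no extra locking.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll is a hypothetical helper: it fans the jobs out to `workers`
// goroutines and reports how many times each job was received.
func processAll(jobs []string, workers int) map[string]int {
	todo := make(chan string, len(jobs))
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards seen, which all workers write to
		seen = make(map[string]int)
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for work := range todo { // each value goes to exactly one worker
				mu.Lock()
				seen[work]++
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		todo <- j
	}
	close(todo) // one close signals every worker
	wg.Wait()
	return seen
}

func main() {
	var jobs []string
	for i := 0; i < 20; i++ {
		jobs = append(jobs, fmt.Sprintf("Message %02d", i))
	}
	fmt.Println(len(processAll(jobs, 10))) // prints 20
}
```

Which worker handles which job varies from run to run, but every count in the returned map is 1.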

Answered By – icza

Answer Checked By – Dawn Plyler (GoLangFix Volunteer)
