Creating cancelable workers with goroutines and context

Issue

I’m trying to understand how to properly use goroutines along with channels and context, to create a cancelable background worker.

I’m familiar with using contexts that can cancel when explicitly called, attaching it to the worker goroutine should let me stop the worker.

But I cant figure out how to use it to achieve what this.

The example below illustrates a worker goroutine that gets the data from a channel ‘urls’, and it also carries a cancelable context.

//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    select {
    // placeholder for a channel writing the data from the URL
    case url := <-urls:
        fmt.Printf("Worker :%d received url :%s\n", id, url)
    // checking if the process is cancelled
    case <-ctx.Done():
        fmt.Printf("Worker :%d exitting..\n", id)
    }
    fmt.Printf("Worker :%d done..\n", id)
    wg.Done()
}

This doesn’t work for me for two reasons,

  1. For an unbuffered channel, writing to it with no goroutines to read from will block it, so once more data is added to the the urls channel, the sender will block.
  2. It returns immediately, once either of the two channel returns.

I also tried wrapping the select in an infinite loop, but adding a break after the context is raising error.

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    for {
        select {
        // placeholder for a channel writing the data from the URL
        case url := <-urls:
            fmt.Printf("Worker :%d received url :%s\n", id, url)
        // checking if the process is cancelled
        case <-ctx.Done():
            fmt.Printf("Worker :%d exitting..\n", id)
            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
        }
    }
    fmt.Printf("Worker :%d done..\n", id) // code is unreachable
    wg.Done()
}

What is the right approach to implement something like this?

PS : Any resources / references about designing worker processes like these will be helpful too.

Solution

You can substitute the break by return and the code will work.

However, a better approach can be:

  1. Workers consume the channel in for / range loop
  2. The producer should be responsible for detect the cancel and close the channel. The for loop will stop in cascade

Answered By – Tiago Peczenyj

Answer Checked By – Mary Flores (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.