Race condition between close and send to channel

Issue

I’m trying to build a generic pipeline library using worker pools. I created interfaces for a source, a pipe, and a sink. The pipe’s job is to receive data from an input channel, process it, and send the result onto an output channel. Here is its intended behavior:

  1. Receive data from an input channel.
  2. Delegate the data to an available worker.
  3. The worker sends the result to the output channel.
  4. Close the output channel once all workers are finished.
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    var wg sync.WaitGroup
    out = make(chan interface{}, 100)
    go func() {
        for i := 1; i <= 100; i++ {
            go p.work(in, out, &wg)
        }
        wg.Wait()
        close(out)
    }()

    return
}

func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
    for j := range jobs {
        func(j Job) {
            defer wg.Done()
            wg.Add(1)

            res := doSomethingWith(j)

            out <- res
        }(j)
    }
}

However, running it may either exit without processing all of the inputs or panic with a “send on closed channel” message. Running a build made with the -race flag reports a data race between close(out) and out <- res.

Here’s what I think might happen. Once a number of workers have finished their jobs, there’s a moment when wg’s counter reaches zero. Hence, wg.Wait() returns and the program proceeds to close(out). Meanwhile, the job channel isn’t finished producing data, meaning some workers are still running in other goroutines. Since the out channel is already closed, any further send results in a panic.

Should the wait group be placed somewhere else? Or is there a better way to wait for all workers to finish?

Solution

It’s not clear why you want one worker per job, but if you do, you can restructure your outer loop setup (see the untested code below). Doing so largely obviates the need for worker pools in the first place.

Always, though, do a wg.Add before spinning off any worker. Right here, you are spinning off exactly 100 workers:

var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
    for i := 1; i <= 100; i++ {
        go p.work(in, out, &wg)
    }
    wg.Wait()
    close(out)
}()

You could therefore do this:

var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
    wg.Add(100)  // ADDED - count the 100 workers
    for i := 1; i <= 100; i++ {
        go p.work(in, out, &wg)
    }
    wg.Wait()
    close(out)
}()

Note that you can now move wg itself down into the goroutine that spins off the workers. This can make things cleaner, if you give up on the notion of having each worker spin off jobs as new goroutines. But if each worker is going to spin off another goroutine, that worker itself must also use wg.Add, like this:

for j := range jobs {
    wg.Add(1)  // ADDED - count the spun-off goroutine before starting it
    go func(j Job) {
        res := doSomethingWith(j)

        out <- res
        wg.Done()  // MOVED (for illustration only, can defer as before)
    }(j)
}
wg.Done() // ADDED - our work in `p.work` is now done

That is, each anonymous function is another user of the channel, so increment the users-of-channel count (wg.Add(1)) before spinning off a new goroutine. When you have finished reading the input channel jobs, call wg.Done() (perhaps via an earlier defer, but I showed it at the end here).

The key to thinking about this is that wg counts the number of active goroutines that could, at this point, write to the channel. It only goes to zero when no goroutines intend to write any more. That makes it safe to close the channel.
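To make that rule concrete, here is a minimal, self-contained sketch (the squaring job and the squaredSum helper are illustrative stand-ins, not from the original code). Every goroutine that might write to out is counted with wg.Add before it starts, so close(out) cannot race with a send:

```go
package main

import (
	"fmt"
	"sync"
)

// squaredSum squares 1..n using nworkers goroutines and returns the total.
func squaredSum(n, nworkers int) int {
	jobs := make(chan int)
	out := make(chan int)
	var wg sync.WaitGroup

	// Each worker is a potential writer to out, so count it
	// with wg.Add *before* it starts.
	for i := 0; i < nworkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				out <- j * j
			}
		}()
	}

	// Close out only once every counted writer is done.
	go func() {
		wg.Wait()
		close(out)
	}()

	go func() {
		for j := 1; j <= n; j++ {
			jobs <- j
		}
		close(jobs)
	}()

	sum := 0
	for r := range out {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squaredSum(10, 4)) // prints 385
}
```

Because the count is bumped before each writer starts, wg.Wait() cannot return while any writer is still live.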


Consider using this rather simpler (but untested) version:

func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    out = make(chan interface{})
    var wg sync.WaitGroup
    go func() {
        defer close(out)
        for j := range in {
            wg.Add(1)
            go func(j Job) {
                res := doSomethingWith(j)
                out <- res
                wg.Done()
            }(j)
        }
        wg.Wait()
    }()
    return out
}

You now have one goroutine that is reading the in channel as fast as it can, spinning off jobs as it goes. You’ll get one goroutine per incoming job, except when they finish their work early. There is no pool, just one worker per job (same as your code except that we knock out the pools that aren’t doing anything useful).


Alternatively, since only so many CPUs are available, spin off a fixed number of goroutines up front as you did before, but have each one run a job to completion, deliver its result, then go back and read the next job:

func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
    out = make(chan interface{})
    go func() {
        defer close(out)
        var wg sync.WaitGroup
        ncpu := runtime.NumCPU()  // or something fancier if you like
        wg.Add(ncpu)
        for i := 0; i < ncpu; i++ {
            go func() {
                defer wg.Done()
                for j := range in {
                    out <- doSomethingWith(j)
                }
            }()
        }
        wg.Wait()
    }()
    return out
}

By using runtime.NumCPU() we get only as many workers reading jobs as there are CPUs to run jobs. Those goroutines are your pool, and each one does one job at a time.

There’s generally no need to buffer the output channel, if the output-channel readers are well-structured (i.e., don’t cause the pipeline to constipate). If they’re not, the depth of buffering here limits how many jobs you can “work ahead” of whoever is consuming the results. Set it based on how useful it is to do this “working ahead”—not necessarily the number of CPUs, or the number of expected jobs, or whatever.
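As a small illustration of that trade-off (the workAhead helper and the depth of 8 are arbitrary choices, not from the answer), a buffered channel lets a producer deposit that many results before anyone reads:

```go
package main

import "fmt"

// workAhead shows that a buffer of the given depth absorbs that
// many sends before any consumer has read a single result.
func workAhead(depth int) int {
	out := make(chan int, depth)
	for i := 0; i < depth; i++ {
		out <- i // buffered: none of these sends block
	}
	close(out)

	// Only now does the "consumer" start reading.
	sum := 0
	for r := range out {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(workAhead(8)) // 0+1+…+7 = 28
}
```

With an unbuffered channel the very first send would block until a reader arrived, so the producer could never run ahead at all.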

Answered By – torek

Answer Checked By – Cary Denson (GoLangFix Admin)
