Race condition on fixed number of workers pattern

Issue

I’m playing with some code for learning purposes, and running it with the -race flag reports a race condition that I want to understand. The code starts a fixed set of goroutines that act as workers consuming tasks from a channel. There is no fixed number of tasks: as long as the channel receives tasks, the workers must keep working.

I’m getting the race when calling the WaitGroup functions. From what I understand (looking at the data race report), it happens when the first wg.Add call is executed by one of the spawned goroutines while the main goroutine calls wg.Wait at the same time. Is that correct? If so, does it mean I must always call Add on the main goroutine to avoid this kind of race? But that would also mean I need to know in advance how many tasks the workers will handle, which kind of sucks if the code needs to handle any number of tasks that may arrive once the workers are running…

The code:

func Test(t *testing.T) {
    t.Run("", func(t *testing.T) {
        var wg sync.WaitGroup
        queuedTaskC := make(chan func())
        for i := 0; i < 5; i++ {
            wID := i + 1
            go func(workerID int) {
                for task := range queuedTaskC {
                    wg.Add(1)
                    task()
                }
            }(wID)
        }

        taskFn := func() {
            fmt.Println("executing task...")
            wg.Done()
        }
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn

        wg.Wait()
        close(queuedTaskC)

        fmt.Println(len(queuedTaskC))
    })
}

The report:

==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================

Solution

The WaitGroup implementation is based on an internal counter that is changed by the Add and Done methods. The Wait method does not return until the counter reaches zero. A WaitGroup can also be reused, but only under the conditions described in the documentation:

// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.

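Although your code never reuses wg explicitly, its counter can drop to zero several times: this happens whenever every task received so far has finished, which is entirely possible in concurrent code. The workers then call Add while the main goroutine may already be inside Wait. The documentation requires that Add calls with a positive delta made while the counter is zero happen before Wait, and your code does nothing to order those Add calls relative to Wait, so the race detector reports a data race.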
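To make the ordering rule concrete, here is a minimal sketch in which the submitting goroutine calls Add before each send, so every Add is sequenced before the later Wait in that same goroutine. The helper name runTasks and the atomic counter are assumptions introduced only for illustration. This illustrates the rule and is not necessarily the best design; it also shows that the number of tasks does not have to be known up front.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runTasks is a hypothetical helper: it starts a fixed set of workers,
// submits n tasks, and returns how many tasks actually ran.
func runTasks(workers, n int) int64 {
	var wg sync.WaitGroup
	var executed int64
	tasks := make(chan func())

	for i := 0; i < workers; i++ {
		go func() {
			for task := range tasks {
				task()
				wg.Done() // pairs with the Add made by the submitter
			}
		}()
	}

	for i := 0; i < n; i++ {
		wg.Add(1) // done by the submitter, before the send and before Wait
		tasks <- func() { atomic.AddInt64(&executed, 1) }
	}

	wg.Wait()    // every Add above already happened in this goroutine
	close(tasks) // lets the workers leave their range loops
	return atomic.LoadInt64(&executed)
}

func main() {
	fmt.Println(runTasks(5, 9)) // prints 9
}
```

Because Add and Wait run in the same goroutine here, the counter can never be incremented from zero concurrently with Wait.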

As everyone suggests in the comments, you should abandon the idea of tracking tasks with the WaitGroup and track the running worker goroutines instead. Here is a code proposal:

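func Test(t *testing.T) {
    var wg sync.WaitGroup
    queuedTaskC := make(chan func(), 10)
    for i := 0; i < 5; i++ {
        wID := i + 1
        // Count the worker before it starts, on the main goroutine.
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            // The loop ends once the channel is closed and drained.
            for task := range queuedTaskC {
                task()
            }
        }(wID)
    }
    for i := 0; i < 10; i++ {
        queuedTaskC <- func() {
            fmt.Println("executing task...")
        }
    }
    // No more tasks: closing the channel lets the workers exit.
    close(queuedTaskC)
    // Wait for the workers (not the tasks) to finish.
    wg.Wait()
    fmt.Println(len(queuedTaskC))
}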
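For the case where tasks may arrive at any time after the workers start, the same worker-tracking idea can be wrapped in a small type. This is only a sketch: the names Pool, NewPool, Submit, Shutdown and runOnPool are hypothetical and not part of the original answer, and it assumes Shutdown is called exactly once, after the last Submit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Pool is a hypothetical wrapper around a fixed set of workers.
type Pool struct {
	tasks chan func()
	wg    sync.WaitGroup // counts workers, not tasks
}

// NewPool starts the workers; all Add calls finish before any Wait.
func NewPool(workers int) *Pool {
	p := &Pool{tasks: make(chan func())}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Submit blocks until a worker is free to receive the task.
func (p *Pool) Submit(task func()) { p.tasks <- task }

// Shutdown closes the queue and waits for the workers to drain it.
// Assumption: it is called once, and no Submit follows it.
func (p *Pool) Shutdown() {
	close(p.tasks)
	p.wg.Wait()
}

// runOnPool submits n counting tasks and reports how many ran.
func runOnPool(workers, n int) int64 {
	var executed int64
	p := NewPool(workers)
	for i := 0; i < n; i++ {
		p.Submit(func() { atomic.AddInt64(&executed, 1) })
	}
	p.Shutdown()
	return atomic.LoadInt64(&executed)
}

func main() {
	fmt.Println(runOnPool(5, 9)) // prints 9
}
```

The caller never needs to know the total number of tasks; it only has to signal the end of input by calling Shutdown.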

Answered By – Jaroslaw

Answer Checked By – Marie Seifert (GoLangFix Admin)
