Concurrent handler is blocking

Issue

We found that one of our mqtt.MessageHandler implementations is not working properly. The handler filters incoming messages and passes each valid event to a function for processing. That function is implemented as follows:

func processEvent(i models.Foo) (string, error) {
    var wg sync.WaitGroup
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)

    err := func1()
    if err != nil {
        return "", err
    }

    switch strings.ToUpper(i.Status) {
    case "OK":
        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask1()
            ch := done
            if err != nil {
                log.Error("%s", err.Error())
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask2()
            ch := done
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        result := "processed"
        count := 0
        for {
            select {
            case err := <-errc:
                close(quit)
                log.Info("event: %s, %s", "", err.Error())
                return "", err
            case <-done:
                count++
                if count == 4 { // why 4???
                    return result, nil
                }
            }
        }

        wg.Wait()

        if err != nil {
            log.Info("event: %s, %s", result, err.Error())
            return result, err
        }
        close(quit)
        close(errc)
        close(done)
        return result, nil
    default:
        return "", nil
    }

    return "", nil
}

I understand that it's trying to synchronize longTimeTask1() and longTimeTask2(), but it's quite hard for me to follow. What is the purpose of count and the check count == 4? Why are the channels closed at the end? The IDE code hint also reports that wg.Wait() is unreachable.
Previously this function worked well, but recently longTimeTask1() or longTimeTask2() may return an error, and since then the function seems to block completely. Could you please help me understand the code, find the potential issues, and refactor this part?

Solution

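Looking at count, it appears that the code expects to receive four messages from the done channel. However, the two goroutines can send at most two such messages, so the loop never reaches count == 4 and blocks forever, even when both tasks succeed. That's a bug. It also explains why wg.Wait() is flagged as unreachable: the for loop can only be left through one of its return statements.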

Also, if either goroutine's task returns an error, that goroutine sends on errc instead of done, so it never counts toward the total; that's another bug.

Another way to write this could be:

...
result := "processed"
count := 0
for {
    select {
    case err := <-errc:
        close(quit) // Tell the goroutines to terminate
        log.Info("event: %s, %s", "", err.Error())
        wg.Wait() // Wait for them to finish
        return "", err

    case <-done:
        count++
        if count == 2 { // one message per successful goroutine
            wg.Wait()
            return result, nil
        }
    }
}

Answered By – Burak Serdar

Answer Checked By – Robin (GoLangFix Admin)
