Nested errgroup inside a bunch of goroutines

Issue

I am fairly new to Go and its concurrency principles. My use case involves performing multiple HTTP requests for a single entity, across a batch of entities. If any HTTP request fails for an entity, I need to stop all parallel HTTP requests for that entity. I also have to keep a count of the entities that failed with errors. I am trying to use an errgroup inside each entity's goroutine, so that if any HTTP request fails for an entity, the errgroup terminates and returns the error to its parent goroutine. But I am not sure how to maintain the count of errors.

func processEntities(entity []string) {
    errorC := make(chan string) // channel that receives failed entity IDs
    var wg sync.WaitGroup

    for _, link := range entity {
        wg.Add(1)
        // Spawn errorgroup here. errorgroup_spawn
    }

    // Close errorC once every entity goroutine has called wg.Done(),
    // so that the range loop below terminates.
    go func() {
        wg.Wait()
        close(errorC)
    }()

    for msg := range errorC {
        // store the failed entity IDs somewhere.
    }
}

and the errgroup function looks like this:

func errorgroup_spawn(ctx context.Context, errorC chan string, wg *sync.WaitGroup) error { // and other params
    defer wg.Done()

    goRoutineCollection, ctxx := errgroup.WithContext(ctx)
    results := make(chan *result)
    goRoutineCollection.Go(func() error {
        // http calls for single entity (pass ctxx so they stop on the first failure)
        // if error occurs, push it in errorC, and return Error.
        return nil
    })

    go func() {
        goRoutineCollection.Wait()
        close(results)
    }()

    return goRoutineCollection.Wait()
}

PS: I was also thinking of using nested errgroups, but I can’t see how to maintain error counts while the other errgroups are running.
Can anyone tell me whether this is a correct approach for handling such real-world scenarios?

Solution

One way to keep track of errors is to use a status struct that records which error came from which entity:

type Status struct {
   Entity string
   Err error
}
...

errorC := make(chan Status) 

// Spawn error groups with the name of the entity, and when an error happens, push Status{Entity: entityName, Err: err} to the channel

You can then read all errors from the error channel and figure out what failed and why.
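As a rough illustration of the collecting side, here is a minimal sketch using only the standard library. The names collectFailures and processEntity are made up for this example; processEntity is a stand-in for the per-entity errgroup and is assumed to return the first error of that entity's HTTP calls (what the errgroup's Wait() would give you). Each failed entity sends exactly one Status, so the number of values received is the number of failed entities.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Status pairs an entity with the error that made it fail.
type Status struct {
	Entity string
	Err    error
}

// processEntity is a hypothetical stand-in for the per-entity errgroup.
// Here it simply fails for any entity whose name starts with "bad".
func processEntity(ctx context.Context, entity string) error {
	if strings.HasPrefix(entity, "bad") {
		return errors.New("request failed")
	}
	return nil
}

// collectFailures runs every entity concurrently and returns one Status
// per failed entity, sorted by entity name.
func collectFailures(entities []string) []Status {
	ctx := context.Background()
	errorC := make(chan Status)
	var wg sync.WaitGroup

	for _, e := range entities {
		wg.Add(1)
		go func(e string) {
			defer wg.Done()
			if err := processEntity(ctx, e); err != nil {
				errorC <- Status{Entity: e, Err: err}
			}
		}(e)
	}

	// Close the channel once every entity goroutine has finished,
	// so that the range loop below terminates.
	go func() {
		wg.Wait()
		close(errorC)
	}()

	var failed []Status
	for st := range errorC {
		failed = append(failed, st)
	}
	sort.Slice(failed, func(i, j int) bool { return failed[i].Entity < failed[j].Entity })
	return failed
}

func main() {
	entities := []string{"a", "bad-1", "c", "bad-2"}
	failed := collectFailures(entities)
	fmt.Printf("%d of %d entities failed\n", len(failed), len(entities))
	for _, st := range failed {
		fmt.Printf("%s: %v\n", st.Entity, st.Err)
	}
}
```

Only the single loop that ranges over errorC appends to failed, so the count needs no mutex.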

Another option is not to use errgroup at all. This makes things more explicit, but whether it is better is debatable:

// Keep entity statuses
statuses := make([]Status, len(entity))
for i, link := range entity {
   statuses[i].Entity = link
   wg.Add(1)
   go func(i int) {
      defer wg.Done()
      ctx, cancel := context.WithCancel(context.Background())
      defer cancel()

      // Error collector
      status := make(chan error)
      done := make(chan struct{}) // closed once the collector has drained status
      go func() {
         defer close(done)
         for st := range status {
            if st != nil {
               cancel() // Stop all calls
               // store first error
               if statuses[i].Err == nil {
                  statuses[i].Err = st
               }
            }
         }
      }()

      innerWg := sync.WaitGroup{}
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status <- makeHttpCall(ctx)
      }()
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status <- makeHttpCall(ctx)
      }()
      ...
      innerWg.Wait()
      close(status) // all senders are done
      <-done        // wait for the collector before wg.Done() runs

   }(i)
}

Once wg.Wait() returns, statuses will contain every entity together with its first error, if any.
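For comparison, here is a compact, self-contained sketch of the same idea that also counts the failed entities. Note that it is not the exact code above: it swaps the collector goroutine for a sync.Once that records the first error and cancels the context. The names call, runCalls, runEntities and demoCalls are made up for this example, and demoCalls fakes the HTTP requests.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Status pairs an entity with the first error it hit, if any.
type Status struct {
	Entity string
	Err    error
}

// call is a hypothetical signature for one HTTP request of an entity.
type call func(ctx context.Context) error

// runCalls runs all calls of one entity concurrently. The first error
// cancels the shared context and is returned; later errors are dropped.
func runCalls(parent context.Context, calls []call) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, c := range calls {
		wg.Add(1)
		go func(c call) {
			defer wg.Done()
			if err := c(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // signal the sibling calls to stop
				})
			}
		}(c)
	}
	wg.Wait()
	return firstErr
}

// runEntities processes every entity concurrently and returns one Status
// per entity plus the number of entities that failed.
func runEntities(entities []string, callsFor func(entity string) []call) ([]Status, int) {
	statuses := make([]Status, len(entities))
	var wg sync.WaitGroup
	for i, e := range entities {
		statuses[i].Entity = e
		wg.Add(1)
		go func(i int, e string) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so no mutex is needed.
			statuses[i].Err = runCalls(context.Background(), callsFor(e))
		}(i, e)
	}
	wg.Wait()

	failed := 0
	for _, st := range statuses {
		if st.Err != nil {
			failed++
		}
	}
	return statuses, failed
}

// demoCalls is made-up work: entity "bad" has one failing call and one
// call that blocks until the context is cancelled.
func demoCalls(entity string) []call {
	ok := func(ctx context.Context) error { return nil }
	if entity != "bad" {
		return []call{ok, ok}
	}
	fail := func(ctx context.Context) error { return errors.New("request failed") }
	waitForCancel := func(ctx context.Context) error {
		<-ctx.Done() // unblocks only after the failing call cancels the context
		return ctx.Err()
	}
	return []call{fail, waitForCancel}
}

func main() {
	statuses, failed := runEntities([]string{"a", "bad", "c"}, demoCalls)
	fmt.Printf("%d of %d entities failed\n", failed, len(statuses))
}
```

The cancellation only helps if each real request honors the context, for example by building it with http.NewRequestWithContext.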

Answered By – Burak Serdar

Answer Checked By – Clifford M. (GoLangFix Volunteer)
