Handle goroutine termination and error handling via error group?

Issue

I am trying to read multiple files in parallel: each goroutine that reads a file writes its data to a channel, and a single goroutine listens on that channel and adds the data to a map. Here is my play.

Below is the example from the play:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3"}
    var myMap = make(map[string][]byte)
    dataChan := make(chan fileData, len(myFiles))
    wg := sync.WaitGroup{}
    defer close(dataChan)
    // we create a wait group of N
    wg.Add(len(myFiles))
    for _, file := range myFiles {
        // we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
        // the file via the dataChan channel
        go getBytesFromFile(file, dataChan, &wg)
    }
    // we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
    wg.Wait()
    for i := 0; i < len(myFiles); i++ {
        // we can now read from the data channel N times.
        file := <-dataChan
        myMap[file.name] = file.bytes
    }
    fmt.Printf("%+v\n", myMap)
}

type fileData struct {
    name  string
    bytes []byte
}

// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
    bytes := openFileAndGetBytes(file)
    dataChan <- fileData{name: file, bytes: bytes}
    waitGroup.Done()
}

func openFileAndGetBytes(file string) []byte {
    return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}

Problem Statement

How can I use golang.org/x/sync/errgroup to wait on and handle errors from goroutines, or is there a better way, such as using a semaphore? For example, if any one of my goroutines fails to read data from its file, I want to cancel all the remaining ones (in which case that error is the one that bubbles back up to the caller). In the success case, it should automatically wait for all the supplied goroutines to complete.

I also don’t want to spawn 100 goroutines if the total number of files is 100. I want to control the parallelism, if there is any way to do that.

Solution

How can I use golang.org/x/sync/errgroup to wait on and handle errors from goroutines, or is there a better way, such as using a semaphore? For example […] I want to cancel all the remaining ones (in which case that error is the one that bubbles back up to the caller). In the success case, it should automatically wait for all the supplied goroutines to complete.

There are many ways to communicate error states across goroutines. errgroup does a bunch of heavy lifting though, and is appropriate for this case. Otherwise you’re going to end up implementing the same thing.

To use errgroup we’ll need to handle errors (and for your demo, generate some). In addition, to cancel existing goroutines, we’ll use a context from errgroup.WithContext.

From the errgroup reference,

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

Your play doesn’t support any error handling. We can’t collect and cancel on errors if we don’t do any error handling. So I added some code to inject error handling:

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}

Then that error had to be passed back from getBytesFromFile as well:

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}

Now that we’ve done that, we can turn our attention to how we’re going to start up a number of goroutines.

I also don’t want to spawn 100 goroutines if the total number of files is 100. I want to control the parallelism, if there is any way to do that.

Written well, the number of tasks, channel size, and number of workers are typically independent values. The trick is to use channel closure – and in your case, context cancellation – to communicate state between the goroutines. We’ll need an additional channel for the distribution of filenames, and an additional goroutine for the collection of the results.

To illustrate this point, my code uses 3 workers, and adds a few more files. My channels are unbuffered. This allows us to see some of the files get processed, while others are aborted. If you buffer the channels, the example will still work, but it’s more likely for additional work to be processed before the cancellation is handled. Experiment with buffer size along with worker count and number of files to process.

    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)

To start up the workers, instead of starting one for each file, we start the number we desire – here, 3.

    for i := 0; i < 3; i++ {
        workerNum := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", workerNum, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", workerNum, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", workerNum, "processed all work on channel")
            return nil
        })
    }

The workers call your getBytesFromFile function. If it returns an err, we return an err. errgroup will cancel our context automatically in this case. However, the exact order of operations is not deterministic, so more files may or may not get processed before the context is cancelled. I’ll show several possibilities below.

By ranging over fileChan, the worker automatically picks up end of work from the channel closure. If we get an error, we can return it to errgroup immediately. Otherwise, if the context has been cancelled, we can return the cancellation error immediately.

You might think that g.Go would automatically cancel our function. But it cannot. There is no way to cancel a running function in Go other than process termination. The function argument of errgroup.Group.Go must cancel itself when appropriate, based on the state of its context.

Now we can turn our attention to the thing that puts the files on fileChan. We have 2 options here: we can use a buffered channel of the size of myFiles, like you did. We can fill the entire channel with pending jobs. This is only an option if you know the number of jobs when you create the channel. The other option is to use an additional "distribution" goroutine that can block on writes to fileChan so that our "main" goroutine can continue.

    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            select {
            case fileChan <- file:
                continue
            case <-done:
                // return here: a bare break would only leave the select, not the for loop
                return ctx.Err()
            }
        }
        return ctx.Err()
    })

I’m not sure it’s strictly necessary to put this in the same errgroup in this case, because we can’t get an error in the distributor goroutine. But this general pattern, drawn from the Pipeline example from errgroup, works regardless of whether the work dispatcher might generate errors.

This function is pretty simple, but the magic is in select along with the ctx.Done() channel. Either we write to the work channel, or we fail if our context is done. This allows us to stop distributing work when one worker has failed one file.

We defer close(fileChan) so that, regardless of why we have finished (either we distributed all work, or the context was cancelled), the workers know there will be no more work on the incoming work queue (i.e. fileChan).

We need one more synchronization mechanism: once all the work has been distributed and every result is in, or the remaining work has been cancelled (that is, after our errgroup’s Wait() returns), we need to close our results channel, dataChan. This signals the results collector that there are no more results to collect.
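For comparison, the first option mentioned earlier (a buffered channel sized to the job list) needs no distributor goroutine. A minimal sketch, assuming the full list of jobs is known up front; fillJobs is a hypothetical helper name:

```go
package main

import "fmt"

// fillJobs (hypothetical helper) queues every job on a buffered channel sized
// to hold them all, then closes the channel so that a range over it ends once
// the queue has been drained.
func fillJobs(jobs []string) <-chan string {
	ch := make(chan string, len(jobs))
	for _, j := range jobs {
		ch <- j // never blocks: the buffer has room for every job
	}
	close(ch)
	return ch
}

func main() {
	for job := range fillJobs([]string{"file1", "file2", "file3"}) {
		fmt.Println("got", job)
	}
}
```

The trade-off is that every job is already queued before any worker fails, so workers have to rely on their own context check to stop early.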

    var err error // we'll need this later!
    go func() {
        err = g.Wait()
        close(dataChan)
    }()

We can’t – and don’t need to – put this in the errgroup.Group. The function can’t return an error, and it can’t wait for itself to close(dataChan). So it goes into a regular old goroutine, sans errgroup.

Finally we can collect the results. With dedicated worker goroutines, a distributor goroutine, and a goroutine waiting on the work and notifying that there will be no more writes to the dataChan, we can collect all the results right in the "primary" goroutine in main.

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil { // this was set in our final goroutine, remember
        fmt.Println("errgroup Error:", err.Error())
    }

I made a few small changes so that it was easier to see the output. You may already have noticed I changed the file contents from []byte to string. This was purely so that the results were easy to read. Pursuant also to that end, I am using encoding/json to format the results so that it is easy to read them and paste them into SO. This is a common pattern that I often use to indent structured data:

    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }

Finally we’re ready to run. Now we can see a number of different results depending on just what order the goroutines execute. But all of them are valid execution paths.

worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3"
}

Program exited.

In this result, the remaining work (file4, file5, and file6) was not added to the channel. Remember, an unbuffered channel stores no data. For those tasks to be written to the channel, a worker would have to be there to read them. Instead, the context was cancelled after file2 failed, and the distribution function followed the <-done path within its select. file1 and file3 were already processed.
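The claim that an unbuffered channel stores no data can be checked with a non-blocking send. This is a small standalone sketch; trySend is a hypothetical helper, not part of the solution code.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a send without blocking and reports
// whether the value was accepted by the channel.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 1)

	// No goroutine is receiving, so the unbuffered channel cannot take the value.
	fmt.Println(trySend(unbuffered, "file4")) // prints: false
	// The buffered channel has one free slot, so the send succeeds.
	fmt.Println(trySend(buffered, "file4")) // prints: true
}
```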

Here’s a different result (I just ran the playground share a few times to get different results).

worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3",
 "file4": "these are some bytes for file file4",
 "file5": "these are some bytes for file file5",
 "file6": "these are some bytes for file file6"
}

In this case, it looks a little like our cancellation failed. But what really happened is that the goroutines just "happened" to queue and finish the rest of the work before errgroup picked up on worker 1’s failure and cancelled the context.

What errgroup does

When you use errgroup, you’re really getting 2 things out of it:

  • easily accessing the first error your workers returned;
  • getting a context that errgroup will cancel for you when one of your workers returns an error.

Keep in mind that errgroup does not cancel goroutines. This tripped me up a bit at first. errgroup cancels the context. It’s your responsibility to apply the status of that context to your goroutines (remember, the goroutine must end itself; errgroup can’t end it).

A final aside about contexts with file operations, and failing outstanding work

Most of your file operations, e.g. io.Copy or os.ReadFile, are actually a loop of successive Read operations. But io and os don’t support contexts directly. So if you have a worker reading a file, and you don’t implement the Read loop yourself, you won’t have an opportunity to cancel based on context. That’s probably okay in your case – sure, you may have read some more files than you really needed to, but only because you were already reading them when the error occurred. I would personally accept this state of affairs and not implement my own read loop.

The code

https://go.dev/play/p/9qfESp_eB-C

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"

    "golang.org/x/sync/errgroup"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)
    g, ctx := errgroup.WithContext(context.Background())
    for i := 0; i < 3; i++ {
        workerNum := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", workerNum, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", workerNum, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", workerNum, "processed all work on channel")
            return nil
        })
    }
    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            if err := ctx.Err(); err != nil {
                return err
            }
            select {
            case fileChan <- file:
                continue
            case <-done:
                // return here: a bare break would only leave the select, not the for loop
                return ctx.Err()
            }
        }
        return ctx.Err()
    })
    var err error
    go func() {
        err = g.Wait()
        close(dataChan)
    }()
    var myMap = make(map[string]string)

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil {
        fmt.Println("errgroup Error:", err.Error())
    }
    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }
}

type fileData struct {
    name,
    bytes string
}

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}

Answered By – Daniel Farrell

Answer Checked By – Mildred Charles (GoLangFix Admin)
