Iterate over inputs and store outputs using a fixed number of goroutines

Issue

I’m doing something that must be a common pattern, but I can’t see the way this should be handled.

In this contrived example, I’ve got a function which counts the letters in a string, and I want it to run on every element in a slice, and store the results in a map, so

[]string = {"one", "two", "three"}

yields

map[string]int = {"one":3, "two":3, "three":5}

I’m using the guard pattern, to ensure only a cores number of goroutines are running at any one time (I figure it must be ideal to have the number of concurrent goroutines set to be the number of virtual processors on the system?)

const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
    name string
    res  int
}

func count_letters(word string, cGuard chan struct{}, cResults chan result) {
    time.Sleep(1 * time.Second)
    fmt.Println(word)
    <-cGuard
    cResults <- result{word, len(word)}
}

func main() {
    cGuard := make(chan struct{}, cores)
    cResults := make(chan result, cores)

    mResults := map[string]int{}

    for _, name := range words {
        cGuard <- struct{}{}
        // Need to populate mResults with the output from cResults 
        go count_letters(name, cGuard, cResults)
    }
    fmt.Scanln()
}

This works, but I’m not sure how to get the result structs out of the cResults channel to populate the map inline.

I could set the buffer size of cResults to len(words), and then wait until the for loop is finished, and pull them all out afterwards, but that seems very inelegant, and an issue if the length of words is very big?

Solution

For this specific use case a worker pool pattern would be a better fit.

In your example you start a seperate goroutine for each word, while go can handle this it is not very efficient since the runtime has to spin up a new go routine and stop the old one, all the while keeping track of all of them.

With a worker pool we start exactly the amount of goroutines as we want, and we give the workers tasks via a channel. This cuts out a lot of overhead the workers are always the same goroutines. Collection of the results are also done with a channel. And use a WaitGroup to make sure we don’t terminate before all workers are done.

This is the worker pool version of your example:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 2 for testing, in the real world runtime.NumCPU() would be used
const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
    name string
    res  int
}

func count_letters(wg *sync.WaitGroup, cWords chan string, cResults chan result) {
    // Tell the waitgroup we are done once we return
    defer wg.Done()

    // Read from cWords until it is closed, at which point we return
    for word := range cWords {
        time.Sleep(1 * time.Second)
        cResults <- result{word, len(word)}
    }
}

func main() {
    cWords := make(chan string)
    cResults := make(chan result)

    // This waitgroup will later be used to wait for all worker to be done
    var wg sync.WaitGroup
    for i := 0; i < cores; i++ {
        // Add 1 before starting the goroutine
        wg.Add(1)
        go count_letters(&wg, cWords, cResults)
    }

    // Collect the results via a goroutine, since we need to submit tasks and collect results at the same time
    mResults := map[string]int{}
    go func() {
        for result := range cResults {
            mResults[result.name] = result.res
        }
    }()

    // Insert all words into the cWords chan
    for _, word := range words {
        cWords <- word
    }

    // After all words have been inserted, close the channel, this will cause the workers to exit
    // once all words have been read from the channel
    close(cWords)
    // Wait for all workers to be done
    wg.Wait()
    // Close the results chan, this will terminate our collection go routine, good practice but not necessary in this
    // specific example
    close(cResults)

    // Use the results
    fmt.Println(mResults)
}

Answered By – Dylan Reimerink

Answer Checked By – Mildred Charles (GoLangFix Admin)

Leave a Reply

Your email address will not be published.