Recursive calls from function started as goroutine & Idiomatic way to continue caller when all worker goroutines finished

Issue

I am implementing a (sort of a) combinatorial backtracking algorithm in go utilising goroutines. My problem can be represented as a tree with a certain degree/spread where I want to visit each leaf and calculate a result depending on the path taken. On a given level, I want to spawn goroutines to process the subproblems concurrently, i.e. if I have a tree with degree 3 and I want to start the concurrency after level 2, I’d spawn 3*3=9 goroutines that proceed with processing the subproblems concurrently.

func main() {
    cRes := make(chan string, 100)
    res := []string{}
    numLevels := 5
    spread := 3
    startConcurrencyAtLevel := 2
    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)
    for {
        select {
        case r := <-cRes:
            res = append(res, r)
        case <-time.After(10 * time.Second):
            fmt.Println("Caculation timed out")
            fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))
            return
        }
    }
}

func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {
    if len(path) == maxLevels {
        // some longer running task here associated with the found path, also using a lookup table
        // real problem actually returns not the path but the result if it satisfies some condition
        cRes <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextPath := path + fmt.Sprint(i)
        if len(path) == startConcurrencyAtLevel {
            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
        } else {
            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
        }
    }
}

The above code works, however I rely on the for select statement timing out. I am looking for a way to continue with main() as soon as all goroutines have finished, i.e. all subproblems have been processed.

I already came up with two possible (unpreferred/unelegant) solutions:

  • Using a mutex protected result map + a waitgroup instead of a channel-based approach should do the trick, but I’m curious if there is a neat solution with channels.

  • Using a quit channel (of type int). Every time a goroutine is spawned, the quit channel gets a +1 int, everytime a comptutation finished in a leaf, it gets a -1 int and the caller sums up the values. See the following snippet, this however is not a good solution as it (rather blatantly) runs into timing issues I don’t want to deal with. It quits prematurely if for instance the first goroutine finishes before another one has been spawned.

    for {
        select {
        case q := <-cRunningRoutines:
            runningRoutines += q
            if runningRoutines == 0 {
                fmt.Println("Calculation complete")
                return res
            }
        // ...same cases as above
    }

Playground: https://go.dev/play/p/9jzeCvl8Clj

Following questions:

  1. Is doing recursive calls from a function started as a goroutine to itself a valid approach?
  2. What would be an idiomatic way of reading the results from cRes until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.

Happy about any ideas, thanks!

Solution

As mentioned by torek, I spun off an anonymous function closing the channel after the waitgroup finished waiting. Also needed some logic around calling the wg.Done() of the spawned goroutines only after the the recursion of the goroutine spawning level returns.

Generally I think this is a useful idiom (correct me if I’m wrong :))

Playground: https://go.dev/play/p/bQjHENsZL25

func main() {
    cRes := make(chan string, 100)
    numLevels := 3
    spread := 3
    startConcurrencyAtLevel := 2
    var wg sync.WaitGroup
    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)

    go func() {
        time.Sleep(1 * time.Second)
        wg.Wait()
        close(cRes)
    }()

    for r := range cRes {
        fmt.Println(r)
    }

    fmt.Println("Done!")
}

func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
    if len(path) == maxLevels {
        // some longer running task here associated with the found path
        cRes <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextPath := path + fmt.Sprint(i)
        if len(path) == startConcurrencyAtLevel {
            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        } else {
            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        }
    }
}

Answered By – t-h-

Answer Checked By – Marilyn (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.