Where should I close the channel in this specific example?

Issue

I am writing a simple Go pipeline; the goal is to fetch the URLs and print their status.

In fetchUrl, I need to close the out channel to tell main that no more data is coming, so that the main goroutine is released. However, I can't close the channel in fetchUrl right after the loop, because that would be too soon. I don't want to add wait groups to the application, because my whole goal at the moment is to understand channels.

In the fetchUrl function, the channel called two is only there to make sure that at most two jobs run at once.

package main

import (
    "fmt"
    "net/http"
    "os"
)

func gen(val []string) <-chan string {
    out := make(chan string, len(val))
    for _, val := range val {
        out <- val
    }
    close(out)
    return out
}

func fetchUrl(in <-chan string) <-chan string {
    out := make(chan string)
    two := make(chan struct{}, 2)
    fmt.Println("blocked")
    for url := range in {
        two <- struct{}{}
        go fetchWorker(url, two, out)
    }

    return out
}

func fetchWorker(url string, two chan struct{}, out chan string) {
    res, err := http.Get("https://" + url)
    if err != nil {
        panic(err)
    }
    <-two
    out <- fmt.Sprintf("[%d] %s\n", res.StatusCode, url)
}

func main() {
    for val := range fetchUrl(gen(os.Args[1:])) {
        fmt.Println(val)
    }
}

Solution

You need to close the out channel after every result has been written to it. The easiest way to tell this is when all of the worker goroutines have exited, and the easiest way to tell that in turn is by using a sync.WaitGroup. (In Go, channels and goroutines are very closely related concepts, so goroutine management is part of working with channels.)

In the existing code, you can tie that into your fetchUrl function:

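var wg sync.WaitGroup // requires import "sync"
for url := range in {
    two <- struct{}{}
    wg.Add(1)
    // Pass url as an argument so each goroutine gets its own copy
    // (before Go 1.22 the loop variable was shared between iterations).
    go func(url string) {
        defer wg.Done()
        fetchWorker(url, two, out)
    }(url)
}
// Caution: if nothing reads out until fetchUrl returns, the workers
// block forever on their send and this Wait never returns.
wg.Wait()
close(out) // every worker has exited, so nothing can send on out any more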
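If you would rather not use sync.WaitGroup, as the question asked, the same rule (close out only once every writer has exited) can also be enforced with a channel. This is an alternative sketch, not part of the original answer: each worker sends one value on a done channel when it has finished writing, and a dedicated goroutine receives workerCount of those signals before calling close. Squaring integers stands in for fetching URLs so the example runs offline; squareAll, jobs, results and done are illustrative names.

```go
package main

import (
	"fmt"
	"sort"
)

// squareAll fans inputs out to workerCount goroutines and closes results
// once every worker has reported on done. The done channel does the job
// a sync.WaitGroup would otherwise do.
func squareAll(inputs []int, workerCount int) []int {
	jobs := make(chan int)
	results := make(chan int)
	done := make(chan struct{})

	// Producer: send every input, then close jobs so the workers' range loops end.
	go func() {
		defer close(jobs)
		for _, n := range inputs {
			jobs <- n
		}
	}()

	// Workers: drain jobs, then signal that this worker will never write again.
	for i := 0; i < workerCount; i++ {
		go func() {
			for n := range jobs {
				results <- n * n
			}
			done <- struct{}{}
		}()
	}

	// Closer: after workerCount signals, no goroutine can send on results.
	go func() {
		for i := 0; i < workerCount; i++ {
			<-done
		}
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // arrival order depends on scheduling, so sort for stable output
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 2))
}
```

Because main keeps reading results while the workers run, no worker stays blocked on its send, and the closer goroutine is the only place that calls close.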

The other structural problem with your code as written is that both gen and fetchUrl create a channel and finish (or, with the change above, wait for) all of the writing to it before returning it, so nothing can read from the channel until the function returns. gen only gets away with this because its channel is buffered with room for every value. out in fetchUrl is unbuffered, so wg.Wait() would wait on workers that are stuck forever sending to out, and the program deadlocks. You can get around this by creating all of the channels at the top level and passing them into the generator functions.
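Another common way to avoid the same deadlock, shown here as an alternative rather than what this answer does, is to keep the "function returns a channel" shape but do the writing in a goroutine the function starts. The function then returns immediately, the caller can begin reading, and the goroutine closes the channel when it runs out of values. A minimal sketch of gen written that way:

```go
package main

import (
	"fmt"
	"os"
)

// gen returns its channel right away; the goroutine it starts does the
// writing, so no buffer sized to len(vals) is needed.
func gen(vals []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out) // tells downstream range loops that no more data is coming
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

func main() {
	for v := range gen(os.Args[1:]) {
		fmt.Println(v)
	}
}
```

The same shape applies to fetchUrl: start the loop and the close in a goroutine, and return out before either has run.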

If you want exactly two workers reading from the same queue of URLs, a standard pattern is to launch two goroutines that read from and write to the same channels; this also makes the two semaphore channel unnecessary. For example, you could rewrite fetchWorker as

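func fetchWorker(urls <-chan string, out chan<- string) {
    // Keep taking URLs until urls is closed and drained.
    for url := range urls {
        res, err := http.Get("https://" + url)
        if err != nil {
            panic(err)
        }
        res.Body.Close() // only the status code is used; release the connection
        // No trailing "\n": the caller prints with fmt.Println.
        out <- fmt.Sprintf("[%d] %s", res.StatusCode, url)
    }
}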

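At the top level, create the channels, start the workers, feed the input, and consume the output. This assumes gen is changed to take the channel it writes to, func gen(vals []string, out chan<- string), and to close that channel when it is done.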

func main() {
    urls := make(chan string)
    out := make(chan string)

    // Launch a goroutine to feed data into urls, then
    // close(urls), then stop
    go gen(os.Args[1:], urls)

    // Launch worker goroutines
    workerCount := 2
    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fetchWorker(urls, out)
        }()
    }

    // Launch a dedicated goroutine to close the channel
    go func() {
        wg.Wait()
        close(out)
    }()

    // Read the results
    for result := range out {
        fmt.Println(result)
    }
}
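For reference, here is one way the pieces above might fit into a single runnable program. It is a sketch under a few assumptions: gen's signature is inferred from the call go gen(os.Args[1:], urls); run is a hypothetical helper that wires the pipeline together and returns out so main can still print results as they arrive; and fetchWorker reports errors as results instead of panicking, so one bad URL does not stop the others.

```go
package main

import (
	"fmt"
	"net/http"
	"os"
	"sync"
)

// gen writes every value to out, then closes it so the workers' loops end.
func gen(vals []string, out chan<- string) {
	defer close(out)
	for _, v := range vals {
		out <- v
	}
}

// fetchWorker drains urls until gen closes it. Reporting errors as results
// (instead of panic) is an assumption of this sketch.
func fetchWorker(urls <-chan string, out chan<- string) {
	for url := range urls {
		res, err := http.Get("https://" + url)
		if err != nil {
			out <- fmt.Sprintf("[error] %s: %v", url, err)
			continue
		}
		res.Body.Close() // only the status code is needed
		out <- fmt.Sprintf("[%d] %s", res.StatusCode, url)
	}
}

// run (hypothetical helper) starts gen, the workers and the closer,
// then returns out immediately so the caller can read while work happens.
func run(vals []string, workerCount int) <-chan string {
	urls := make(chan string)
	out := make(chan string)

	go gen(vals, urls)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fetchWorker(urls, out)
		}()
	}

	// Close out only after every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	for result := range run(os.Args[1:], 2) {
		fmt.Println(result)
	}
}
```

Run it with hostnames as arguments, for example go run main.go golang.org example.com. Each line is printed as soon as a worker finishes, so the order can change between runs, and the loop in main ends on its own once out is closed.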

Answered By – David Maze

Answer Checked By – Cary Denson (GoLangFix Admin)
