Go deadlock all goroutines asleep

Issue

This is a follow up to my earlier post:

    http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825

I’m still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.

This program opens a list of files, creates an output file for each input file (with the same name), visits all the URLs in each input file, and collects all href links from the fetched pages, which are saved to the corresponding output file.
However, I’m getting the following error:

    http://play.golang.org/p/8X-1rM3aXC

The linkGetter and getHref functions are mainly for processing. head and tail are run as separate goroutines, and worker does the processing.

    package main

    import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
    )

    type Work struct {
    Link     string
    Filename string
    }

    type Output struct {
    Href     string
    Filename string
    }

    func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an    "href"
    for _, a := range t.Attr {
            if a.Key == "href" {
                    href = a.Val
                    ok = true
            }
    }
    return
    }

    func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
            tt := z.Next()
            switch {
            case tt == html.ErrorToken:
                    return
            case tt == html.StartTagToken:
                    t := z.Token()
                    isAnchor := t.Data == "a"
                    if !isAnchor {
                            continue
                    }

                    // Extract the href value, if there is one
                    url, ok := getHref(t)
                    if !ok {
                            continue
                    }

                    out <- Output{url, filename}
            }
    }
    }

    func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup)    {
    defer wg.Done()
    for work := range in {
            resp, err := http.Get(work.Link)
            if err != nil {
                    continue
            }
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                    continue
            }
            if err = resp.Body.Close(); err != nil {
                    fmt.Println(err)
            }
            linkGetter(out, bytes.NewReader(body), work.Filename)
    }
    }

    func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")

    for _, elem := range files {
            res := r.FindStringSubmatch(elem)
            for k, v := range res {

                    if k == 0 {
                            outpath, _ :=  filepath.Abs(fmt.Sprintf("go_tester/%s", v))

                            abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                            f, _ := os.Open(abspath)
                            scanner := bufio.NewScanner(f)

                            for scanner.Scan() {
                                    c <- Work{Link: scanner.Text(), Filename: outpath}
                            }

                    }
            }

    }


    }

    func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
            if out.Filename != currentfile {
                    if err = f.Close(); err != nil { // might cause an error on first run
                            fmt.Println(err)
                    }
                    f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
                    if err != nil {
                            log.Fatal(err)
                    }
                    currentfile = out.Filename
            }
            if _, err = f.WriteString(out.Href + "\n"); err != nil {
                    fmt.Println(err)
            }
    }

    }

    const (
    nworkers = 80
    )

    func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)

    go head(in)
    go tail(out)

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
            wg.Add(1)
            go worker(out, in, &wg)
    }
    close(in)   
    close(out)    
    wg.Wait()


    }

What is wrong with the way the channels are closed?

Solution

You’re not really paying attention to your pipeline design here. You have to ask yourself “When is section X done? What should happen when it is done? What happens after it is done?” for every section of the pipeline.

You start up head, tail, and worker as goroutines, and worker and tail range over channels. The only way a range loop over a channel ends, and so the only way these functions return successfully, is if the channel is closed.

Draw it out if you need to.

  1. head(in) feeds into in
  2. worker(out, in, &wg) ranges over in, feeds into out, and tells you it is done with wg once in is closed
  3. tail(out) ranges over out

So what do you need to do to:

  1. Make sure all input is processed?
  2. Make sure all goroutines return?

Like so:

  1. You need to close in from head once it is done processing all of the files.
  2. This will cause each worker to return once all the items it can get from in are processed, which in turn causes wg.Wait() to return.
  3. It is now safe to close out, since nothing is feeding into it, and this will cause tail to eventually return.
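The three steps above can be sketched roughly as follows. This is a minimal, network-free illustration rather than a fix for the exact program in the question: runPipeline, the string payloads, and the "href:" prefix are hypothetical stand-ins for head, worker, and tail.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runPipeline is a hypothetical stand-in for the program in the question:
// a head goroutine feeds in, workers turn each item into an output, and a
// tail loop collects the outputs.
func runPipeline(inputs []string, nworkers int) []string {
	in := make(chan string)
	out := make(chan string)

	// head: the only sender on in, so it is the one that closes it.
	go func() {
		defer close(in)
		for _, s := range inputs {
			in <- s
		}
	}()

	// workers: each one returns once in is closed and drained.
	var wg sync.WaitGroup
	for i := 0; i < nworkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range in {
				out <- "href:" + s
			}
		}()
	}

	// Once every worker has returned, nothing sends on out any more,
	// so it is safe to close it.
	go func() {
		wg.Wait()
		close(out)
	}()

	// tail: runs in the calling goroutine here, so the function cannot
	// return before every output has been received.
	var results []string
	for s := range out {
		results = append(results, s)
	}
	sort.Strings(results) // worker order is not deterministic
	return results
}

func main() {
	fmt.Println(runPipeline([]string{"a", "b", "c"}, 4))
}
```

In this sketch tail runs in the calling goroutine, so nothing extra is needed to wait for it. With tail in its own goroutine, as in the question, that is not the case.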

But you’ll probably need another sync.WaitGroup associated with tail for this particular design, because the whole program will exit as soon as wg.Wait() returns and main finishes, possibly before tail has finished all of its work. See the Go language specification, specifically:

Program execution begins by initializing the main package and then
invoking the function main. When that function invocation returns, the
program exits. It does not wait for other (non-main) goroutines to
complete.
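One way to make main wait for tail is sketched below, assuming tail stays in its own goroutine. The names process, workers, and tailWG, and the doubling and summing of integers, are hypothetical and only stand in for the real fetching and file writing.

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical pipeline: workers double each value and a
// separate tail goroutine sums the results.
func process(items []int) int {
	in := make(chan int)
	out := make(chan int)

	// head closes in when it has sent everything.
	go func() {
		defer close(in)
		for _, v := range items {
			in <- v
		}
	}()

	var workers sync.WaitGroup
	for i := 0; i < 3; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for v := range in {
				out <- v * 2
			}
		}()
	}

	// tail gets its own WaitGroup so the caller can wait for it too.
	var tailWG sync.WaitGroup
	sum := 0
	tailWG.Add(1)
	go func() {
		defer tailWG.Done()
		for v := range out {
			sum += v
		}
	}()

	workers.Wait() // all senders on out have returned
	close(out)     // lets tail's range loop end
	tailWG.Wait()  // do not return until tail has drained out
	return sum
}

func main() {
	fmt.Println(process([]int{1, 2, 3, 4})) // prints 20
}
```

Without the final tailWG.Wait(), the function could return while tail is still receiving, which is the situation the specification text above describes for main.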

You’ll probably also want to use buffered channels so that execution does not have to switch between goroutines so often. With unbuffered channels, every send blocks until a receiver is ready, so with your current design you’re wasting a lot of time on context switching.
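As a small illustration of the difference, the sketch below fills a buffered channel with no receiver running. The function name fillBuffered and the capacity used are hypothetical, and a suitable buffer size for the real program would need measuring.

```go
package main

import "fmt"

// fillBuffered sends n values on a channel with capacity n before any
// receiver exists. With an unbuffered channel (make(chan int)) the first
// send would block forever, because nothing is receiving yet.
func fillBuffered(n int) []int {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i // does not block while the buffer has room
	}
	close(ch)

	// Receiving from a closed channel still drains the buffered values.
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(fillBuffered(3)) // prints [0 1 2]
}
```

In the question's program this would amount to creating in and out with a capacity, for example make(chan Work, 100). The closing order described above stays the same.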

Answered By – user3591723

Answer Checked By – Candace Johnson (GoLangFix Volunteer)
