Reading from non buffered channels

Issue

I am trying to understand non buffered channels, so I have written a small application that iterates through an array of user input, does some work, places info on a non buffered channel and then reads it. However, I’m not able to read from the channels.
This is my code

toProcess := os.Args[1:]

var wg sync.WaitGroup
results := make(chan string)
errs := make(chan error)

for _, t := range toProcess {
    wg.Add(1)
    go Worker(t, "text", results, errs, &wg)
}


go func() {
    for err := range errs {
        if err != nil {
            fmt.Println(err)
        }
    }
}()


go func() {
    for res := range results {
        fmt.Println(res)
    }
}()

What am I not understanding about non buffered channels? I thought I should be placing information on it, and have another go routine reading from it.

EDIT: using two goroutines solves the issues, but it still gives me the following when there are errors:

open /Users/roosingh/go/src/github.com/nonbuff/files/22.txt: no such file or directory
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42001416c)
    /usr/local/Cellar/go/1.10.2/libexec/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc420014160)
    /usr/local/Cellar/go/1.10.2/libexec/src/sync/waitgroup.go:129 +0x72
main.main()
    /Users/roosingh/go/src/github.com/nonbuff/main.go:39 +0x207

goroutine 6 [chan receive]:
main.main.func1(0xc4200780c0)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:25 +0x41
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:24 +0x1d4

goroutine 7 [chan receive]:
main.main.func2(0xc420078060)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:34 +0xb2
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:33 +0x1f6

So it is able to print out the error message.
My worker code is as follows;

func Worker(fn string, text string, results chan string, errs chan error, wg *sync.WaitGroup) {
    file, err := os.Open(fn)
    if err != nil {
        errs <- err
        return
    }
    defer func() {
        file.Close()
        wg.Done()
    }()

    reader := bufio.NewReader(file)


    for {
        var buffer bytes.Buffer

        var l []byte
        var isPrefix bool
        for {
            l, isPrefix, err = reader.ReadLine()
            buffer.Write(l)

            if !isPrefix {
                break
            }

            if err != nil {
                errs <- err
                return

            }
        }

        if err == io.EOF {
            return
        }

        line := buffer.String()

        results <- fmt. Sprintf("%s, %s", line, text)

    }

    if err != io.EOF {
        errs <- err
        return
    }

    return
}

Solution

As for unbuffered channels, you seem to understand the concept, meaning it’s used to pass messages between goroutines but cannot hold any. Therefore, a write on an unbuffered channel will block until another goroutine is reading from the channel and a read from a channel will block until another goroutine writes to this channel.

In your case, you seem to want to read from 2 channels simultaneously in the same goroutine. Because the way channels work, you cannot range on a non closed channel and further down in the same goroutine range on another channel. Unless the first channel gets closed, you won’t reach the second range.

But, it doesn’t mean it’s impossible! This is where the select statement comes in.

The select statement allows you to selectively read from multiple channels, meaning that it will read the first one that has something available to be read.

With that in mind, you can use the for combined with the select and rewrite your routine this way:

go func() {
    for {
        select {
            case err := <- errs: // you got an error
                fmt.Println(err) 
            case res := <- results: // you got a result
                fmt.Println(res)
        }
    }
}()

Also, you don’t need a waitgroup here, because you know how many workers you are starting, you could just count how many errors and results you get and stop when you reach the number of workers.

Example:

go func() {
    var i int
    for {
        select {
            case err := <- errs: // you got an error
                fmt.Println(err)
                i++
            case res := <- results: // you got a result
                fmt.Println(res)
                i++
        }
        // all our workers are done
        if i == len(toProcess) {
            return 
        }
    }
}()

Answered By – Francois

Answer Checked By – Katrina (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.