Problem synchronizing composable goroutines when reading data from a file with scanner.Scan()

Issue

I’m building a filtering pipeline with channels, composing small functions instead of putting 120+ LOC in a single method,
since I might reuse some parts of this pipeline later.

I haven’t been able to make it work as intended. I suspect that the function readValuesFromFile
is exiting before scanner.Scan() puts a value in the inputStream channel (i.e.
the function returns before goroutine (1) has sent anything).

If I replace the scanner.Scan() with just putting some random string in the channel
the whole pipeline works as expected.

Is this the problem, or am I missing something?

How can this be fixed in an elegant way?

Thanks!

func readValuesFromFile(filename string) <-chan string {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    inputStream := make(chan string)

    go func() { //(1)
        count := 0
        scanner := bufio.NewScanner(file)
        for scanner.Scan() { // (2)
            inputStream <- strings.TrimSpace(scanner.Text())
            count = count + 1
        }
        
        close(inputStream)
    }()
    return inputStream
}

func validateValues(inputStream <-chan string) <-chan string {
    //read from the input stream + validate&filter + creating and putting values in an output stream
}

func writeResults(validStream <-chan string) {
    //read from the validated stream and write data to file
}

func main() {
    valueStream := readValuesFromFile("myfile.txt")
    validatedStream := validateValues(valueStream)
    
    writeResults(validatedStream)
}

Solution

The function readValuesFromFile is guaranteed to return before the first value is sent to inputStream. Communication on the unbuffered channel inputStream does not succeed until a sender and receiver are ready. There is no receive on inputStream until after readValuesFromFile returns, therefore the send from the goroutine will not succeed until after readValuesFromFile returns.
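To see why returning early is harmless by itself, here is a minimal sketch with no file involved. The names produce and drain are made up for this illustration. The goroutine blocks on its first send until the caller starts receiving, which is also why the pipeline works when the scanner is replaced with plain strings.

```go
package main

import "fmt"

// produce (hypothetical name) returns right away. The goroutine it starts
// blocks on its first send until a receiver is ready on the channel.
func produce(values []string) <-chan string {
	out := make(chan string) // unbuffered: each send waits for a receive
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// drain (hypothetical name) receives until the channel is closed.
func drain(in <-chan string) []string {
	var got []string
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	stream := produce([]string{"a", "b", "c"}) // produce has already returned here
	fmt.Println(drain(stream))
}
```

Nothing is lost even though produce returns before any value is sent, because the goroutine only depends on the channel and the slice, both of which stay valid after the function returns.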

When the function readValuesFromFile returns, the defer statement closes the file used by the scanner. It’s possible that the scanner buffers some data before the file is closed underneath it, but it’s also possible that the scanner does not read any data.
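The effect can be reproduced in isolation. The sketch below is only an illustration: scanClosedFile is a hypothetical helper, and closing the file explicitly before scanning stands in for the deferred Close running before the goroutine gets to read. It assumes Go 1.16 or later for os.CreateTemp.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"os"
)

// scanClosedFile (hypothetical helper) writes a small temp file, opens it,
// closes it, and only then scans it. It returns the number of lines read
// and the error reported by the scanner (or a setup error, if any).
func scanClosedFile() (int, error) {
	tmp, err := os.CreateTemp("", "scan-demo-*.txt")
	if err != nil {
		return 0, err
	}
	defer os.Remove(tmp.Name())
	if _, err := tmp.WriteString("one\ntwo\n"); err != nil {
		tmp.Close()
		return 0, err
	}
	if err := tmp.Close(); err != nil {
		return 0, err
	}

	file, err := os.Open(tmp.Name())
	if err != nil {
		return 0, err
	}
	file.Close() // stands in for the deferred Close running before the scan

	lines := 0
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines++
	}
	return lines, scanner.Err()
}

func main() {
	lines, err := scanClosedFile()
	fmt.Println("lines read:", lines)
	fmt.Println("scanner error:", err)
	fmt.Println("is os.ErrClosed:", errors.Is(err, os.ErrClosed))
}
```

In the original program the outcome depends on timing, so the scanner may read some, all, or none of the data. Here the file is always closed first, so the loop body never runs and scanner.Err() reports the failure.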

Fix by closing the file from the goroutine.

The error returned from the scanner describes the problem. Always handle errors.

func readValuesFromFile(filename string) <-chan string {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }

    inputStream := make(chan string)

    go func() {
        defer file.Close()
        defer close(inputStream)
        count := 0
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            inputStream <- strings.TrimSpace(scanner.Text())
            count = count + 1
        }
        if err := scanner.Err(); err != nil {
            // Handle error as appropriate for your application.
            // Log the scanner's error, not the (nil) error from os.Open.
            log.Print("scan error: ", err)
        }
    }()

    return inputStream
}
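For completeness, here is a sketch of the whole pipeline with the fixed reader. The stage bodies are assumptions, since the question only shows stubs: validateValues is taken to drop empty lines, collectResults stands in for writeResults so the result can be inspected, and runDemo is a hypothetical helper that writes a temporary input file (os.CreateTemp, Go 1.16+).

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"strings"
)

// readValuesFromFile is the fixed reader: the goroutine owns the file and
// closes it only after scanning has finished.
func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	inputStream := make(chan string)
	go func() {
		defer file.Close()
		defer close(inputStream)
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			inputStream <- strings.TrimSpace(scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			log.Print("scan error: ", err)
		}
	}()
	return inputStream
}

// validateValues is an assumed filter stage: it drops empty lines.
func validateValues(inputStream <-chan string) <-chan string {
	validStream := make(chan string)
	go func() {
		defer close(validStream)
		for v := range inputStream {
			if v != "" {
				validStream <- v
			}
		}
	}()
	return validStream
}

// collectResults stands in for writeResults: it gathers the values in a
// slice instead of writing them to a file.
func collectResults(validStream <-chan string) []string {
	var results []string
	for v := range validStream {
		results = append(results, v)
	}
	return results
}

// runDemo (hypothetical helper) writes a temp input file and runs the pipeline on it.
func runDemo() ([]string, error) {
	tmp, err := os.CreateTemp("", "pipeline-*.txt")
	if err != nil {
		return nil, err
	}
	defer os.Remove(tmp.Name())
	if _, err := tmp.WriteString("  alpha \n\nbeta\n"); err != nil {
		tmp.Close()
		return nil, err
	}
	if err := tmp.Close(); err != nil {
		return nil, err
	}
	return collectResults(validateValues(readValuesFromFile(tmp.Name()))), nil
}

func main() {
	results, err := runDemo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(results)
}
```

Each stage closes its own output channel with defer, so the range loops downstream terminate on their own and main needs no extra synchronization: the last stage runs in the calling goroutine and returns once everything has been consumed.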

Answered By – Bayta Darell

Answer Checked By – Candace Johnson (GoLangFix Volunteer)
