I want to split a file into equally sized "chunks", or slices, and use goroutines to process them simultaneously

Issue

I have large log files that I process with Go. Currently I open each file, create a scanner with bufio.NewScanner, and loop through the lines with for scanner.Scan(). Each line is passed to a processing function that matches it against regular expressions and extracts data. I would like to process each file in chunks simultaneously using goroutines, which I believe may be quicker than looping through the whole file sequentially.

Processing can take a few seconds per file, and I'm wondering whether I can process a single file in, say, 10 pieces at a time. I can sacrifice memory if needed: I have ~3 GB available, and the biggest log file is maybe 75 MB.
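Because a 75 MB file fits comfortably in 3 GB of memory, one way to get literal "10 pieces" is to read the whole file, cut it into roughly equal byte ranges, and move each cut forward to the next newline so no line is split. The sketch below assumes that approach; splitAtNewlines, processChunks, and countErrorLines (a placeholder for the real regexp work) are hypothetical helpers, not library functions.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// splitAtNewlines cuts data into at most n pieces of roughly equal size. Each
// cut is moved forward to just after the next '\n', so no line is split.
func splitAtNewlines(data []byte, n int) [][]byte {
	if n < 1 {
		n = 1
	}
	target := (len(data) + n - 1) / n // ceiling of len(data)/n
	var chunks [][]byte
	for start := 0; start < len(data); {
		end := start + target
		if end >= len(data) {
			chunks = append(chunks, data[start:])
			break
		}
		// Extend to the end of the line that contains byte end-1.
		if i := bytes.IndexByte(data[end-1:], '\n'); i >= 0 {
			end += i
		} else {
			end = len(data)
		}
		chunks = append(chunks, data[start:end])
		start = end
	}
	return chunks
}

// countErrorLines is a hypothetical placeholder for the real per-line work.
func countErrorLines(chunk []byte) int {
	count := 0
	for _, line := range bytes.Split(chunk, []byte("\n")) {
		if bytes.Contains(line, []byte("ERROR")) {
			count++
		}
	}
	return count
}

// processChunks handles each chunk in its own goroutine. Each goroutine writes
// only its own slot of results, so no mutex is needed.
func processChunks(chunks [][]byte) []int {
	results := make([]int, len(chunks))
	var wg sync.WaitGroup
	for i, c := range chunks {
		wg.Add(1)
		go func(i int, c []byte) {
			defer wg.Done()
			results[i] = countErrorLines(c)
		}(i, c)
	}
	wg.Wait()
	return results
}

func main() {
	// In real use: data, err := os.ReadFile(path)
	data := []byte("ERROR a\nok\nERROR b\nok\nERROR c\n")
	chunks := splitAtNewlines(data, 10)
	fmt.Println(len(chunks), "chunks, ERROR lines per chunk:", processChunks(chunks))
}
```

Whether this beats the sequential loop depends on how expensive the regexp work is compared with reading the file; if reading dominates, splitting the work will not help much.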

I see that bufio.Scanner has a Split method that accepts a custom split function, but I wasn't able to find a good solution using it.

I've also tried creating a slice of slices, looping through the scanner with scanner.Scan() and appending scanner.Text() to each slice in turn, e.g.:

// Deal the lines round-robin into five slices.
scanner := bufio.NewScanner(logInfo)
threads := make([][]string, 5)

i := 0
for scanner.Scan() {
    threads[i] = append(threads[i], scanner.Text())
    // Wrap back to 0 after the last slice so i never reaches len(threads).
    i = (i + 1) % len(threads)
}
if err := scanner.Err(); err != nil {
    log.Fatal(err)
}
fmt.Println(threads)

I’m new to Go and concerned about efficiency and performance. I want to learn how to write good Go code! Any help or advice is really appreciated.

Solution

Peter gives a good starting point. If you want to use a fan-out, fan-in pattern, you could do something like this:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

func main() {
    file, err := os.Open("/path/to/file.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

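    lines := make(chan string)

    // Fan out: start four workers that all receive from the same lines channel.
    const numWorkers = 4
    workers := make([]<-chan string, numWorkers)
    for i := range workers {
        workers[i] = startWorker(lines)
    }

    // Producer: send every line to the workers, then close lines so they stop.
    scanner := bufio.NewScanner(file)
    go func() {
        defer close(lines)
        for scanner.Scan() {
            lines <- scanner.Text()
        }

        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()

    // Fan in: merge the workers' outputs. Lines arrive in arbitrary order,
    // not necessarily the order in which they appear in the file.
    merged := merge(workers...)
    for line := range merged {
        fmt.Println(line)
    }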
}

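// startWorker launches a goroutine that receives lines until the lines channel
// is closed, sends each processed line on its own output channel, and closes
// that channel when it is done.
func startWorker(lines <-chan string) <-chan string {
    finished := make(chan string)
    go func() {
        defer close(finished)
        for line := range lines {
            // Do your heavy work here (e.g. the regexp matching); this
            // example passes each line through unchanged.
            finished <- line
        }
    }()
    return finished
}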

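// merge fans in: it copies values from every channel in cs onto a single
// output channel and closes that channel once all inputs are closed.
func merge(cs ...<-chan string) <-chan string {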
    var wg sync.WaitGroup
    out := make(chan string)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan string) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Answered By – Elbert Fliek

Answer Checked By – Senaida (GoLangFix Volunteer)
