Undefined behaviour while loading a large CSV concurrently using Goroutines

Issue

I am trying to load a large CSV file in Go using goroutines. The CSV has dimensions (254882, 100). When I parse it concurrently and store the rows in a 2D slice, I end up with fewer than 254882 rows, and the count varies on every run. I suspect the goroutines are responsible, but I can't pin down the reason. Can anyone please help? I am also new to Go. Here is my code:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

And my main function looks like the following

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

If anyone needs to download the CSV too, then click the link below (485 MB)
https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing

Solution

There is no need to use *[][]float64. A Go slice is already a small header (a pointer to the underlying data plus a length and capacity), so returning [][]float64 by value is cheap; a pointer to a slice only adds indirection.
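As a minimal sketch (with made-up values), returning the slice directly copies only the header, not the 2D data, and the caller sees the same rows:

```go
package main

import "fmt"

// build returns the dataset directly; only the slice header
// (pointer, length, capacity) is copied, not the 2D data itself.
func build() [][]float64 {
	dataset := [][]float64{{1.5, 2.5}, {3.5}}
	return dataset
}

func main() {
	dataset := build()
	fmt.Println(len(dataset)) // number of rows: 2
}
```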

I have made some minor modifications to your program.

dataset is visible inside the new goroutine through closure capture, since it is declared in the enclosing scope.

Similarly, record is visible too, but the record variable is reassigned on every iteration of the loop, so we must pass its current value to the new goroutine as an argument.

dataset, on the other hand, does not need to be passed: it is the same variable for the whole loop, and that is exactly what we want, so that every goroutine appends its temp to the one shared dataset.
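The reason record must be passed as an argument can be shown without goroutines at all. In this small sketch (hypothetical names), each closure receives the loop value as a parameter and therefore keeps its own copy, exactly as the answer passes record into each goroutine:

```go
package main

import "fmt"

// captureByArg passes each loop value as an argument, so every closure
// keeps its own per-iteration copy instead of sharing the one loop
// variable that the for statement keeps reassigning.
func captureByArg(words []string) []func() string {
	var fs []func() string
	for _, w := range words {
		fs = append(fs, func(s string) func() string {
			return func() string { return s }
		}(w))
	}
	return fs
}

func main() {
	for _, f := range captureByArg([]string{"a", "b", "c"}) {
		fmt.Print(f()) // each closure returns its own value
	}
	fmt.Println()
}
```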

But a race condition occurs when multiple goroutines try to append to the same variable, i.e., several goroutines write to the same memory concurrently (running the program with go run -race will report exactly this).

So we need to make sure that only one goroutine can append at any instant of time, and we use a mutex (lock) to make the appends sequential.

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync"
)

func loadCSV(csvFile string) [][]float64 {
    var dataset [][]float64

    f, _ := os.Open(csvFile)

    r := csv.NewReader(f)

    var wg sync.WaitGroup
    l := new(sync.Mutex) // lock

    for record, err := r.Read(); err == nil; record, err = r.Read() {
        wg.Add(1)

        go func(record []string) {
            defer wg.Done()

            var temp []float64
            for _, each := range record {
                if f, err := strconv.ParseFloat(each, 64); err == nil {
                    temp = append(temp, f)
                }
            }
            l.Lock() // lock before writing
            dataset = append(dataset, temp) // write
            l.Unlock() // unlock

        }(record)
    }

    wg.Wait()

    return dataset
}

func main() {
    dataset := loadCSV("train.csv")
    fmt.Println(len(dataset))
}
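An alternative to the mutex, sketched below on the parsing step only (the file reading is left out to keep it short), is to collect rows over a channel: each goroutine sends its parsed row, and a single receiver does all the appending, so only one goroutine ever writes to dataset.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// parseRows parses each record in its own goroutine and collects the
// results over a channel; the single receiving loop is the only writer
// to dataset, so no mutex is needed.
func parseRows(records [][]string) [][]float64 {
	rows := make(chan []float64)
	var wg sync.WaitGroup

	for _, record := range records {
		wg.Add(1)
		go func(record []string) {
			defer wg.Done()
			var row []float64
			for _, each := range record {
				if v, err := strconv.ParseFloat(each, 64); err == nil {
					row = append(row, v)
				}
			}
			rows <- row
		}(record)
	}

	// Close the channel once every worker has sent its row.
	go func() {
		wg.Wait()
		close(rows)
	}()

	var dataset [][]float64
	for row := range rows {
		dataset = append(dataset, row)
	}
	return dataset
}

func main() {
	dataset := parseRows([][]string{{"1.5", "2.5"}, {"3.5"}})
	fmt.Println(len(dataset)) // 2
}
```

Note that with either the mutex or the channel, row order in dataset is not guaranteed to match file order; if order matters, each goroutine should write to a preallocated index instead.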

Some error handling was omitted to keep the example minimal, but in real code you should handle the errors.

Answered By – nilsocket

Answer Checked By – Jay B. (GoLangFix Admin)
