Populate concurrent map while iterating parquet files in parallel efficiently

Issue

I am working with parquet file where I am reading all those files in parallel and populating concurrent map after going through all those rows inside each file. Total number of files are 50 and each file size is around 60MB max.

I need to parallelize my for loop which read all these parquet files in parallel and also populate map by reading all these parquet files in parallel. These concurrent maps inside data struct will be read by multiple reader threads concurrently and also written by multiple writers in parallel inside for loop. I want to make sure they are safe and operation is atomic. I also got getter method to access those concurrent map.

Below is the code I got but I am not sure if this is the right way to parallelize it or I missing something very basics?

import (
//....
pars3 "github.com/xitongsys/parquet-go-source/s3"
"github.com/xitongsys/parquet-go/reader"
cmap "github.com/orcaman/concurrent-map"
//....
)

type Data interface {
    GetCustomerMap() *cmap.ConcurrentMap
    GetCustomerMetricsMap() *cmap.ConcurrentMap
}

type data struct {
    // will have getter methods to access these below map
    customers          *cmap.ConcurrentMap
    customerMetrics    *cmap.ConcurrentMap
}

//loadParquet.. This will be called by background thread periodically
func (r *data) loadParquet(path string, bucket string, key string) error {
    var err error
    var files []string
    files, err = r.s3Client.ListObjects(bucket, key, ".parquet")
    if err != nil {
        return err
    }

    var waitGroup sync.WaitGroup 
    // Set number of effective goroutines we want to wait upon 
    waitGroup.Add(len(files)) 

    // parallelize below for loop in such a way so that I can populate my map in thread safe way?
    // And same map will be accessed by multiple reader threads too.
    // This writes to our map happens from background thread but there are lot of reader threads reading from the map.
    for i, file := range files {
        err = func() error {
            fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
            if err != nil {
                return errs.Wrap(err)
            }
            defer xio.CloseIgnoringErrors(fr)

            pr, err := reader.NewParquetReader(fr, nil, 4)
            if err != nil {
                return errs.Wrap(err)
            }

            // confuse on this for loop?
            // do we need to parallelize here too?
            for {
                rows, err := pr.ReadByNumber(100)
                if err != nil {
                    return errs.Wrap(err)
                }

                if len(rows) <= 0 {
                    break
                }

                byteSlice, err := json.Marshal(rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                var rows []ParquetProduct
                err = json.Unmarshal(byteSlice, &rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                // read rows struct and put inside concurrent map.
                // Need to populate map in such a way so that it is atomic and thread safe
                // from multiple parallel writes inside this for loop 
                // and multiple reads will happen from reader threads on these maps
                for i := range rows {
                  // ...
                  // ...
                  r.customers.Set(.....)
                  r.customerMetrics.Set(....)
                }
            }
            return nil
        }()

        if err != nil {
            return err
        }

        go task(&waitGroup) // Achieving maximum concurrency 
    }
    // Wait until all goroutines have completed execution. 
    waitGroup.Wait()  
    return nil
}

//GetCustomerMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
    return r.customers
}

//GetCustomerMetricsMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
    return r.customerMetrics
}

I am using this parquet library in go to read files.

Solution

Concurrently accessing a map tends to be an anti-pattern in Golang, and will generally require the usage of locks to prevent panics, eg: Golang fatal error: concurrent map writes.

Instead, you could use a chan (channel) type, have each go routine that is reading a file write its data to that channel, then have a single go-routine that listens to that channel and adds the data to the map. In this approach, there is only a single reader/writer to the map, no locks, and each go-routine is not blocked when writing its results (if the channel is buffered).

An example of this pattern can be seen here: https://play.golang.com/p/u-uYDaWiQiD

Replace doWork() with the function of yours that reads files, and listen to the output channel for bytes, files, whatever type you wish, so you can then place them into your map.

Answered By – sbrichards

Answer Checked By – Terry (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.