How to efficiently process a slice in parallel and control the degree of parallelism?

Issue

I have a resourceId array that I need to loop over in parallel, generating a URL for each resource and putting it into a map whose key is the resourceId and whose value is the URL.

I have the code below, which does the job, but I am not sure it is the right way to do it. I am using sizedwaitgroup to process the resourceId list in parallel, and a mutex to lock the map while writing to it. I am sure this isn't efficient code, since using a lock together with sizedwaitgroup will cause some performance problems.

What is the best and most efficient way to do this? Should I use channels here? I want to control how much parallelism there is, instead of running one goroutine per element of the resourceId list. If URL generation fails for any resourceId, I want to log that as an error for that resourceId without disrupting the other goroutines that are running in parallel to generate URLs for the other resourceIds.

For example, if there are 10 resources and 2 fail, then log errors for those 2, and the map should have entries for the remaining 8.

// running 20 threads in parallel
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
  swg.Add()
  go func(resources string) {
    defer swg.Done()
    customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
    if err != nil {
      errs.NewWithCausef(err, "Could not generate the url for %s", resources)
    }
    mutex.Lock()
    m[resources] = customerUrl
    mutex.Unlock()
  }(resources)
}
swg.Wait()

elapsed := time.Since(start)
fmt.Println(elapsed)

Note: the above code will be called at high throughput from multiple reader threads, so it needs to perform well.
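For reference, the pattern in the question (cap the number of concurrent goroutines, guard the map with a mutex) can also be written with only the standard library by using a buffered channel as a counting semaphore. This assumes sizedwaitgroup's Add blocks once the limit is reached, which is what the "running 20 threads in parallel" comment implies. The sketch is illustrative: generateURL and fetchAll are hypothetical names, generateURL stands in for us.GenerateUrl, and failures go into a second map instead of a logger, so that only successful IDs end up in the URL map, as the question requires.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl. It fails for IDs
// starting with "bad" so that the error path can be exercised.
func generateURL(id string) (string, error) {
	if strings.HasPrefix(id, "bad") {
		return "", fmt.Errorf("could not generate the url for %s", id)
	}
	return "https://example.com/" + id, nil
}

// fetchAll calls generateURL for every ID with at most limit calls in flight.
// The buffered channel sem acts as a counting semaphore and mu guards both maps.
func fetchAll(ids []string, limit int) (map[string]string, map[string]error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		sem      = make(chan struct{}, limit)
		urls     = make(map[string]string)
		failures = make(map[string]error)
	)
	for _, id := range ids {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit goroutines are already running
		go func(id string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this call finishes
			u, err := generateURL(id)
			mu.Lock() // hold the lock only for the map write, not for generateURL
			defer mu.Unlock()
			if err != nil {
				failures[id] = err // record the failure; other goroutines keep running
				return
			}
			urls[id] = u
		}(id)
	}
	wg.Wait()
	return urls, failures
}

func main() {
	urls, failures := fetchAll([]string{"a", "b", "bad1", "c"}, 2)
	fmt.Println(len(urls), len(failures)) // 3 1
}
```

The lock is held only for the map write, so contention on it is small compared with the cost of generating a URL; whether that matters at the question's throughput would need measuring.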

Solution

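I’m not sure what sizedwaitgroup is, since it isn’t explained, but overall this approach doesn’t look very typical of Go. "Best" is a matter of opinion, but the most typical approach in Go is a fixed number of worker goroutines reading from a work channel, with a single goroutine collecting their results into the map (so no mutex is needed), along these lines: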

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    // Start a fixed number of workers; this is what caps the parallelism.
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(wg, work, results)
    }
    // Feed the resource IDs to the workers, then signal that no more work is coming.
    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work)
    }()

    // Close results once every worker has returned, so the range below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Only this goroutine writes to m, so no mutex is needed.
    for res := range results {
        m[res.resources] = res.response
    }

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

func worker(wg *sync.WaitGroup, ch <-chan string, r chan<- result) {
    defer wg.Done() // without this, wg.Wait never returns and main blocks forever
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}
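Because the code above depends on project-specific identifiers (us, clientId, appConfig, customerPbV1, errs, resourcesList), here is a self-contained sketch of the same worker-pool shape that can be run on its own. generateURL and collect are hypothetical stand-ins, URLs are plain strings, and failed IDs are logged and skipped:

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl; empty IDs fail.
func generateURL(id string) (string, error) {
	if id == "" {
		return "", fmt.Errorf("empty resource id")
	}
	return "https://example.com/" + id, nil
}

type result struct {
	id  string
	url string
}

// worker drains the work channel and calls wg.Done once it is closed and empty,
// which is what allows the closer goroutine in collect to close results.
func worker(wg *sync.WaitGroup, work <-chan string, results chan<- result) {
	defer wg.Done()
	for id := range work {
		u, err := generateURL(id)
		if err != nil {
			log.Printf("could not generate the url for %q: %v", id, err)
			continue // skip this ID; the other workers are unaffected
		}
		results <- result{id, u}
	}
}

// collect fans ids out to numWorkers workers and gathers successes into a map.
// Only the calling goroutine writes to m, so no mutex is required.
func collect(ids []string, numWorkers int) map[string]string {
	var wg sync.WaitGroup
	work := make(chan string)
	results := make(chan result)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(&wg, work, results)
	}
	go func() {
		for _, id := range ids {
			work <- id
		}
		close(work)
	}()
	go func() {
		wg.Wait()
		close(results)
	}()
	m := make(map[string]string)
	for r := range results {
		m[r.id] = r.url
	}
	return m
}

func main() {
	m := collect([]string{"a", "", "b"}, 2)
	fmt.Println(len(m), m["a"]) // 2 https://example.com/a
}
```

Calling collect with numWorkers set to 20 caps the number of concurrent generateURL calls at 20, similar to sizedwaitgroup.New(20) in the question.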

Note, though, that based on its name I would assume errs.NewWithCausef doesn’t actually handle an error but returns a new one, in which case the code above (like the code in the question) is dropping errors on the floor. A proper solution would then have an additional chan error for handling them:

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    errCh := make(chan error)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(wg, work, results, errCh)
    }

    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work)
    }()

    // Once every worker has returned, nothing else will be sent on either channel.
    go func() {
        wg.Wait()
        close(results)
        close(errCh)
    }()

    // Drain errors concurrently so workers never block on errCh.
    errDone := make(chan struct{})
    go func() {
        defer close(errDone)
        for err := range errCh {
            log.Println(err) // or handle err however the application requires
        }
    }()

    for res := range results {
        m[res.resources] = res.response
    }
    <-errDone // wait until every error has been handled

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

// The channel parameter is named errCh so that it does not shadow the errs package.
func worker(wg *sync.WaitGroup, ch <-chan string, r chan<- result, errCh chan<- error) {
    defer wg.Done()
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}
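A variation on the separate error channel, offered as an alternative rather than as part of the answer: carry the error inside the result value, so one channel and one receiving loop handle both outcomes, and every error is seen before the function returns. This sketch uses hypothetical names (generateURL, outcome, generateAll), with generateURL standing in for us.GenerateUrl, and reproduces the question's example of 10 resources with 2 failures:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl; IDs containing
// "bad" fail so that the error path can be exercised.
func generateURL(id string) (string, error) {
	if strings.Contains(id, "bad") {
		return "", fmt.Errorf("could not generate the url for %s", id)
	}
	return "https://example.com/" + id, nil
}

// outcome carries either a URL or an error for one resource ID.
type outcome struct {
	id  string
	url string
	err error
}

// generateAll returns the successful URLs and the per-ID errors separately.
// Errors travel on the same channel as results, so no second consumer
// goroutine is needed and none can be left running when this returns.
func generateAll(ids []string, numWorkers int) (map[string]string, map[string]error) {
	work := make(chan string)
	out := make(chan outcome)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range work {
				u, err := generateURL(id)
				out <- outcome{id: id, url: u, err: err}
			}
		}()
	}
	go func() {
		for _, id := range ids {
			work <- id
		}
		close(work)
	}()
	go func() {
		wg.Wait()
		close(out)
	}()
	urls := make(map[string]string)
	failures := make(map[string]error)
	for o := range out {
		if o.err != nil {
			failures[o.id] = o.err
			continue
		}
		urls[o.id] = o.url
	}
	return urls, failures
}

func main() {
	ids := []string{"r1", "r2", "bad3", "r4", "r5", "r6", "bad7", "r8", "r9", "r10"}
	urls, failures := generateAll(ids, 3)
	for id, err := range failures {
		fmt.Println("error:", id, err) // map order, so not deterministic
	}
	fmt.Println(len(urls), len(failures)) // 8 2
}
```

The trade-off is that the collecting loop now decides what to do with each error (log it, count it, return it), instead of a separate goroutine doing so.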

Answered By – Adrian

Answer Checked By – Clifford M. (GoLangFix Volunteer)
