How to properly delay between executing a pool of workers

Issue

Good day,

I’m trying to implement a correct delay between runs of a pool of workers. For example, the workers should complete 30 tasks and then go to sleep for 5 seconds. How can I track in the code that exactly 30 tasks have been completed, and only then sleep for 5 seconds?

Below is the code that creates a pool of 30 workers, which in turn process the tasks 30 at a time in no particular order:


package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }

    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        if i != 0 && i%30 == 0 {
            // Pause after every 30 dispatched jobs (dispatched, not necessarily completed).
            fmt.Println("Sleeping for 5 seconds...")
            time.Sleep(5 * time.Second)
        }
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 30
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

play: https://go.dev/play/p/lehl7hoo-kp

It is not clear to me how to make sure that 30 tasks have been completed, or where to insert the delay. I would be grateful for any help.

Solution

Okay, so let’s start with this working example:

package main

import (
    "sync"
    "testing"
    "time"
)

func Test_t(t *testing.T) {

    // just a publisher: it publishes a result on a chan
    publish := func(s int, ch chan int, wg *sync.WaitGroup) {
        ch <- s // this is blocking!!!
        wg.Done()
    }

    wg := &sync.WaitGroup{}
    wg.Add(100)

    // we'll use done channel to notify the work is done
    res := make(chan int)
    done := make(chan struct{})
    // create worker that will notify that all results were published
    go func() {
        wg.Wait()
        done <- struct{}{}
    }()
    
    // let's create the jobs that publish on our res chan
    // please note that all goroutines are created immediately
    for i := 0; i < 100; i++ {
        go publish(i, res, wg)
    }

    // let's receive 30 results and then wait
    var resCounter int
forloop:
    for {
        select {
        case ss := <-res:
            println(ss)
            resCounter += 1
            // pause after every 30th result
            if resCounter%30 == 0 {
                // after receiving 30 results we block this goroutine;
                // no more results will be taken from the channel for 5 seconds
                println("received 30 results, waiting...")
                time.Sleep(5 * time.Second)
            }
        case <-done:
            // we are done here, let's break this infinite loop
            break forloop
        }
    }
}

I hope this shows, more or less, how it can be done.

So, what’s the problem with your code?
To be honest, it looks fine (I mean 30 results are published, then the code waits, then another 30 results, and so on), but the question is: where would you like to wait?

There are a few possibilities I guess:

  • creating jobs for the workers (this is how your code works now: as far as I can see, it publishes jobs in packs of 30; please note that the 2-second delay in the digits function only blocks the goroutine that executes it)

  • triggering workers (the "wait" code would live in the worker function and stop workers from picking up more jobs, so it must track how many results have been published)

  • handling results (this is how my code works; the synchronization happens in the loop labeled forloop)

Answered By – Cililing

Answer Checked By – Marilyn (GoLangFix Volunteer)
