Golang – to perform X task in N unit of time

Issue

I am trying to simulate a program it need to perform X task in tasks in N seconds and discard other requests of work

I am trying to use timer with select in an infinite loop when the value on <-timer.C is received, I am continuing the simulated task with all data in the slice which is guarded against a maxLimit and reseting the timer again to initial duration

Here is the code

     type Pool struct {
            // The maximum number of items allowed in pool
            maxSize int
            //the queue to hold the data
            queue []interface{}
            // time window for this pool
            time time.Duration
            //timer for the batch
            timer *time.Timer
            //channel
            ch chan interface{}
        }
        
        func NewPool(maxSize int, t int32) *Pool {
            p := &Pool{
                maxSize: maxSize,
                size:    0,
                queue:   make([]interface{}, maxSize),
                time:    time.Duration(t * int32(time.Second)),
                timer:   time.NewTimer(time.Duration(t) * time.Second),
                ch:      make(chan interface{}),
            }
            go p.Schedule()
            return p
        }
        
        func (p *Pool) Add(ele interface{}) {
            p.ch <- ele
        }
        
        func (p *Pool) Schedule() {
            for {
                select {
                case <-p.timer.C:
                    fmt.Println("Time is over")
                    p.queue = make([]interface{}, 0)
                    p.timer.Reset(p.time)
                    p.flush()
                case data := <-p.ch:
                    if len(p.queue) < p.maxSize {
                        fmt.Println("Addding")
                        p.queue = append(p.queue, data)
        
                    }
                    //p.flush()
                    if !p.timer.Stop() {
                        <-p.timer.C
                    }
                    p.queue = make([]interface{}, 0)
        
                }
        
            }
        
        }
        
        func (p *Pool) flush() {
            for _, t := range p.queue {
                fmt.Println("simulate some work here", t)
                time.Sleep(500 * time.Millisecond)
            }
        }

func main() {
    p := NewPool(5, 20)
    for i := 0; i < 10000; i++ {
        p.Add("xyz " + fmt.Sprint(i))
    }

}

But this is not working as expected I am missing few things here, can you guide me to the concurrency pattern which I can use for this requirement, thanks

Solution

Adding some additional output can help you locate the issue:

case data := <-p.ch:
    fmt.Println("Got Data", len(p.queue), p.maxSize)
    if len(p.queue) < p.maxSize {
        p.queue = append(p.queue, data)
    }
    if !p.timer.Stop() {
        fmt.Println("Draining")
        <-p.timer.C
    }
    p.queue = make([]interface{}, 0)
}

Running with that change the output is:

Got Data 5 5
Got Data 0 5
Addding
Draining

So two messages are being processed (the first will not output Adding because len(p.queue) is 5; this is because you initialise it with a size of five – make([]interface{}, maxSize)). Consider what the code is doing when a message is received:

  1. Deal with the queue
  2. Stop the timer
  3. Remake the queue

Now from the docs for timer.Stop():

Stop prevents the Timer from firing. It returns true if the call stops the timer, false if the timer has already expired or been stopped.

On the first iteration this works (the timer is stopped and, if necessary, the channel drained). However you do not reset the timer after stopping it so on the second iteration the timer is already stopped when you call p.timer.Stop(). This means that Stop() will return false (the timer is already stopped!) and you try to drain the channel (but as the timer was already stopped this will block forever).

How you fix this depends upon your aim; I suspect that you meant to reset the timer? If not you could do something like this:

if p.timer.C != nil && !p.timer.Stop() {
    fmt.Println("Draining")
    <-p.timer.C
}
p.timer.C = nil // Timer has been stopped (nil channel will block forever)

Answered By – Brits

Answer Checked By – Candace Johnson (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.