Limit the number of processed messages from channel

Issue

I receive around 200,000 messages per second over a channel in my worker, and I need to limit the number of messages I send to the client to only 20 per second.
That works out to 1 message every 50 milliseconds.

The worker stays alive for the whole lifetime of the program thanks to the loop (rather than opening a channel for each message).

My goal:
– Since the order of the messages is important, I want to skip all the messages that arrive during the blocked 50 ms and save only the latest one
– If the latest one arrives during the blocked 50 ms, I want the saved message to be processed once the block time is over, even when no new message arrives to drive the loop. <– This is my problem

My strategy
– Keep sending the latest message that is not yet processed to the same channel

The problem with this: what if that message is sent after a new message has arrived from the application?

The code below is more of an algorithm than working code; I just want a tip on how to do it.

func example (new_message_from_channel <-chan *message) {
    default = message
    time = now_milliseconds
    diff_accepted = 50milli
    for this_message := range new_message_from_channel {
        if now_milliseconds - time >= diff_accepted {
            send_it_to_the_client
            time = now_milliseconds
        } else {
            //save the latest message
            default = this_message

            //My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!

            //My strategy - keep sending it to the same channel
            theChannel <- default
        }

    }
}

If you have an elegant way to do it, you are welcome to share it with me 🙂

Solution

Using a rate limiter, you can create a throttle function that takes a rate and a channel as input and returns two channels: one that relays all of the original channel's items, and another that relays items only at a fixed rate. Both returned channels are unbuffered, so the caller must read from both, otherwise the relay goroutine blocks:

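// Requires the imports "time" and "golang.org/x/time/rate";
// event is the caller's message type.
func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {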

    // "writeable" channels
    var (
        wC  = make(chan event)
        wtC = make(chan event)
    )

    // read-only channels - returned to caller
    C = wC
    tC = wtC

    go func() {
        defer close(wC)
        defer close(wtC)

        rl := rate.NewLimiter(
            rate.Every(r),
            1,
        )

        // relays input channel's items to two channels:
        // (1) gets all writes from original channel
        // (2) only writes at a fixed frequency
        for ev := range in {
            wC <- ev
            if rl.Allow() {
                wtC <- ev
            }
        }
    }()
    return
}

Working example: https://play.golang.org/p/upei0TiyzNr


EDIT:

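To avoid the rate-limiter dependency, replace the loop inside the goroutine with one driven by a simple time.Ticker:

tick := time.NewTicker(r)
defer tick.Stop() // release the ticker when the goroutine exits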

for ev := range in {
    select {
    case wC <- ev: // write to main
    case <-tick.C:
        wC <- ev  // write to main ...
        wtC <- ev // ... plus throttle channel
    }
}

Working example: https://play.golang.org/p/UTRXh72BvRl
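
Neither variant above keeps a skipped message around, so the last message of a burst can be lost if nothing arrives after it. Here is a minimal sketch of one way to cover that case, assuming it is acceptable for a message to wait up to one interval before delivery: keep only the latest event and flush it on each tick. The names throttleLatest and demo and the int-based event type are hypothetical, chosen for illustration; this is not the code behind the playground links.

```go
package main

import (
	"fmt"
	"time"
)

// event stands in for the real message type (an assumption for this sketch).
type event int

// throttleLatest forwards at most one event per interval d, and always the
// most recent one received. Hypothetical helper, not part of the answer above.
func throttleLatest(d time.Duration, in <-chan event) <-chan event {
	out := make(chan event)
	go func() {
		defer close(out)
		tick := time.NewTicker(d)
		defer tick.Stop()

		var (
			latest  event
			pending bool // true while latest has not been sent yet
		)
		for {
			select {
			case ev, ok := <-in:
				if !ok {
					if pending {
						out <- latest // flush the last saved event
					}
					return
				}
				latest, pending = ev, true // overwrite: older events are skipped
			case <-tick.C:
				// Fires even when no new input arrives, so a saved
				// event is still delivered after the quiet period.
				if pending {
					out <- latest
					pending = false
				}
			}
		}
	}()
	return out
}

// demo sends a burst of 1000 events, stays silent for a while, then closes
// the input. It returns everything the throttled channel delivered.
func demo() []event {
	in := make(chan event)
	out := throttleLatest(50*time.Millisecond, in)
	go func() {
		defer close(in)
		for i := 0; i < 1000; i++ {
			in <- event(i)
		}
		time.Sleep(120 * time.Millisecond) // no new messages during this time
	}()
	var got []event
	for ev := range out {
		got = append(got, ev)
	}
	return got
}

func main() {
	fmt.Println(demo())
}
```

Order is preserved because nothing is fed back into the input channel: the saved event lives in a local variable and is simply overwritten by newer ones. One trade-off of this sketch is that the send to out blocks while the consumer is busy, which also pauses reading from in.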

Answered By – colm.anseo

Answer Checked By – Candace Johnson (GoLangFix Volunteer)
