Limit the number of processed messages from channel


I receive around 200,000 messages per second over a channel to my worker, and I need to limit the number of messages I send to the client to only 20 per second.
That works out to 1 message every 50 milliseconds.

The worker stays alive for the whole program lifetime thanks to the loop (I do not open a channel for each message).

My goal:
– Since the order of the messages is important, I want to skip all the messages that arrive during the blocked 50 ms and save only the latest one.
– If the latest one arrives during the blocked 50 ms, I want the saved message to be processed inside the loop once the block time is over, even if no new message arrives. <– This is my problem

My strategy:
– Keep sending the latest message that has not yet been processed to the same channel.

But the problem with this: what if that message is sent after a new message has already arrived from the application?

The code below is more of an algorithm than working code; I just want a tip on how to do it.

// pseudocode, not compilable Go
func example(new_message_from_channel <-chan *message) {
    latest = message
    last = now_milliseconds
    diff_accepted = 50milli
    for this_message := range new_message_from_channel {
        if now_milliseconds - last >= diff_accepted {
            last = now_milliseconds
        } else {
            // save the latest message
            latest = this_message

            // My problem: how do I process this latest message when the
            // blocked 50 ms is over and no new message is coming?

            // My strategy: keep sending it to the same channel
            theChannel <- latest
        }
    }
}

If you have an elegant way to do it, you are welcome to share it with me 🙂


Using a rate limiter, you can create a throttle function that takes a rate and a channel as input and returns two channels: one that carries all of the original channel's items, and another that relays items only at a fixed rate:

// requires: import "time" and "golang.org/x/time/rate"
func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {

    // "writeable" channels
    var (
        wC  = make(chan event)
        wtC = make(chan event)
    )

    // read-only channels - returned to caller
    C = wC
    tC = wtC

    go func() {
        defer close(wC)
        defer close(wtC)

        // assumed arguments: at most one event per interval r, burst of 1
        rl := rate.NewLimiter(rate.Every(r), 1)

        // relays input channel's items to two channels:
        // (1) gets all writes from original channel
        // (2) only writes at a fixed frequency
        for ev := range in {
            wC <- ev
            if rl.Allow() {
                wtC <- ev
            }
        }
    }()

    return
}

To avoid the rate limiter, you can instead use a simple time.Ticker inside the goroutine:

tick := time.NewTicker(r)
defer tick.Stop() // release the ticker when the goroutine exits

for ev := range in {
    select {
    case wC <- ev: // write to main
    case <-tick.C:
        wC <- ev  // write to main ...
        wtC <- ev // ... plus throttle channel
    }
}

Answered By – colm.anseo

Answer Checked By – Candace Johnson (GoLangFix Volunteer)
