Wait for N items in channel before executing sequentially

Issue

I am very new to Go, but I had an idea for something I wanted to try.

I would like to have a goroutine that accepts strings from a channel but only acts on them once it has received N strings.

I looked around for similar questions or cases but I only found ones where the idea was to execute several routines in parallel and wait to aggregate the result.

I thought about creating an array and just passing it to a routine once its length was sufficient. However, I want to keep a certain separation of concerns and control this on the receiving end.

My questions are:

  1. Is this bad practice for some reason?
  2. Is there a better way to do this, and if so, what is it?

    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            for {
                tasks = append(tasks,<- ch)
    
                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                }
                if len(tasks) >= 3 {
                    for i := 0; i < len(tasks); i++ {
                        fmt.Println(tasks[i])
                    }
                }
            }
        }()
    
        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }
    

Edited for a simpler, more accurate example.

Solution

Based on a few comments, it looks like what you are looking for is some form of batching.

With batching, there are a few scenarios in which you would want to take the batch and send it along:

  1. The batch has reached a sufficient size
  2. Enough time has passed and a partial batch should be flushed

Your given example does not account for the second scenario. This can lead to some awkward behavior: if you stop receiving load, the last partial batch is never flushed.

Therefore I would recommend either looking into a library (e.g., cloudfoundry/go-batching) or simply using channels, a Timer, and a select statement.
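To make that gap concrete, here is a minimal sketch of size-only batching. `collectBySize` is a hypothetical helper written for this illustration, and it assumes the producer closes the channel when it is done, which the original example does not do.

```go
package main

import "fmt"

// collectBySize is a hypothetical helper (not from any library). It reads
// from in until the channel is closed and records a batch every time size
// items have accumulated. Items left over when the input stops are returned
// separately, because a size-only trigger never flushes them.
func collectBySize(in <-chan string, size int) (full [][]string, stranded []string) {
	var batch []string
	for s := range in {
		batch = append(batch, s)
		if len(batch) == size {
			full = append(full, batch)
			batch = nil // start a fresh batch; the old slice stays in full
		}
	}
	return full, batch
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in) // assumption: the producer signals completion
		for i := 1; i <= 4; i++ {
			in <- fmt.Sprintf("Msg %d", i)
		}
	}()

	full, stranded := collectBySize(in, 3)
	fmt.Println("full batches:", full)
	fmt.Println("stranded:", stranded)
}
```

With four messages and a batch size of 3, one full batch is produced and "Msg 4" is left stranded. In a long-running goroutine that never sees the channel close, that item would simply sit in memory until more data arrived.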

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
            for {
                select {
                case <-timer.C:
                    fmt.Println("Flush partial batch due to time")
                    flush(tasks)
                    tasks = nil
                    timer.Reset(time.Second)
                case data := <-ch:
                    tasks = append(tasks, data)

                    // Reset the timer for each data point so that we only flush
                    // partial batches when we stop receiving data.
                    if !timer.Stop() {
                        <-timer.C
                    }
                    timer.Reset(time.Second)

                    // Guard clause for batch size
                    if len(tasks) < 3 {
                        fmt.Println("Queue still too small")
                        continue
                    }

                    flush(tasks)
                    tasks = nil // reset tasks
                }
            }
        }()

        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }

    func flush(tasks []string) {
        // Guard against empty flushes
        if len(tasks) == 0 {
            return
        }

        fmt.Println("Flush")
        for _, t := range tasks {
            fmt.Println(t)
        }
    }
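
One possible extension, sketched here under stated assumptions rather than taken from the answer: the goroutine above never exits, and nothing guarantees a final flush when the producer is done. The version below wraps the same select/Timer idea in a function that also flushes the remainder when the input channel is closed. `runBatcher` and its parameters are hypothetical names, and the sketch assumes the producer closes the channel.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher is a hypothetical helper, not part of any library. It reads from
// in and calls flush when size items have accumulated, when maxWait passes
// without new data, or when in is closed. It returns once in is closed.
func runBatcher(in <-chan string, size int, maxWait time.Duration, flush func([]string)) {
	var batch []string

	// emit hands the current batch to flush, skipping empty batches.
	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = nil
		}
	}

	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			// No data arrived for maxWait: flush whatever we have.
			emit()
			timer.Reset(maxWait)
		case s, ok := <-in:
			if !ok {
				// Producer is done: flush the remainder and stop.
				emit()
				return
			}
			batch = append(batch, s)
			if len(batch) >= size {
				emit()
			}
			// Restart the idle timer. The non-blocking drain avoids
			// blocking if the channel holds no pending value.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(maxWait)
		}
	}
}

func main() {
	in := make(chan string)
	done := make(chan struct{})
	go func() {
		defer close(done)
		runBatcher(in, 3, time.Second, func(b []string) {
			fmt.Println("flush:", b)
		})
	}()

	for i := 1; i <= 4; i++ {
		in <- fmt.Sprintf("Msg %d", i)
	}
	close(in) // assumption: the producer signals completion by closing
	<-done    // wait for the final flush before exiting
}
```

Because the sends here happen back to back, the first three messages should go out as a full batch and "Msg 4" should be flushed when the channel is closed. Waiting on `done` keeps main from exiting before that last flush runs.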

Answered By – poy

Answer Checked By – Marilyn (GoLangFix Volunteer)
