How to concurrently stream response back to the client in golang?

Issue

I have the code below in a server-side method that sends a streaming response back to the client.

var cfg = "..." // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
  pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
  if !pd.IsCorrect {
    continue
  }
  resources := us.helperCom.GenerateResourceString(pd)
  val, _ := us.GenerateInfo(clientId, resources, cfg)
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}

My question: can this loop run concurrently, or does streaming always have to run in a single thread?

Solution

You can make the work concurrent, but you can’t make sending on the stream concurrent. From the grpc-go docs:

When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. […]

So you can run the concurrent work in separate goroutines and send the results on a shared channel. The main stream handler then ranges over this channel and calls stream.Send sequentially. Keep in mind that this is worthwhile only if sending a response over the network takes less time than fetching the data.

The code looks like this:

// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))

// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
    wg.Wait()
    close(out)
}()

for _, cid := range data {
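  // pass cid as an argument so each goroutine gets its own copy
  // (required before Go 1.22, harmless after)
  go func (id int64) {
    defer wg.Done()
    // same per-customer work as in the question; this assumes repo and us
    // are safe for concurrent use
    pd := repo.GetCustomerData(strconv.FormatInt(id, 10))
    if !pd.IsCorrect {
        return
    }
    resources := us.helperCom.GenerateResourceString(pd)
    val, err := us.GenerateInfo(clientId, resources, cfg)
    if err != nil {
        return
    }
    out <- val
  }(cid)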
}

for val := range out {
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}

This starts one goroutine per element of data. If you want to cap the number of goroutines, process data in batches or with a fixed number of workers. If you do this, adjust the channel buffer accordingly.

Answered By – blackgreen

Answer Checked By – Mildred Charles (GoLangFix Admin)
