Goroutine-safe channel close doesn't actually close websocket

Issue

This one is a tricky issue that bugs me quite a bit.

Essentially, I wrote an integration microservice that provides data streams from the Binance crypto exchange using the Go client. A client sends a start message to start the data stream for a symbol and, at some point, sends a close message to stop the stream. My implementation looks basically like this:


func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
    
    switch clientType {

    case bn.SPOT_LIVE:
        wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
        wsErrHandler := c.handlers.klineHandler.ErrHandler

        // The done channel is discarded; only the stop channel is kept.
        _, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
        if err != nil {
            fmt.Println(err)
            return err
        }
        c.state.clientSymChanMap[clientType][symbol] = stopC
        return nil
  ... 
}

The clientSymChanMap stores the stopChannel in a nested hashmap so that I can retrieve the stop channel later to stop the data feed. The stop function has been implemented accordingly:


func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
    //mtd := "StopDataStream: "

    stopC := c.state.clientSymChanMap[clientType][symbol]

    if isClosed(stopC) {
        DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
    } else {
        close(stopC)
    }
    // Delete the channel from the map; otherwise the next StopAll panics when it closes an already-closed channel
    delete(c.state.clientSymChanMap[clientType], symbol)
    return
}

To prevent panics from already closed channels, I use a check function that returns true if the channel is already closed.


func isClosed(ch <-chan struct{}) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}
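
For reference, the behavior of this check can be reproduced in isolation. The following is a minimal sketch, assuming a signal-only channel that never carries values (as a stop channel typically does); the main function is illustrative only:

```go
package main

import "fmt"

// isClosed reports whether ch has been closed, without blocking.
// It is only meaningful for channels that never carry values.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	stopC := make(chan struct{})
	fmt.Println(isClosed(stopC)) // false: open, nothing to receive
	close(stopC)
	fmt.Println(isClosed(stopC)) // true: receiving from a closed channel never blocks
}
```

The result is only a snapshot: another goroutine may close the channel right after the function returns false.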

Looks nice, but there is a catch. When I run the code and start data for just one symbol, it starts and closes the data feed exactly as expected.

However, when I start multiple data feeds, the above code somehow never closes the websocket and just keeps streaming data forever. Without the isClosed check, I get panics from trying to close a closed channel, but with the check in place, well, nothing gets closed.

Looking at the implementation of the above binance.WsKlineServe function, it’s quite obvious that it just wraps a new websocket with each invocation and then returns the done and stop channels.

The documentation gives the following usage example:


wsKlineHandler := func(event *binance.WsKlineEvent) {
    fmt.Println(event)
}
errHandler := func(err error) {
    fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
    fmt.Println(err)
    return
}
<-doneC 

Because the doneC channel actually blocks, I removed it and thought that storing the stopC channel and using it later to stop the data feed would work. However, it only works for a single instance. When multiple streams are open, it no longer works.

Any idea why that’s the case and how to fix it?

Solution

Firstly, this is dangerous:

if isClosed(stopC) {
    DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
    close(stopC) // <- can't be sure channel is still open
}

There is no guarantee that the channel will still be in the same state on the next line of code after your polling check. So this code could, in theory, panic if it’s called concurrently.
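
One common way to remove this check-then-close race is to make the close itself idempotent with sync.Once. The following is a minimal sketch, not part of the go-binance API: stopHandle, newStopHandle and Stop are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// stopHandle is a hypothetical wrapper (not from go-binance) that makes
// closing a stop channel safe to call repeatedly and concurrently.
type stopHandle struct {
	once  sync.Once
	stopC chan struct{}
}

func newStopHandle(stopC chan struct{}) *stopHandle {
	return &stopHandle{stopC: stopC}
}

// Stop closes the channel on the first call; later calls do nothing.
func (h *stopHandle) Stop() {
	h.once.Do(func() { close(h.stopC) })
}

func main() {
	h := newStopHandle(make(chan struct{}))

	// Ten goroutines race to stop the same stream; none of them panics.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.Stop()
		}()
	}
	wg.Wait()

	_, open := <-h.stopC
	fmt.Println("still open:", open) // still open: false
}
```

With this approach no caller has to ask whether the channel is closed, so there is no window between the check and the close.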


If you want an asynchronous action to occur on the channel close – it’s best to do this explicitly from its own goroutine. So you could try this:

go func() {

    stopC := c.state.clientSymChanMap[clientType][symbol]
    <-stopC
    // stopC definitely closed now
    delete(c.state.clientSymChanMap[clientType], symbol)
}()

P.S. You do need some sort of mutex on your map: since the delete is asynchronous, you need to ensure that any adds to the map don’t race with it.
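
A mutex-guarded map along these lines might look like the sketch below. It is an assumption-laden illustration: streamRegistry, Add and Stop are hypothetical names, and plain strings stand in for bn.ClientType, whose definition is not shown in the question.

```go
package main

import (
	"fmt"
	"sync"
)

// streamRegistry is a hypothetical stand-in for the nested clientSymChanMap.
type streamRegistry struct {
	mu    sync.Mutex
	stops map[string]map[string]chan struct{} // clientType -> symbol -> stop channel
}

func newStreamRegistry() *streamRegistry {
	return &streamRegistry{stops: make(map[string]map[string]chan struct{})}
}

// Add stores the stop channel for a stream, creating the inner map on demand.
func (r *streamRegistry) Add(clientType, symbol string, stopC chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.stops[clientType] == nil {
		r.stops[clientType] = make(map[string]chan struct{})
	}
	r.stops[clientType][symbol] = stopC
}

// Stop removes the entry under the lock and then closes its channel.
// Only the caller that removed the entry closes it, so the close happens once.
// It returns false if no stream was registered under that key.
func (r *streamRegistry) Stop(clientType, symbol string) bool {
	r.mu.Lock()
	stopC, ok := r.stops[clientType][symbol]
	delete(r.stops[clientType], symbol)
	r.mu.Unlock()
	if !ok {
		return false
	}
	close(stopC)
	return true
}

func main() {
	reg := newStreamRegistry()
	reg.Add("SPOT_LIVE", "LTCBTC", make(chan struct{}))
	reg.Add("SPOT_LIVE", "ETHBTC", make(chan struct{}))

	fmt.Println(reg.Stop("SPOT_LIVE", "LTCBTC")) // true
	fmt.Println(reg.Stop("SPOT_LIVE", "LTCBTC")) // false: already removed
	fmt.Println(reg.Stop("SPOT_LIVE", "ETHBTC")) // true
}
```

Note that calling Add twice for the same client type and symbol overwrites the first channel, which can then no longer be closed through the registry; the sketch leaves that case unhandled.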

P.P.S. Channels are reclaimed by the GC once nothing references them any longer. If you are no longer reading from a channel, it does not need to be explicitly closed to be reclaimed.

Answered By – colm.anseo

Answer Checked By – Timothy Miller (GoLangFix Admin)
