GoLang pubsub server stops listening to new channel subscriptions

Issue

I have a redis pubsub connection in my go websocket app, so whenever a client connects and subscribes to a channel, it listens and sends message.
However, say Client 1 is subscribed to channel X, the pubsub starts listening and receiving messages from it.

Now, Client 1 also subscribes to channel Y, so the server should also listen to messages from that channel, however it stops listening to X and only to Y.

    for {
        switch v := gPubSubConn.Receive().(type) {
        case redis.Message:
            log.Printf("Received message from %s", v.Channel)
            subscriptions := ps.GetSubscriptions(v.Channel, nil)
            for _, sub := range subscriptions {
                if v.Channel == types.TaskResults {
                    go sendTaskResultMessage(v.Data, sub)
                } else if v.Channel == types.TaskCount {
                    go sendTaskCountMessage(v.Data, sub)
                }
            }
        case redis.Subscription:
            log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            log.Println("Error pub/sub, delivery stopped")
            return
        }

Here’s an example log output

go-1  | New Client is connected, total:  1
go-1  | 2022/02/16 17:36:03 signature is invalid
go-1  | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | New Client is connected, total:  2
go-1  | 2022/02/16 17:36:14 signature is invalid
go-1  | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1  | 2022/02/16 17:36:16 Received message from task_count
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results

Any ideas what’s going on?

Edited as per comment:

type PubSub struct {
    Clients       []Client
    Subscriptions []Subscription
}

type Client struct {
    Id         string
    Connection *websocket.Conn
}

type Message struct {
    Action  string          `json:"action"`
    Topic   string          `json:"topic"`
    Message json.RawMessage `json:"message"`
    Token   string          `json:"token"`
}

type Subscription struct {
    Topic  string
    Client *Client
    UserId string
}

func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
    var subscriptionList []Subscription

    for _, subscription := range ps.Subscriptions {
        if client != nil {
            if subscription.Client.Id == client.Id && subscription.Topic == topic {
                subscriptionList = append(subscriptionList, subscription)
            }
        } else {
            if subscription.Topic == topic {
                subscriptionList = append(subscriptionList, subscription)
            }
        }
    }
    return subscriptionList
}

Here’s my websocket handler

func websocketHandler(w http.ResponseWriter, r *http.Request) {
    gRedisConn, err := gRedisConn()
    if err != nil {
        log.Panic(err)
    }
    gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
    upgrader.CheckOrigin = func(r *http.Request) bool {
        return true

    }
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }
    client := pubsub.Client{
        Id:         autoId(),
        Connection: conn,
    }

    // add this client into the list
    ps.AddClient(client)

    fmt.Println("New Client is connected, total: ", len(ps.Clients))

    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Println("Something went wrong", err)
            ps.RemoveClient(client)
            log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
            return
        }
        go listenToMessages()
        ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
    }
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
    m := Message{}
    err := json.Unmarshal(payload, &m)
    if err != nil {
        fmt.Println("This is not correct message payload")
        return ps
    }
    switch m.Action {
    case PUBLISH:
        ps.Publish(m.Topic, m.Message, nil)
    case SUBSCRIBE:
        ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
    case UNSUBSCRIBE:
        fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
    default:
        break
    }

    return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
    clientSubs := ps.GetSubscriptions(topic, client)
    if len(clientSubs) > 0 {
        return ps
    }
    userId := utils.GetUser(token)
    newSubscription := Subscription{
        Topic:  topic,
        Client: client,
        UserId: userId,
    }
    ps.Subscriptions = append(ps.Subscriptions, newSubscription)
    if err := gPubSubConn.Subscribe(topic); err != nil {
        log.Panic(err)
    }
    return ps
}

Solution

The immediate issue is caused by this line in websocketHandler:

gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}

This line replaces the current pubsub connection with a new connection. The new connection does not have any subscriptions. The previous connection is leaked.

Create the pubsub connection once at application startup.

The application has at least one data race. Run the application with the race detector and fix the reported problems.

Answered By – Zombo

Answer Checked By – Terry (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.