Send messages from kafka into websocket

Issue

I created a web service with gin-framework in golang. In this project I also consume some kafka messages that comes from a particular topic. What I’m trying to achive is to pour out the messages that I receive from the topic into the websocket. So the communication is only one way and more than 1 person can connect to the web socket and see the messages coming in.
I tought to use channel, so inside the function that receives the kafka message I have something like this:

ch <- KafkaMessage

While in gin framework I create something like this:

requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)

func wsWorkFlowLive(c *gin.Context) {
    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        log.Println("error get connection")
        log.Fatal(err)
    }
    defer ws.Close()

    err = ws.WriteJSON(<-ch)

    if err != nil {
        log.Println("error write message: " + err.Error())
    }
}

var upGrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

Here the html snipped that I use to test the websocket:

 <!DOCTYPE html>
   <html lang="en">
     <head>
       <meta charset="UTF-8" />
       <title>index</title>
    </head>
     <body>
       <h1>test websocket</h1>
       <p id="message-json"></p>
      <p id="message-text"></p>
            <script>
        function jsonWS() {
          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");

          ws.onmessage = function (evt) {
            console.log("Received Message: " + evt.data);
            document.getElementById("message-json").innerText += evt.data;
          };

          ws.onclose = function (evt) {
            console.log("Connection closed.");
          };
        }

        // Start websocket
        jsonWS();
      </script>
    </body>
  </html>

But I miss something, I’m quite a newbie, becasue I have the following wrong behaviours once I recevie the first kafka message:

  1. Only the first message is shown, after that connection is quickly closed
  2. To see the second ones I have to refresh the page, and this is not the websocket way
  3. Because the connection is closed the channel it not red, so it’s stuck in the cosume function until I read it. I cannot have this behaviour
  4. to avoid the point 3, I think I have to have a mechanism where I send the message to the channel only when one or more ws is connected.

Solution

The Gorilla chat example is close to what you need. The example broadcasts messages received from any client to all connected clients. Do the following to adapt the code to your use case:

  • Change the client read pump to discard received messages instead of sending messages to the hub.

    func (c *Client) readPump() {
        defer func() {
            c.hub.unregister <- c
            c.conn.Close()
        }()
        c.conn.SetReadLimit(maxMessageSize)
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        c.conn.SetPongHandler(func(string) error { 
        c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
        for {
            if _, _, err := c.NextReader(); err != nil {
              break
            }
        }
    }
    
  • Change your Kafka read loop to send messages to the Hub.broadcast channel.

    for {
        msg, err := c.ReadMessage(xxx)
        if err != nil {
              // handle error
        }
        hub.broadcast <- msg
    }
    
  • Delete the code that coalesces messages in the client send queue to a single websocket message or adjust the client to handle multiple Kafka messages in a single websocket message.

Answered By – Zombo

Answer Checked By – Pedro (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.