How to structure a Go web server, using channels and goroutines?

Issue

I am implementing a server to stream many arrays of floats. I need some help to design my system to achieve the following:

  • The audio process must run independently and do its work even when no request is coming in. My current approach makes the AudioProcess function wait until there is a request.
  • Because each value sent on the channel is delivered to only one request, how can two or more requests get the data I’ve prepared in AudioProcess?
  • To actually stream data, the request handler can’t wait for the whole AudioProcess to finish. Is there any way to give the handler data as soon as each iteration in AudioProcess completes?

Any reply is appreciated. These are my current thoughts:

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "time"
)

func main() {
    c := AudioProcess()
    handleHello := makeHello(c)

    http.HandleFunc("/", handleHello)
    http.ListenAndServe(":8000", nil)
}

func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        for item := range c { // this loop runs until channel c is closed
            io.WriteString(w, item)
        }
    }
}

func AudioProcess() chan string {
    c := make(chan string)
    go func() {
        for i := 0; i <= 10; i++ { // Iterate the audio file
            c <- strconv.Itoa(i) // have my frame of samples, send to channel c
            time.Sleep(time.Second)
            fmt.Println("send ", i) // logging
        }
        close(c) // done processing, close channel c
    }()
    return c
}

Solution

I’m not entirely sure if this addresses your problem as I’m not fully aware of your use case, but nevertheless, I’ve come up with a solution below.

I’ve used Gin for the HTTP router because it was more comfortable to me, but I’m pretty sure you can adapt the code to fit yours. I did this in a hurry (sorry), so there may be problems I’m not aware of, but do let me know if there are any.

In short:

  1. I created a Manager that keeps track of several Client values. It also holds a sync.Mutex to ensure only one goroutine modifies the clients map at any given time;
  2. There is an InitBackgroundTask() that generates a random float64 number and passes it to ALL clients in the Manager (if there are any). If there aren’t any clients, we just sleep and carry on…
  3. The index handler deals with adding and removing clients. Clients are identified by a UUID;
  4. Three things can happen now. The client can disconnect, which we detect via the <-c.Writer.CloseNotify() channel. We can receive the random float64 number on the next background task tick. Or we can give up if we have not received anything in 20s. In every case the handler returns, which runs the deferred DeleteClient call and removes the client.
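
The steps above can also be sketched with only the standard library, which may help when adapting the idea to plain net/http. This is a minimal sketch, not the code from this answer: Hub, Subscribe, Unsubscribe and Broadcast are hypothetical names, integer ids stand in for UUIDs, and each subscriber channel is assumed to have a buffer of 1 so that the producer never waits on a slow reader.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans each value out to every registered subscriber.
// (Hypothetical type for this sketch; it plays the role of Manager.)
type Hub struct {
	mu   sync.Mutex
	subs map[int]chan float64
	next int
}

func NewHub() *Hub {
	return &Hub{subs: make(map[int]chan float64)}
}

// Subscribe registers a subscriber and returns its id and receive channel.
// Assumption: a buffer of 1 lets Broadcast hand over a value without
// waiting for the reader.
func (h *Hub) Subscribe() (int, <-chan float64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	id := h.next
	h.next++
	ch := make(chan float64, 1)
	h.subs[id] = ch
	return id, ch
}

// Unsubscribe removes a subscriber, for example when its request ends.
func (h *Hub) Unsubscribe(id int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.subs, id)
}

// Broadcast offers v to every subscriber and reports how many accepted it.
// A subscriber whose buffer is still full misses v instead of stalling
// the producer.
func (h *Hub) Broadcast(v float64) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- v:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	h := NewHub()
	_, a := h.Subscribe()
	idB, b := h.Subscribe()

	fmt.Println(h.Broadcast(0.5)) // both subscribers accept the value
	fmt.Println(<-a, <-b)

	h.Unsubscribe(idB)
	fmt.Println(h.Broadcast(1.5)) // only the first subscriber is left
	fmt.Println(<-a)
}
```

The producer calls Broadcast on its own schedule whether or not anyone is subscribed, which is the independence the question asks for. Dropping a value for a slow subscriber is a design choice of this sketch; a real audio stream might prefer a larger buffer or disconnecting the slow client.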

I made several assumptions about your needs here (e.g. that the background task will return X every Y minutes). If you are looking for finer-grained streaming, I’d recommend using websockets instead (and the pattern below can still be used).

Let me know if you have any questions.

Code:

package main

import (
    "github.com/gin-gonic/gin"
    "github.com/satori/go.uuid"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
)

type Client struct {
    uuid string
    out  chan float64
}

type Manager struct {
    clients map[string]*Client
    mutex   sync.Mutex
}

func NewManager() *Manager {
    m := new(Manager)
    m.clients = make(map[string]*Client)
    return m
}

func (m *Manager) AddClient(c *Client) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    log.Printf("add client: %s\n", c.uuid)
    m.clients[c.uuid] = c
}

func (m *Manager) DeleteClient(id string) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    log.Printf("delete client: %s\n", id)
    delete(m.clients, id)
}

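func (m *Manager) InitBackgroundTask() {
    for {
        f64 := rand.Float64()

        // Hold the mutex while reading the map: handlers add and remove
        // clients concurrently, and unsynchronized map access is a data race.
        m.mutex.Lock()
        log.Printf("active clients: %d\n", len(m.clients))
        for _, c := range m.clients {
            // Non-blocking send: skip a client that is not receiving right
            // now instead of stalling while the mutex is held.
            select {
            case c.out <- f64:
            default:
            }
        }
        m.mutex.Unlock()

        log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
        time.Sleep(time.Second * 10)
    }
}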

func main() {
    r := gin.Default()
    m := NewManager()

    go m.InitBackgroundTask()

    r.GET("/", func(c *gin.Context) {
        cl := new(Client)
        cl.uuid = uuid.NewV4().String()
        // Buffer of 1 so a send does not have to wait for this handler to reach the select.
        cl.out = make(chan float64, 1)

        defer m.DeleteClient(cl.uuid)
        m.AddClient(cl)

        select {
        case <-c.Writer.CloseNotify():
            log.Printf("%s : disconnected\n", cl.uuid)
        case out := <-cl.out:
            log.Printf("%s : received %+v\n", cl.uuid, out)
            c.JSON(http.StatusOK, gin.H{
                "output": out,
            })
        case <-time.After(time.Second * 20):
            log.Println("timed out")
        }
    })

    r.Run()
}

Note: if you’re testing this on Chrome, you might have to append a random parameter at the end of the URL so that the request will actually be made, e.g. ?rand=001, ?rand=002 and so on.

Answered By – Ian

Answer Checked By – Clifford M. (GoLangFix Volunteer)
