Queuing AJAX requests using goroutines

Issue

I have the following code:

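package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

var (
    WorkersNum int           = 12
    HTTPAddr   string        = "127.0.0.1:8080"
    Delay      time.Duration = 3 * time.Second // typed so it can be assigned to Request.Delay
)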

var (
    RequestQueue = make(chan Request, 1024)
    WorkerQueue chan chan Request
)

type Request struct {
    Buf []byte
    Delay time.Duration
}

type Worker struct {
    ID          int
    Request     chan Request
    WorkerQueue chan chan Request
    QuitChan    chan bool
}

func main() {
    fmt.Println("Starting the dispatcher")
    StartDispatcher()

    fmt.Println("Registering the handler")
    http.HandleFunc("/", handleRequest)

    fmt.Println("HTTP server listening on", HTTPAddr)
    if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
        fmt.Println(err.Error())
    }
}

func StartDispatcher() {
    WorkerQueue = make(chan chan Request, WorkersNum)

    for i := 0; i < WorkersNum; i++ {
        fmt.Println("Starting worker", i + 1)
        worker := NewWorker(i + 1, WorkerQueue)
        worker.Start()
    }

    go func() {
        for {
            select {
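            case request := <-RequestQueue:
                fmt.Println("Received request")
                // Wait for an idle worker in a separate goroutine so the
                // dispatcher loop can keep draining RequestQueue.
                go func() {
                    worker := <-WorkerQueue
                    fmt.Println("Dispatching request")
                    worker <- request
                }()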
            }
        }
    }()
}

func NewWorker(id int, workerQueue chan chan Request) Worker {
    worker := Worker{
        ID:          id,
        Request:     make(chan Request),
        WorkerQueue: workerQueue,
        QuitChan:    make(chan bool),
    }
    return worker
}

func (w *Worker) Start() {
    go func() {
        for {
            w.WorkerQueue <- w.Request
            select {
            case request := <-w.Request:
                fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, request.Delay.Seconds())
                time.Sleep(request.Delay)
                writeToFile(request.Buf)
                fmt.Printf("worker%d: Saved to file!\n", w.ID)
            case <-w.QuitChan:
                fmt.Printf("worker%d stopping\n", w.ID)
                return
            }
        }
    }()
}

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // make sure it's POST
    if r.Method != "POST" {
        w.Header().Set("Allow", "POST")
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    // add cors
    w.Header().Set("Access-Control-Allow-Origin", "*")
    // retrieve
    buf, err := ioutil.ReadAll(r.Body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    request := Request{Buf: buf, Delay: Delay}
    RequestQueue <- request
    fmt.Println("Request queued")
}

I’m pretty new to Go and goroutines. Can you help me understand how this code works?

First, I call the Start() function on each worker, which assigns Worker.Request to Worker.WorkerQueue. How can I assign an empty channel to an empty array of channels?

Then, in StartDispatcher(), I create a goroutine that waits for requests.

When a request comes in, I add it to the RequestQueue variable. What’s next? The Start() function should trigger, but its case is waiting on w.Request, which is not filled, because it’s the RequestQueue variable that changes.

Could you give me a simple explanation?

Solution

I don’t like the go func() {...} inside Worker.Start(). IMO Worker.Start() should be synchronous, and you should call it as go worker.Start() in StartDispatcher().
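As a rough illustration of that suggestion, here is a minimal sketch in which Start() blocks and the caller decides to run it with the go keyword. Job, handle and runWorkers are hypothetical names introduced for this example, not part of the code in the question, and the real work (the delay and writeToFile) is replaced by a callback.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a hypothetical stand-in for the Request type in the question.
type Job struct{ ID int }

// Worker mirrors the question's Worker, with Jobs playing the role of Request.
type Worker struct {
	ID          int
	Jobs        chan Job
	WorkerQueue chan chan Job
	QuitChan    chan bool
}

// Start is synchronous: it runs the worker loop in the calling goroutine
// and returns only when QuitChan is closed.
func (w *Worker) Start(handle func(workerID int, j Job)) {
	for {
		w.WorkerQueue <- w.Jobs // announce that this worker is idle
		select {
		case j := <-w.Jobs:
			handle(w.ID, j)
		case <-w.QuitChan:
			return
		}
	}
}

// runWorkers (hypothetical helper) starts the workers, dispatches numJobs
// jobs, and reports how many were handled.
func runWorkers(numWorkers, numJobs int) int {
	workerQueue := make(chan chan Job, numWorkers)
	quit := make(chan bool)

	var (
		mu      sync.Mutex
		handled int
		wg      sync.WaitGroup
	)
	wg.Add(numJobs)
	handle := func(workerID int, j Job) {
		mu.Lock()
		handled++
		mu.Unlock()
		wg.Done()
	}

	for i := 1; i <= numWorkers; i++ {
		w := &Worker{ID: i, Jobs: make(chan Job), WorkerQueue: workerQueue, QuitChan: quit}
		go w.Start(handle) // concurrency is chosen here, by the caller
	}

	for i := 0; i < numJobs; i++ {
		idle := <-workerQueue // wait for an idle worker
		idle <- Job{ID: i}    // hand it the job
	}

	wg.Wait()   // every job has been handled
	close(quit) // tell all workers to stop
	mu.Lock()
	defer mu.Unlock()
	return handled
}

func main() {
	fmt.Println("handled:", runWorkers(3, 10))
}
```

Keeping Start() synchronous leaves the choice to the caller: it can be run with go, or called directly in a test that wants to block until the worker stops.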

How it works.

In StartDispatcher(), the loop creates the workers. Each worker puts its input channel on WorkerQueue, a buffered channel (a buffered channel behaves like a fixed-capacity FIFO queue, and here its elements are themselves channels), and then blocks waiting for a request. Next, a new goroutine is started to serve incoming requests: for each request read from RequestQueue, it takes the input channel of the first idle worker (the worker variable) from WorkerQueue and sends the request to it.

The worker picks the request up, does the work, and starts its next cycle by putting its input channel back on WorkerQueue (yes, this is the same line that runs the first time, when StartDispatcher() starts the workers).

At any time you can close a worker’s QuitChan, and the worker will terminate in the case <-w.QuitChan branch the next time it reaches the select (a receive from a closed channel returns immediately).

By the way, your RequestQueue = make(chan Request, 1024) is also a buffered channel, so writes to it do not block (unless it is full).
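The hand-off through a channel of channels can be reduced to a few lines. The sketch below is only an illustration of the mechanism, using strings instead of Request values; roundTrip, inbox and result are names made up for this example.

```go
package main

import "fmt"

// roundTrip (hypothetical helper) passes msg to a single worker by first
// receiving the worker's inbox from a channel of channels.
func roundTrip(msg string) string {
	workerQueue := make(chan chan string, 1) // holds the inboxes of idle workers
	inbox := make(chan string)               // this worker's unbuffered input channel
	result := make(chan string)

	// "Worker": register the inbox, then block until something arrives on it.
	go func() {
		workerQueue <- inbox
		received := <-inbox
		result <- "worker got: " + received
	}()

	// "Dispatcher": take an idle worker's inbox and send the message to it.
	idle := <-workerQueue // idle is the very same channel value as inbox
	idle <- msg
	return <-result
}

func main() {
	fmt.Println(roundTrip("hello"))
}
```

A channel is an ordinary value, so sending an "empty" channel on WorkerQueue simply stores a reference to it. Whoever receives that value can then send on it, and the worker blocked on the other end wakes up.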
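To see why closing works as a stop signal, here is a small sketch under simplified assumptions: the goroutines only wait, they do no real work, and all of them share one quit channel. stopWorkers and closedRead are hypothetical helpers written for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// stopWorkers starts n goroutines that wait for either work or a quit
// signal, closes the quit channel once, and returns how many stopped.
func stopWorkers(n int) int {
	quit := make(chan bool)
	work := make(chan int) // nothing is ever sent on it in this demo
	stopped := make(chan int, n)

	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			select {
			case <-work:
				// would process the work item here
			case <-quit:
				stopped <- id
			}
		}(i)
	}

	close(quit) // a single close unblocks every goroutine waiting on quit
	wg.Wait()
	return len(stopped)
}

// closedRead shows what a receive from a closed channel yields:
// the zero value, plus ok == false.
func closedRead() (value bool, ok bool) {
	quit := make(chan bool)
	close(quit)
	value, ok = <-quit
	return value, ok
}

func main() {
	fmt.Println("stopped:", stopWorkers(4))
	fmt.Println(closedRead())
}
```

Sending a value on the quit channel would wake only one receiver, while closing it wakes all of them, which is why close is the usual choice for a broadcast stop signal.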
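The "unless it is full" part can be observed with a non-blocking send. This is a sketch with a tiny capacity instead of 1024; trySend and fillQueue are hypothetical helpers, and nothing like them exists in the code from the question.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buffer is full and nobody is receiving
		return false
	}
}

// fillQueue tries to push `attempts` values into a buffered channel of the
// given capacity and returns how many were accepted.
func fillQueue(capacity, attempts int) int {
	queue := make(chan int, capacity)
	accepted := 0
	for i := 0; i < attempts; i++ {
		if trySend(queue, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println("accepted:", fillQueue(3, 5))
}
```

With a plain RequestQueue <- request, as in handleRequest, the handler would instead block once the buffer is full until the dispatcher drains an element. One possible variation, if blocking is undesirable, is to use a select with a default branch as above and reject the request.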

Hope it helps.

Answered By – Darigaaz

Answer Checked By – Candace Johnson (GoLangFix Volunteer)
