Go run loop in parallel with timeout

Issue

I need to run the requests in parallel, not one after another, and with a timeout. How can I do this in Go?

This is the specific code which I need to run in parallel. The trick is to also apply the timeout, i.e. wait for all the requests (each bounded by its timeout) and get the responses once all of them have finished.

    // Check every tester in turn and print its name and HTTP status code.
    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
            // A failed check has no usable HTTP response, so skip the
            // result lines instead of reading from res.
            continue
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)
        // Only the status is needed; close the body so the connection is released.
        res.res.Body.Close()
    }

This is the full code (working code)
https://play.golang.org/p/cXnJJ6PW_CF

package main

import (
    `fmt`
    `net/http`
    `time`
)

// HT is implemented by anything that has a name and can run a check
// producing a testerResponse. main collects its checkers as []HT.
type HT interface {
    // Name returns the checker's name.
    Name() string
    // Check runs the check and returns its response, or an error.
    Check() (*testerResponse, error)
}

// testerResponse pairs the name of a tester with the HTTP response its
// check received.
type testerResponse struct {
    // name is the name of the Tap that produced this response.
    name string
    // res is a by-value copy of the HTTP response returned by the client.
    res  http.Response
}

// Tap describes one HTTP endpoint to check.
type Tap struct {
    // url is the address Check sends its GET request to.
    url     string
    // name labels the tester; Name returns it and Check copies it into the response.
    name    string
    // timeout is the request time limit configured for this tester.
    timeout time.Duration
    // client performs the request; NewTap creates it. It is nil for a Tap
    // written as a struct literal, such as the entries of testers in main.
    client  *http.Client
}

// NewTap builds a Tap for the given name and url whose HTTP client gives up
// after timeout.
//
// The timeout is also recorded on the returned Tap itself, so code that reads
// the timeout field later sees the configured value rather than zero.
func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:     url,
        name:    name,
        timeout: timeout,
        client:  &http.Client{Timeout: timeout},
    }
}

// Check sends a GET request to p.url using p.client and returns the outcome.
//
// The returned *testerResponse is never nil and always carries p.name. Its
// res field is filled in only when the request succeeds; the caller then owns
// res.Body and must close it. When building or sending the request fails, the
// error is returned and res is left at its zero value.
func (p *Tap) Check() (*testerResponse, error) {
    response := &testerResponse{name: p.name}

    req, err := http.NewRequest("GET", p.url, nil)
    if err != nil {
        return response, err
    }

    res, err := p.client.Do(req)
    if err != nil {
        // Do normally returns a nil response together with an error, so the
        // response must not be dereferenced on this path.
        return response, err
    }

    response.res = *res
    return response, nil
}

// Name reports the label this Tap was given.
func (p *Tap) Name() string {
    label := p.name
    return label
}

// main checks every tester one after another and prints, for each successful
// check, the tester's name and the HTTP status code. Failed checks only have
// their error printed.
func main() {

    var checkers []HT

    testers := []Tap{
        {
            name:    "first call",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "second call",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        checkers = append(checkers, checker)

        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
            // A failed check has no usable HTTP response, so skip the
            // result lines instead of reading from res.
            continue
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)
        // Only the status is needed; close the body so the connection is released.
        res.res.Body.Close()
    }
}

Solution

A popular concurrency pattern in Go is using worker pools.

A basic worker pool uses two channels: one to put jobs on, and another to send results back on. In this case, our jobs channel will carry values of type Tap and our results channel will carry values of type testerResponse.

Workers

A worker takes a job from the jobs channel and puts the result on the results channel.

// worker keeps receiving Taps from jobs and sends the result of each Check
// on results. It returns once jobs has been closed and drained.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for {
        tap, open := <-jobs
        if !open {
            return
        }
        results <- tap.Check()
    }
}

Jobs

To add jobs, we need to iterate over our testers and put each one on our jobs channel.

// makeJobs queues every Tap in taps on the jobs channel, in slice order.
// Each send blocks until the channel has room; the channel is left open.
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for i := 0; i < len(taps); i++ {
        jobs <- taps[i]
    }
}

Results

In order to read the results, we need to receive from the results channel once for every tester.

// getResults receives one result from tr for every entry in taps and prints
// exactly one line per result: the error text when the check failed,
// otherwise the tester's name, URL and HTTP status code.
//
// It blocks until len(taps) results have been received.
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        if r.err != nil {
            // Print the message verbatim. Using it as a format string would
            // mangle any '%' it contains (for example URL escapes).
            fmt.Println(r.err.Error())
            continue
        }
        fmt.Printf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
    }
}

Finally, our main function.

// main runs every tester through a fixed-size pool of workers and prints one
// line per result as the results arrive.
func main() {
    // Buffer both channels for one entry per tester, so neither queueing
    // the jobs nor publishing the results can block.
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create the worker pool. This loop is the same as writing
    // `go worker(jobsPipe, resultsPipe)` maxWorkers times.
    const maxWorkers = 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPipe, resultsPipe)
    }

    makeJobs(jobsPipe, testers)
    // No more jobs will be queued. Closing the channel lets each worker
    // finish once the queue has been drained.
    close(jobsPipe)

    getResults(resultsPipe, testers)
}

Putting it all together

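To show how the timeout works, I changed the timeout of the ‘second call’ to one millisecond when producing the output shown at the end. Note that the listing below uses a longer list of testers and keeps the original timeouts.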

package main

import (
    "fmt"
    "net/http"
    "time"
)

// HT is implemented by anything that has a name and can run a check.
//
// NOTE(review): in this listing (*Tap).Check returns a plain testerResponse,
// not (*testerResponse, error), so *Tap does not satisfy HT as declared here.
// Nothing in this listing uses HT — confirm whether it should be updated or dropped.
type HT interface {
    // Name returns the checker's name.
    Name() string
    // Check runs the check and returns its response, or an error.
    Check() (*testerResponse, error)
}

// testerResponse is the outcome of one Tap.Check call, as sent over the
// results channel and printed by getResults.
type testerResponse struct {
    // err is the request error, if any; getResults prints it in place of the status line.
    err  error
    // name is the name of the Tap that was checked.
    name string
    // res is a by-value copy of the HTTP response; Check closes its Body before returning.
    res  http.Response
    // url is the address that was requested.
    url  string
}

// Tap describes one HTTP endpoint to check. Values of this type are the jobs
// handed to the workers.
type Tap struct {
    // url is the address Check requests.
    url     string
    // name labels the tester; Name returns it and Check copies it into the result.
    name    string
    // timeout is the request time limit configured for this tester.
    timeout time.Duration
    // client is the HTTP client created by NewTap. It is nil for a Tap
    // written as a struct literal, such as the entries of testers.
    client  *http.Client
}

// NewTap builds a Tap for the given name and url whose HTTP client gives up
// after timeout.
//
// The timeout is also recorded on the returned Tap itself, so code that reads
// the timeout field later sees the configured value rather than zero.
func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:     url,
        name:    name,
        timeout: timeout,
        client:  &http.Client{Timeout: timeout},
    }
}

// Check announces the request on stdout, sends a GET to p.url and returns the
// outcome as a testerResponse.
//
// The client stored on the Tap is used when there is one; a Tap written as a
// struct literal has none, so a client limited by p.timeout is created for
// the call. The result always carries p.name and p.url. On failure err is set
// and res is left at its zero value; on success res holds the response with
// its Body already closed.
func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)

    client := p.client
    if client == nil {
        client = &http.Client{Timeout: p.timeout}
    }

    res, err := client.Get(p.url)
    if err != nil {
        // Keep name and url so the receiver can tell which tester failed.
        return testerResponse{err: err, name: p.name, url: p.url}
    }

    // Only the status is reported; close the body so the connection is released.
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

// Name reports the label this Tap was given.
func (p *Tap) Name() string {
    label := p.name
    return label
}

// makeJobs queues every Tap in taps on the jobs channel, in slice order.
// Each send blocks until the channel has room; the channel is left open.
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for i := 0; i < len(taps); i++ {
        jobs <- taps[i]
    }
}

// getResults receives one result from tr for every entry in taps and prints
// exactly one line per result: the error text when the check failed,
// otherwise the tester's name, URL and HTTP status code.
//
// It blocks until len(taps) results have been received.
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        if r.err != nil {
            // Print the message verbatim and end the line. Using it as a
            // format string would mangle any '%' it contains (for example
            // URL escapes) and would run error lines together.
            fmt.Println(r.err.Error())
            continue
        }
        fmt.Printf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
    }
}

// worker keeps receiving Taps from jobs and sends the result of each Check
// on results. It returns once jobs has been closed and drained.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for {
        tap, open := <-jobs
        if !open {
            return
        }
        results <- tap.Check()
    }
}

// testers lists the endpoints that main queues as jobs and whose results it
// prints. Every entry is a plain struct literal that sets only name, url and
// timeout, so its client field is nil.
var (
    testers = []Tap{
        {
            name:    "1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "2",
            url:     "http://www.yahoo.com",
            timeout: time.Second * 10,
        },
        {
            name:    "3",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "4",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "5",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "6",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "7",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "8",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "9",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "10",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "11",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "12",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "13",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "14",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }
)

// main runs every tester through a fixed-size pool of workers and prints one
// line per result as the results arrive.
func main() {
    // Buffer both channels for one entry per tester, so neither queueing
    // the jobs nor publishing the results can block.
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create the worker pool. This loop is the same as writing
    // `go worker(jobsPipe, resultsPipe)` maxWorkers times.
    const maxWorkers = 5
    for i := 0; i < maxWorkers; i++ {
        go worker(jobsPipe, resultsPipe)
    }

    makeJobs(jobsPipe, testers)
    // No more jobs will be queued. Closing the channel lets each worker
    // finish once the queue has been drained.
    close(jobsPipe)

    getResults(resultsPipe, testers)
}

Which outputs:

// Fetching http://stackoverflow.com 
// Fetching http://www.example.com 
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://stackoverflow.com' was fetched with status '200'

Answered By – Matt Oestreich

Answer Checked By – Jay B. (GoLangFix Admin)

Leave a Reply

Your email address will not be published.