I'm encountering multiple errors when trying to concurrently parse sites with Go

Issue

Following some advice from @Sam Whited and doing some research on Stack Overflow, I've rewritten my code (see below). This version of the code seems more stable; however, every once in a while I get a slew of TCP errors, as if I'm not closing my requests. I've throttled the requests by adding a sleep, which seems to help a bit.

// main reads a newline-delimited list of seed URLs (-f), fetches each one
// concurrently, extracts candidate e-mail addresses from the HTML, and
// writes one CSV row per URL to ~/Desktop/<-s file>.
func main() {
    runtime.GOMAXPROCS(maxParallelism())
    var file = flag.String("f", "", "Enter new line deliminated text file")
    var fileName = flag.String("s", "contact_bot.csv", "Enter new line deliminated text file")
    flag.Parse()

    if *file != "" {
        // Local-part prefixes used to keep only "contact-like" addresses.
        // ("info" was listed twice in the original; duplicate removed.)
        filters = []string{"info", "ads", "sales", "sale", "media", "mediarelations", "media_relations", "contact", "contacts", "contactus", "contact_us", "contact-us", "about_us", "general", "advertise", "support", "systems", "system"}
        emailRE = regexp.MustCompile(`([a-z0-9!#$%&'*+\/=?^_{|}~-]+(?:\.[a-z0-9!#$%&'*+\/=?^_{|}~-]+)*(@|\sat\s)(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?(\.|\sdot\s))+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)`)

        seedUrls, err := readLines(*file)
        checkErr(err)
        numberOfUrls := len(seedUrls)
        usr, err := user.Current()
        checkErr(err)
        parentPath := filepath.Join(usr.HomeDir, "/Desktop/"+*fileName)

        // Named outFile so it does not shadow the -f flag variable "file".
        outFile, err := os.Create(parentPath)
        checkErr(err)
        defer outFile.Close()

        writer := csv.NewWriter(outFile)
        defer writer.Flush()

        var header = []string{"URL", "EMAILS"}
        err = writer.Write(header)
        checkErr(err)

        data = make(chan *HTTPResponse)
        go asyncHTTPGets(seedUrls)

        // Receive exactly one result per seed URL. A counted loop replaces
        // the original range-over-channel with a labeled break, which would
        // block forever if the count check were ever missed.
        for counter := 0; counter < numberOfUrls; counter++ {
            result := <-data
            emails := findEmails(result.HTML, filters)
            // Print the number of URLs still outstanding (same countdown the
            // original produced by decrementing numberOfUrls in the loop).
            fmt.Printf("%s, %s, %s\n", result.URL, emails, strconv.Itoa(numberOfUrls-counter))
            var row = []string{result.URL, strings.Join(emails, ",")}
            err := writer.Write(row)
            checkErr(err)
        }
    }
}


// AsyncHTTPGets ...
func asyncHTTPGets(urls []string) {
    counter := 0
    for _, url := range urls {
        counter++
        if counter%10 == 0 {
            time.Sleep(1 * time.Second)
        }
        go func(url string) {
            fmt.Printf("Fetching %s \n", url)
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println(err.Error())
                data <- &HTTPResponse{url, err.Error()}
                return
            }
            b := resp.Body
            buf := new(bytes.Buffer)
            buf.ReadFrom(b)
            resp.Body.Close()
            myHTML := buf.String()
            data <- &HTTPResponse{url, myHTML}

        }(url)
    }
}

// findEmails scans html for substrings matching the package-level emailRE
// pattern, keeps only those accepted by stringInSlice against filters,
// drops duplicates, and returns the survivors sorted ascending.
func findEmails(html string, filters []string) []string {
    matches := emailRE.FindAllString(html, -1)
    kept := []string{}
    for _, candidate := range matches {
        // Skip anything not on the filter list.
        if !stringInSlice(candidate, filters) {
            continue
        }
        // Skip anything we have already collected.
        if stringInSlice(candidate, kept) {
            continue
        }
        kept = append(kept, candidate)
    }
    sort.Strings(kept)
    return kept
}

Solution

The application will open a large number of sockets and possibly breach file descriptor limits. I suggest limiting the number of concurrent requests to prevent this issue:

    var (
        requestMu    sync.Mutex // protects requestCount
        requestCount int        // incremented on each request
    )
    // Create 10 workers. Adjust up or down as needed.
    for w := 0; w < 10; w++ {
        go func() {
            for {
                // Claim the next URL index; exit once all have been taken.
                requestMu.Lock()
                i := requestCount
                requestCount++
                requestMu.Unlock()
                if i >= len(seedUrls) {
                    return
                }

                // Fetch the current URL. (The original referenced an
                // undefined "myUrl" here, which does not compile.)
                myURL := seedUrls[i]
                resp, err := http.Get(myURL)
                if err != nil {
                    fmt.Println(myURL, err.Error(), i)
                    data <- &HTTPResponse{myURL, err.Error()}
                    continue
                }

                // Read body and close.
                b, err := ioutil.ReadAll(resp.Body)
                resp.Body.Close()
                if err != nil {
                    fmt.Println(myURL, err.Error(), i)
                    data <- &HTTPResponse{myURL, err.Error()}
                    continue
                }

                myHTML := string(b)
                data <- &HTTPResponse{myURL, myHTML}
            }
        }()
    }

    // Receive the expected number of results. (The original wrote
    // "result <- data", a send on an undeclared variable; a receive
    // with := is what is intended.)
    for i := 0; i < len(seedUrls); i++ {
        result := <-data
        emails := findEmails(result.HTML, filters)
        fmt.Printf("%s, %s, %d\n", result.URL, emails, i)
        var row = []string{result.URL, strings.Join(emails, ",")}
        err := writer.Write(row)
        writer.Flush()
        if err != nil {
            panic(err)
        }
    }

Answered By – Bayta Darell

Answer Checked By – Katrina (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.