Concurrency: limit goroutines does not work as expected

Issue

I’m working on a search engine project currently. For faster crawl speed I use one goroutine per a link visit. But I encountered two problems that made me wonder!

First one is a code sample:

package main

import "fmt"
import "sync"
import "time"

type test struct {
    running int
    max     int
    mu      sync.Mutex
}

func main() {
    t := &test{max: 1000}
    t.start()
}

func (t *test) start() {
    for {
        if t.running >= t.max {
            time.Sleep(200 * time.Millisecond)
            continue
        }
        go t.visit()
    }
}

func (t *test) visit() {
    t.inc()
    defer t.dec()
    fmt.Println("visit called")
    fmt.Printf("running: %d, max: %d\n", t.running, t.max)
    fmt.Println()
    time.Sleep(time.Second)
}

func (t *test) inc() {
    t.mu.Lock()
    t.running++
    t.mu.Unlock()
}
func (t *test) dec() {
    t.mu.Lock()
    t.running--
    t.mu.Unlock()
}

Output (cropped):

running: 2485, max: 1000

running: 2485, max: 1000

running: 2485, max: 1000

visit called
running: 2485, max: 1000

running: 2485, max: 1000

running: 2485, max: 1000

running: 2485, max: 1000


visit called
running: 2485, max: 1000


running: 2485, max: 1000

While I’m explicitly checking for maximum allowed goroutines in the loop, Why running goroutines exceeds the maximum?


Second one is a part of real project code:

UPDATE: This is actually fixed, the problem was in LinkProvider.Get() implementation that took too long to return. parser.visit() returns in the mean time, but the loop in Parser.Start() is waiting for a new link… and the output seems sequential!

package worker

import (
    "errors"
    "fmt"
    "sync"
    "time"

    "bitbucket.org/codictive/ise/components/crawler/models"
    "bitbucket.org/codictive/ise/components/log/logger"
    "bitbucket.org/codictive/ise/core/component"
    "bitbucket.org/codictive/ise/core/database"
)

// Worker is a service that processes crawlable links.
type Worker interface {
    Start() error
    Stop() error
    Restart() error
    Status() Status
}

// Status contains runtime status of a worker.
type Status struct {
    Running             bool
    RunningParsersCount int
}

// New return a new defaultWorker with given config.
func New() Worker {
    return &defaultWorker{
        flow: make(chan bool),
        stop: make(chan bool),
    }
}

// defaultWorker is a Worker implementation.
type defaultWorker struct {
    linkProvider         LinkProvider
    handlersLimit        int
    runningHandlersCount int
    running              bool
    mu                   sync.Mutex
    flow                 chan bool
    stop                 chan bool
}

func (w *defaultWorker) init() {
    prate, _ := component.IntConfig("crawler.crawlInterval")
    arate, _ := component.IntConfig("crawler.ad_crawlInterval")
    concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit")
    w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour)
    w.handlersLimit = concLimit
}

// Start runs worker.
func (w *defaultWorker) Start() error {
    logger.Info("Starting crawler worker...")
    w.running = true
    w.init()

    defer func() {
        w.running = false
        logger.Info("Worker stopped.")
    }()

    for {
        select {
        case <-w.stop:
            w.flow <- true
            return nil
        default:
            fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit)
            if w.runningHandlersCount >= w.handlersLimit {
                time.Sleep(200 * time.Millisecond)
                continue
            }

            link := w.linkProvider.Get()
            if link.ID == 0 {
                logger.Debug("no link to crawl")
                time.Sleep(time.Minute)
                continue
            }

            go func(l *models.CrawlLink) {
                go w.visit(l)
            }(link)
        }
    }
}

// Stop stops worker.
func (w *defaultWorker) Stop() error {
    logger.Info("Stopping crawler worker...")
    w.stop <- true
    select {
    case <-w.flow:
        return nil
    case <-time.After(2 * time.Minute):
        return errors.New("worker did not stopped properly")
    }
}

// Restart re-starts worker.
func (w *defaultWorker) Restart() error {
    logger.Info("Re-starting crawler worker...")
    w.stop <- true
    select {
    case <-w.flow:
        return w.Start()
    case <-time.After(2 * time.Minute):
        return errors.New("can not restart worker")
    }
}

// Status reports current worker status.
func (w *defaultWorker) Status() Status {
    return Status{
        Running:             w.running,
        RunningParsersCount: w.runningHandlersCount,
    }
}

func (w *defaultWorker) visit(cl *models.CrawlLink) {
    w.incrementRunningWorkers()
    defer w.decrementRunningWorkers()

    if cl == nil {
        logger.Warning("[crawler.worker.visit] Can not visit a nil link.")
        return
    }
    if err := cl.LoadFull(); err != nil {
        logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err)
        return
    }

    parser := NewParser(cl)
    if parser == nil {
        logger.Error("[crawler.worker.visit] Parser instantiation failed.")
        return
    }
    before := time.Now()
    if err := parser.Parse(); err != nil {
        cl.Error = err.Error()
        logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err)
        db := database.Open()
        if err := db.Save(&cl).Error; err != nil {
            logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err)
        }
    }
    logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before))
    fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before))
}

func (w *defaultWorker) incrementRunningWorkers() {
    w.mu.Lock()
    w.runningHandlersCount++
    w.mu.Unlock()
    fmt.Printf("increment called. current: %d\n", w.runningHandlersCount)
}

func (w *defaultWorker) decrementRunningWorkers() {
    w.mu.Lock()
    w.runningHandlersCount--
    w.mu.Unlock()
    fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount)
}

Output:

2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
running: 0 limit: 1000
Running server on :8080
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms.
decrement called. current: 0
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof

As you can see the visit method runs completely sequential! Whether I call it with just go visit(link) or the one used in above code.
Why this happens? What is stopping the loop from iterating?

Solution

I would solve this problem using channels and blocking feature – https://play.golang.org/p/KbYOI1oGNs

The main change is that we have a channel guard, we put new item there when goroutine is started (and it will block if size reaches limit), release when finished.

func (t *test) start() {
    maxGoroutines := t.max
    guard := make(chan struct{}, maxGoroutines)

    for {
        guard <- struct{}{}
        go func() {
            t.visit()
            <-guard
        }()
    }
}

Answered By – Alex Pliutau

Answer Checked By – Mary Flores (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.