Execute multiple independent jobs continuously

Issue

I have a set of jobs that are independent of each other, so each of them can be run concurrently using goroutines. Once a single job completes, it should wait for a few seconds and then start again (this applies to all the jobs), and this goes on in a loop until the Go API service stops. All of these jobs run the same function as a goroutine (it makes a REST call). What would be the best pattern to implement this in Go? Please note that I also want to wait for the currently executing jobs to complete before my service shuts down.

Solution

If I understood you correctly, you are looking for something like this:
This is a service with a consumer pool that executes jobs concurrently. When a job is done, it is repeated after an interval until you stop the service.

// job is a single unit of work submitted to the service.
type job struct {
    id     int        // identifier; printed by the consumer when the job finishes
    result chan error // receives the job's outcome once a consumer has processed it
}

// newJob creates a job carrying the given id. Its result channel has room for
// one value, so the outcome can be delivered even when nobody is receiving yet.
func newJob(id int) job {
    var j job
    j.id = id
    j.result = make(chan error, 1)
    return j
}

// service runs submitted jobs on a fixed number of consumer goroutines; after a
// job finishes, a consumer-spawned goroutine re-submits it once repeatInterval
// has elapsed.
type service struct {
    // pending queues jobs until a consumer picks them up.
    pending chan job

    // consumerLimit is the number of consumer goroutines started by newService.
    consumerLimit  int
    // repeatInterval is how long to wait before re-submitting a finished job.
    repeatInterval time.Duration

    // isClosed is closed by close to signal that the service has shut down.
    isClosed chan struct{}
    // shutdown hands each consumer the channel on which it acknowledges its stop request.
    shutdown chan chan error
}

// newService builds a service and immediately starts its consumer goroutines.
//
// repeatInterval is the pause before a finished job is submitted again,
// consumerLimit is the number of consumer goroutines to start, and
// pendingChannelSize is the capacity of the queue of waiting jobs.
func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
    svc := new(service)
    svc.repeatInterval = repeatInterval
    svc.consumerLimit = consumerLimit
    svc.pending = make(chan job, pendingChannelSize)
    svc.isClosed = make(chan struct{}, consumerLimit)
    svc.shutdown = make(chan chan error)

    // Launch the whole consumer pool up front.
    for started := 0; started < svc.consumerLimit; started++ {
        go svc.consumer()
    }

    return svc
}

// do submits job to the service and waits for its result.
//
// ctx bounds only the enqueue step: if ctx is done before the job can be placed
// on the pending queue, ctx.Err() is returned. Once the job is queued, do waits
// until a consumer delivers the result or the service is shut down.
//
// It returns the job's result, ctx.Err() if enqueueing was cancelled, or a
// "service has been shut down" error if the service was closed first.
func (s *service) do(ctx context.Context, job job) error {
    // Check for shutdown first. A select picks randomly among ready cases, so
    // without this guard a job could still be queued after shutdown (pending is
    // buffered) and would then never be processed.
    select {
    case <-s.isClosed:
        return errors.New("service has been shut down")
    default:
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-s.isClosed:
        return errors.New("service has been shut down")
    case s.pending <- job:
    }

    // Wait for the outcome, but do not block forever if the service shuts down
    // while the job is still sitting in the queue.
    select {
    case err := <-job.result:
        return err
    case <-s.isClosed:
        // The result may have been delivered just before shutdown; prefer it.
        select {
        case err := <-job.result:
            return err
        default:
            return errors.New("service has been shut down")
        }
    }
}

// consumer is the worker loop run by each consumer goroutine. It repeatedly
// takes a job from the pending queue, processes it, reports the result, and
// starts a goroutine that re-submits the same job id after repeatInterval.
// The loop ends when a stop request arrives on the shutdown channel; the
// consumer acknowledges it on the channel it was handed and returns.
func (s *service) consumer() {
    for {
        select {
        case j := <-s.pending:
            // Simulate the working process: 200-399ms. The value must be scaled
            // by time.Millisecond, because a bare integer Duration counts
            // nanoseconds.
            time.Sleep(time.Duration(200+rand.Intn(200)) * time.Millisecond)
            j.result <- nil
            fmt.Printf("job %v is done\n", j.id)

            go func() {
                // Repeat after a time; the 2s timeout limits how long the
                // re-submission is attempted.
                time.Sleep(s.repeatInterval)
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                defer cancel()
                if err := s.do(ctx, newJob(j.id)); err != nil {
                    fmt.Printf("failed to send job to repeat: %v\n", err)
                }
            }()
        case result := <-s.shutdown:
            // Acknowledge the stop request and exit the loop.
            result <- nil
            return
        }
    }
}

// close stops every consumer and then marks the service as closed.
//
// A consumer only receives its stop request between jobs, so sending one
// request per consumer waits for all currently executing jobs to finish.
// It returns the first non-nil acknowledgement error, or nil.
func (s *service) close() error {
    // One buffer slot per consumer: every consumer sends an acknowledgement, and
    // with a smaller buffer the later ones would block on that send forever.
    result := make(chan error, s.consumerLimit)
    for i := 0; i < s.consumerLimit; i++ {
        s.shutdown <- result
    }
    close(s.isClosed)

    // Collect every acknowledgement. With zero consumers this loop is skipped
    // rather than waiting for a value that will never arrive.
    var firstErr error
    for i := 0; i < s.consumerLimit; i++ {
        if err := <-result; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

// main starts a service with 5 consumers, submits jobs 1 through 9, and then
// waits for an interrupt signal before shutting the service down.
func main() {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt)

    svc := newService(time.Second, 5, 1000)

    // Submit each job from its own goroutine, since do blocks until the job's
    // result is available.
    for id := 1; id <= 9; id++ {
        go func(id int) {
            err := svc.do(context.Background(), newJob(id))
            if err != nil {
                fmt.Println(fmt.Errorf("failed to send job: %v", err))
            }
        }(id)
    }

    // Block until the process is interrupted.
    <-sigCh

    if err := svc.close(); err != nil {
        fmt.Println(fmt.Errorf("failed to graceful shut down service: %w", err))
        return
    }
    fmt.Println("service has been shutdown successfully")
}

Answered By – Son Huynh

Answer Checked By – Clifford M. (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.