Goroutines and message de-duplication

Issue

I have several event queues and several goroutines, each of which reads events from its corresponding queue in an infinite loop, processes them, and sends the results into a channel. Different queues may deliver the same event, so I need to make sure that each event is sent to the channel exactly once and that any occurrence of that event in another queue is ignored. I believe that’s more of an architectural issue, but I can’t figure out how to handle this properly.

A simplified version of my current code is below.

Goroutines that get and handle incoming events look somewhat like this:

func (q *Queue) ProcessEvents(handler *Handler) {
    lastEvent := 0
    for {
        // fetch the events that arrived after the last one seen
        events := getEvents(lastEvent)
        for _, e := range events {
            if e.ID > lastEvent {
                lastEvent = e.ID
            }
            handler.Handle(e)
        }
    }
}

Handler:

type Handler struct {
    c chan Event
}

func (h *Handler) Handle(event Event) {
    // event processing omitted
    h.c <- event // for now, every processed event is sent to the channel, duplicates included
}

And in main() I do

func main() {
    msgc := make(chan Event)
    for _, q := range queues {
        go func(queue Queue) {
            queue.ProcessEvents(&Handler{msgc})
        }(q)
    }
}

Solution

Your current architecture can be represented as follows:

Current architecture

With this type of solution the Generators need to check a shared resource to see if an event was already emitted. This might look something like this:

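// hasEmitted records the IDs of events that have already been sent.
var hasEmitted = make(map[string]bool)
var lock sync.Mutex

// HasEmitted reports whether the event was already emitted.
func HasEmitted(e Event) bool {
    lock.Lock()
    defer lock.Unlock()
    return hasEmitted[e.ID]
}

// SetEmitted marks the event as emitted.
func SetEmitted(e Event) {
    lock.Lock()
    defer lock.Unlock()
    hasEmitted[e.ID] = true
}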

This requires locking and unlocking, which, even in the best case with no contention, is still a considerable overhead given the small amount of work being done in the critical section.
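A further caveat with the lock-based approach: when the check and the set are two separate calls, two goroutines can both see "not yet emitted" for the same ID before either of them records it. Below is a minimal sketch of folding both steps into one critical section. It assumes string IDs, as in the map above, and the names emittedSet, newEmittedSet and MarkEmitted are made up for illustration.

```go
package main

import "sync"

// emittedSet tracks the IDs of events that were already sent downstream.
// The string key type is an assumption; adjust it to the real ID type.
type emittedSet struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// newEmittedSet returns an empty, ready-to-use set.
func newEmittedSet() *emittedSet {
	return &emittedSet{seen: make(map[string]struct{})}
}

// MarkEmitted records id and reports whether this call was the first one to
// do so. The check and the write happen under the same lock, so at most one
// caller gets true for any given id.
func (s *emittedSet) MarkEmitted(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[id]; ok {
		return false
	}
	s.seen[id] = struct{}{}
	return true
}
```

A handler would then send the event only when MarkEmitted returns true. This removes the race, but not the locking cost described above.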

With a small change in the architecture, as in the second diagram, a single goroutine can do the filtering without any locking.

A potential solution
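As a rough sketch of the second diagram: assume every generator sends into one shared channel and a single goroutine sits between that channel and the consumer. The Event type with a string ID and the Dedupe name are illustrative assumptions, not taken from the question.

```go
package main

// Event is a stand-in for the question's event type. Only the ID matters
// here, and its string type is an assumption.
type Event struct {
	ID string
}

// Dedupe forwards each event from in to the returned channel the first time
// its ID is seen and drops later copies. The seen map is touched only by the
// single goroutine started here, so it needs no mutex.
func Dedupe(in <-chan Event) <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out) // closed once in is closed and drained
		seen := make(map[string]struct{})
		for e := range in {
			if _, dup := seen[e.ID]; dup {
				continue // already forwarded, ignore this copy
			}
			seen[e.ID] = struct{}{}
			out <- e
		}
	}()
	return out
}
```

In main, the consumer would read from Dedupe(msgc) instead of from msgc directly. Note that the seen map grows without bound in this sketch, so a long-running process would need some way to expire old IDs.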

Some commenters have said that designing solutions using goroutines is the same as designing for single-threaded applications. I do not believe this is the case.
I would suggest looking at:

Golang related messaging: https://blog.golang.org/pipelines

Some message handling design patterns: http://www.enterpriseintegrationpatterns.com/

The enterprise integration patterns might look out of place here, but they cover a lot of message-passing patterns that also apply in Go.

Answered By – Hein Oldewage

Answer Checked By – Pedro (GoLangFix Volunteer)
