Run Goroutines on separate processes (multiprocessing)

Issue

I currently have MQTT client code that subscribes to a topic, prints out the received messages, and then publishes further instructions to a new topic. The subscribing/printing is done in one goroutine, and the publishing is done in another. Here is my code:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"

    MQTT "github.com/eclipse/paho.mqtt.golang" // assuming the Eclipse Paho Go client
)

var wg, pg sync.WaitGroup

// All messages are handled here - printing published messages and publishing new messages
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    wg.Add(1)
    pg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Printf("%s\n", msg.Payload())
        //fmt.Println(os.Getpid())
    }()
    go func() {
        defer pg.Done()
        message := ""
        //Changing configurations
        if strings.Contains(string(msg.Payload()), "arduinoLED") == true {
            message = fmt.Sprintf("change configuration")
        }
        if strings.Contains(string(msg.Payload()), "NAME CHANGED") == true {
            message = fmt.Sprintf("change back")
        }
        // Publish further instructions to "sensor/instruction"
        token := client.Publish("sensor/instruction", 0, false, message)
        //fmt.Println(os.Getpid())
        token.Wait()
    }()
}

func main() {

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    opts := MQTT.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883")

    opts.SetDefaultPublishHandler(f)
    // Topic to subscribe to for sensor data
    topic := "sensor/data"

    opts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic, 0, f); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }
    // Creating new client
    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to server\n")
    }
    wg.Wait()
    pg.Wait()
    <-c
}

The commented-out os.Getpid() lines are there to check which process each goroutine runs in. Right now they both print the same number (which means both are running in the same process?).

My question is: How can I run the two Goroutines on separate processes? Is there a way?

Edit: If this cannot be done, I want to write this code using channels. Here is the code I have for that:

var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    sensorData := make(chan string)
    wg.Add(1)
    pg.Add(1)
    go func() {
        defer wg.Done()
        //fmt.Printf("%s\n", msg.Payload())
        sensorData <- string(msg.Payload())
        fmt.Println(<-sensorData) //currently not printing anything
    }()
    go func() {
        defer pg.Done()
        message := ""
        //Changing configurations
        if strings.Contains(<-sensorData, "arduinoLED") == true {
            message = fmt.Sprintf("change configuration")
        }
        if strings.Contains(<-sensorData, "NAME CHANGED") == true {
            message = fmt.Sprintf("change back")
        }
        // Publish further instructions to "sensor/instruction"
        token := client.Publish("sensor/instruction", 0, false, message)
        token.Wait()
    }()
}

However, I am not able to print out any data using channels. What am I doing wrong?

Solution

You might be coming from Python, right? 😉

Python has a module named multiprocessing in its stdlib, which might
well explain why you used this name in the title of your question,
and why you apparently have trouble interpreting what @JimB meant by saying

If you need a separate process, you need to exec it yourself

"Multiprocessing" in Python

The thing is, Python’s multiprocessing is a fairly high-level
facility that hides a whole lot of machinery under its hood.
When you spawn a multiprocessing.Process and make it run
a function, here is what really happens:

  1. The Python interpreter creates another operating system
    process (using
    fork(2) on Unix-like systems
    or CreateProcess on Windows) and arranges
    for it to execute a Python interpreter, too.

    The crucial point is that you will now have two processes
    running two Python interpreters.

  2. The Python interpreter in the child process is given
    a way to communicate with the Python interpreter
    in the parent process.

    This "communication link" necessarily involves some form
    of the IPC @JimB referred to.
    There is simply no other way to communicate data and actions
    between separate processes, precisely because a commodity
    contemporary OS enforces strict process separation.

  3. When you exchange Python objects between the processes, the two communicating Python
    interpreters serialize and deserialize them behind your back
    before sending them over their IPC link and after receiving
    them from it, respectively.
    This is implemented using the pickle module.

Back to Go

Go has no direct equivalent that would closely
match Python’s multiprocessing, and I really doubt one could
have been sensibly implemented.

The chief reason is that Go is considerably lower-level
than Python: it does not have Python’s luxury of making
sweeping assumptions about the types of values it manages,
and it also strives to have as few hidden costs in its
constructs as possible.

Go also strives to steer clear of "framework-style" approaches
to solving problems, preferring "library-style" solutions when
possible. (A good rundown of "framework vs library"
is given, for instance, here.)
Go has everything in its standard library to implement
something akin to Python’s multiprocessing, but there is no
ready-made framework-y solution for it.

So what you could do is roughly the following:

  1. Use os/exec to run another copy of your own process.

    • Make sure the spawned process "knows" it was started
      in a special "slave" mode, so it can act accordingly.
    • Use any form of IPC to communicate with the new process.
      Exchanging data via the standard I/O streams
      of the child process is probably
      the simplest way to go (except when you need to exchange
      open files, but that is a harder topic, so let’s not digress).
  2. Use any suitable package in the encoding/ hierarchy (such as
    encoding/binary, encoding/gob or encoding/xml) to serialize
    and deserialize the data you exchange.

    The "go-to" solution is probably encoding/gob,
    but encoding/json will also do just fine.

  3. Invent and implement a simple protocol to tell the
    child process what to do, and with which data,
    and how to communicate the results back to the master process.

Is it really worth the trouble?

I would say that no, it isn’t, for a number of reasons:

  • Go has nothing like Python’s dreaded GIL,
    so there’s no need to sidestep it to achieve real parallelism:
    goroutines already run in parallel across multiple OS threads
    (up to GOMAXPROCS of them executing Go code at once).

  • Memory safety is all in your hands, and achieving it is
    not really that hard when you dutifully obey the principle
    that what is sent over a channel is now owned by
    the receiver.
    In other words, sending values over a channel
    is also the transfer of ownership of those values.

  • The Go toolchain has an integrated race detector, so you
    may run your test suite with the -race flag and create evaluation
    builds of your program using go build -race for the same
    purpose: when a program instrumented this way runs,
    the race detector reports every data race it observes
    (unsynchronized accesses to the same memory from different
    goroutines, at least one of them a write) as it happens.
    Each report explains what went wrong and where,
    with stack traces, and the program exits with a non-zero
    status when it finishes (GORACE=halt_on_error=1 makes it
    stop at the first race instead).

  • IPC is slow compared to handing values over a channel
    within one process, so the gains may well be offset by the
    serialization and communication overhead.

All in all, I see no real reason to split work across separate
processes unless you’re writing something like an e-mail processing
server, where this concept comes naturally.

Answered By – kostix

Answer Checked By – Pedro (GoLangFix Volunteer)
