Multiple Go routines reading from the same channel

Issue

Hi, I’m having a problem with a control channel (of sorts).

The essence of my program:

  • I do not know how many goroutines I will be running at runtime.
  • I will need to restart these goroutines at set times; however, they could also error out (and then be restarted), so their timing will not be predictable.
  • These goroutines will be putting messages onto a single channel.

So what I’ve done is create a simple random message generator that puts messages onto a channel.
When the timer is up (a random duration for testing), I put a message onto a control channel. The message is a struct payload, so I know that there was a close signal and which goroutine it came from; in reality I’d then do some other work before starting the goroutines again.

My problem is:

  • I receive the control message within my reflect.Select loop
  • I do not receive it (or am unable to) in my randmsgs() loop

Therefore I cannot stop my randmsgs() goroutine.

I believe I’m right in understanding that multiple goroutines can read from a single channel, so I think I’m misunderstanding how reflect.SelectCase values fit into all of this.

My code:

package main

import (
    "fmt"
    "math/rand"
    "reflect"
    "time"
)

type testing struct {
    control bool
    market  string
}

func main() {
    rand.Seed(time.Now().UnixNano())
    // explicitly define chanids for tests.
    var chanids []string = []string{"GR I", "GR II", "GR III", "GR IV"}
    stream := make(chan string)
    control := make([]chan testing, len(chanids))
    reflectCases := make([]reflect.SelectCase, len(chanids)+1)

    // MAKE REFLECT SELECTS FOR 4 CONTROL CHANS AND 1 DATA CHANNEL
    for i := range chanids {
        control[i] = make(chan testing)
        reflectCases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(control[i])}
    }
    reflectCases[4] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(stream)}
    
    // START GO ROUTINES
    for i, val := range chanids {
        runningFunc(control[i], val, stream, 1+rand.Intn(30-1))
    }

    // READ DATA
    for {
        o, received, ok := reflect.Select(reflectCases)
        if !ok {
            // ok is false when the selected channel has been closed.
            fmt.Println("You really buggered this one up...")
        }
        // The type assertion tells control messages apart from data.
        ty, isControl := received.Interface().(testing)
        if isControl {
            fmt.Printf("Read from: %v, and received close signal from: %s\n", o, ty.market)
            // close control & stream here.
        } else {
            msg := received.Interface().(string)
            fmt.Printf("Read from: %v, and received value from: %s\n", o, msg)
        }

    }
}
// THE GO ROUTINES - TIMER AND RANDMSGS
func runningFunc(q chan testing, chanid string, stream chan string, dur int) {
    go timer(q, dur, chanid)
    go randmsgs(q, chanid, stream)
}

func timer(q chan testing, t int, message string) {
    for t > 0 {
        time.Sleep(time.Second)
        t--
    }
    q <- testing{true, message}
}

func randmsgs(q chan testing, chanid string, stream chan string) {

    for {
        select {
        case <-q:
            fmt.Println("Just sitting by the mailbox. :(")
            return
        default:
            secondsToWait := 1 + rand.Intn(5-1)
            time.Sleep(time.Second * time.Duration(secondsToWait))
            stream <- fmt.Sprintf("%s: %d", chanid, secondsToWait)
        }
    }
}

I apologise for the wall of text, but I’m all out of ideas :(!

K/Regards,
C.

Solution

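The q channel passed to each randmsgs goroutine is the same channel as the corresponding control[0..3] entry that main reads from.

The reflect.Select in main also receives from all of these channels, and it does so with no delay: main spends almost all of its time blocked in reflect.Select, ready to receive.

The problem, I think, comes down to reflect.Select "stealing" the control message right away. The timer sends a single value on q, and that value goes to whichever receiver is ready. randmsgs only checks q for an instant on each pass through its loop, then spends seconds in time.Sleep or blocked on the send to stream, so main is almost always the one that receives it. This is why randmsgs is never able to read the messages.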

You’ll notice that if you remove the default case from randmsgs, the function is able to (potentially) read some of the messages from q.

        select {
        case <-q:
            fmt.Println("Just sitting by the mailbox. :(")
            return
        }

This is because, without the default case and its delay, randmsgs is always blocked waiting for a message on q, and so it has a chance to beat reflect.Select in the race.

If you read from the same channel in multiple goroutines, then the data passed will simply go to whatever goroutine reads it first.
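
To make that concrete, here is a minimal sketch (not the asker's program; the names deliverOnce and broadcastClose are made up for illustration). It shows that a single value sent on a channel is received by exactly one of several waiting goroutines, whereas closing a channel is observed by every receiver. That difference is why a closed "done" channel is a common way to signal many goroutines at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// deliverOnce starts n goroutines (n >= 1) that all wait on the same
// channel, sends a single value, and reports how many received it.
func deliverOnce(n int) int {
	ch := make(chan string)
	var got int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ch:
				atomic.AddInt32(&got, 1)
			case <-time.After(200 * time.Millisecond):
				// Gave up: another goroutine took the value.
			}
		}()
	}
	ch <- "stop" // only one receiver gets this value
	wg.Wait()
	return int(got)
}

// broadcastClose starts n goroutines that wait on a done channel,
// closes it, and reports how many goroutines observed the close.
func broadcastClose(n int) int {
	done := make(chan struct{})
	var seen int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // a receive on a closed channel returns immediately
			atomic.AddInt32(&seen, 1)
		}()
	}
	close(done)
	wg.Wait()
	return int(seen)
}

func main() {
	fmt.Println("goroutines that received the sent value:", deliverOnce(4))
	fmt.Println("goroutines that observed the close:", broadcastClose(4))
}
```

With four receivers, the first count is 1 and the second is 4.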


This program appears to just be an experiment / learning experience, but I’ll offer some criticism that may help.

  • First, you generally don’t want multiple goroutines reading from the same channel when those goroutines are doing different tasks. You’re creating a mostly non-deterministic race as to which goroutine fetches the data first.

  • Second, this is a common beginner’s anti-pattern with select that you should avoid:

for {
    select {
    case v := <-myChan:
        doSomething(v)
    default:
        // Oh no, there wasn't anything! Guess we have to wait and try again.
        time.Sleep(time.Second)
    }
}

The default case and the sleep are unnecessary: a select without a default case already blocks until one of its cases is ready and then proceeds with that one. Adding default: with a sleep makes your select loop slower to react while spending less time actually waiting on the channel (because 99.999…% of the time is spent in time.Sleep).

Answered By – Hymns For Disco

Answer Checked By – Cary Denson (GoLangFix Admin)
