Why is data being pushed into the channel but never read from the receiver goroutine?

Issue

I am building a daemon and I have two services that will be sending data to and from each other. Service A is what produces the data and service B a is Data Buffer service or like a queue. So from the main.go file, service B is instantiated and started. The Start() method will perform the buffer() function as a goroutine because this function waits for data to be passed onto a channel and I don’t want the main process to halt waiting for buffer to complete. Then Service A is instantiated and started. It is then also "registered" with Service B.

I created a method called RegisterWithBufferService for Service A that creates two new channels. It will store those channels as it’s own attributes and also provide them to Service B.

func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
    newIncomingChan := make(chan *data.DataFrame, 1)
    newOutgoingChan := make(chan []byte, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    s.Logger.Info().Msg("Registeration completed.")
    return nil
}

Buffer essentially listens for incoming data from Service A, decodes it using Decode() and then adds it to a slice called buf. If the slice is greater in length than bufferPeriod then it will send the first item in the slice in the Outgoing channel back to Service A.

func (b* DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            b.wg.Add(1)
            /*
            newProvider is a string
            DataProviders is a map the value it returns is a struct containing the Incoming and 
            Outgoing channels for this service
            */
            p := b.DataProviders[newProvider]
            go func(prov string, in chan []byte, out chan *DataFrame) {
                defer b.wg.Done()
                var buf []*DataFrame
                for {
                    select {
                    case rawData := <-in:
                        tmp := Decode(rawData) //custom decoding function. Returns a *DataFrame
                        buf = append(buf, tmp)
                        if len(buf) < bufferPeriod {
                            b.Logger.Info().Msg("Sending decoded data out.")
                            out <- buf[0]
                            buf = buf[1:] //pop
                        }
                    case <- b.Quit:
                        return
                    }
                }
            }(newProvider, p.IncomingChan, p.OutgoingChan)
        }
    case <- b.Quit:
        return
    }
}

Now Service A has a method called record that will periodically push data to all the channels in it’s OutgoingDataChannels attribute.

func (s *ServiceA) record() error {
    ...
    if atomic.LoadInt32(&s.Listeners) != 0 {
        s.Logger.Info().Msg("Sending raw data to data buffer")
        for _, outChan := range s.OutgoingDataChannels {
            outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
        }
        s.Logger.Info().Msg("Raw data sent and received") // The logger will output this so I know it's not hanging 
    }
}

The problem is that Service A seems to push the data successfully using record but Service B never goes into the case rawData := <-in: case in the buffer sub-goroutine. Is this because I have nested goroutines? Incase it’s not clear, when Service B is started, it calls buffer but because it would hang otherwise, I made the call to buffer a goroutine. So then when Service A calls RegisterWithBufferService, the buffer goroutine creates a goroutine to listen for new data from Service B and push it back to Service A once the buffer is filled. I hope I explained it clearly.

EDIT 1
I’ve made a minimal, reproducible example.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    defaultBufferingPeriod int = 3
    DefaultPollingInterval int64 = 10
)

type DataObject struct{
    Data    string
}

type DataProvider interface {
    RegisterWithBufferService(*DataBuffer) error
    ServiceName() string
}

type DataProviderInfo struct{
    IncomingChan    chan *DataObject
    OutgoingChan    chan *DataObject
}

type DataBuffer struct{
    Running         int32 //used atomically
    DataProviders   map[string]DataProviderInfo
    Quit            chan struct{}
    NewProvider     chan string
    wg              sync.WaitGroup
}

func NewDataBuffer() *DataBuffer{
    var (
        wg sync.WaitGroup
    )
    return &DataBuffer{
        DataProviders: make(map[string]DataProviderInfo),
        Quit: make(chan struct{}),
        NewProvider: make(chan string),
        wg: wg,
    }
}

func (b *DataBuffer) Start() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
        return fmt.Errorf("Could not start Data Buffer Service.")
    }
    go b.buffer(defaultBufferingPeriod)
    return nil
}

func (b *DataBuffer) Stop() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
        return fmt.Errorf("Could not stop Data Buffer Service.")
    }
    for _, p := range b.DataProviders {
        close(p.IncomingChan)
        close(p.OutgoingChan)
    }
    close(b.Quit)
    b.wg.Wait()
    return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            fmt.Println("Received new Data provider.")
            if _, ok := b.DataProviders[newProvider]; ok { 
                b.wg.Add(1)
                p := b.DataProviders[newProvider]
                go func(prov string, in chan *DataObject, out chan *DataObject) {
                    defer b.wg.Done()
                    var (
                        buf []*DataObject
                    )
                    fmt.Printf("Waiting for data from: %s\n", prov)
                    for {
                        select {
                        case inData := <-in:
                            fmt.Printf("Received data from: %s\n", prov)
                            buf = append(buf, inData)
                            if len(buf) > bufferPeriod {
                                fmt.Printf("Queue is filled, sending data back to %s\n", prov)
                                out <- buf[0]
                                fmt.Println("Data Sent")
                                buf = buf[1:] //pop
                            }
                        case <- b.Quit:
                            return
                        }
                    }
                }(newProvider, p.IncomingChan, p.OutgoingChan)
            }
        case <- b.Quit:
            return
        }
    }
}

type ServiceA struct{
    Active                  int32 // atomic
    Stopping                int32 // atomic
    Recording               int32 // atomic
    Listeners               int32 // atomic
    name                    string
    QuitChan                chan struct{}
    IncomingBuffChan        chan *DataObject
    OutgoingBuffChans       []chan *DataObject
    DataBufferService       *DataBuffer
}

// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)

func NewServiceA() (*ServiceA, error) {
    var newSliceOutChans []chan *DataObject
    return &ServiceA{
        QuitChan:  make(chan struct{}),
        OutgoingBuffChans: newSliceOutChans,
        name:   "SERVICEA",
    }, nil
}

// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
    atomic.StoreInt32(&s.Active, 1)
    return nil
}

// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
    atomic.StoreInt32(&s.Stopping, 1)
    close(s.QuitChan)
    return nil
}

func (s *ServiceA) StartRecording(pol_int int64) error {
    if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
        return fmt.Errorf("Could not start recording. Data recording already started")
    }
    ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
    go func() {
        for {
            select {
            case <-ticker.C:
                fmt.Println("Time to record...")
                err := s.record()
                if err != nil {
                    return
                }
            case <-s.QuitChan:
                ticker.Stop()
                return
            }
        }
    }()
    return nil
}

func (s *ServiceA) record() error {
    current_time := time.Now()
    ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
    dataObject := &DataObject{
        Data: ct,
    }
    if atomic.LoadInt32(&s.Listeners) != 0 {
        fmt.Println("Sending data to Data buffer...")
        for _, outChan := range s.OutgoingBuffChans {
            outChan <- dataObject // the receivers should already be listening
        }
        fmt.Println("Data sent.")
    }
    return nil
}

// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
    if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
        return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
    }
    newIncomingChan := make(chan *DataObject, 1)
    newOutgoingChan := make(chan *DataObject, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    return nil
}

// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
    return s.name
}

func main() {
    var BufferedServices []DataProvider
    fmt.Println("Instantiating and Starting Data Buffer Service...")
    bufService := NewDataBuffer()
    err := bufService.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer bufService.Stop()
    fmt.Println("Data Buffer Service successfully started.")

    fmt.Println("Instantiating and Starting Service A...")
    serviceA, err := NewServiceA()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    BufferedServices = append(BufferedServices, *serviceA)
    err = serviceA.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer serviceA.Stop()
    fmt.Println("Service A successfully started.")

    fmt.Println("Registering services with Data Buffer...")
    for _, s := range BufferedServices {
        _ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
    }
    fmt.Println("Registration complete.")

    fmt.Println("Beginning recording...")
    _ = atomic.AddInt32(&serviceA.Listeners, 1)
    err = serviceA.StartRecording(DefaultPollingInterval)
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    for {
        select {
        case RTD := <-serviceA.IncomingBuffChan:
            fmt.Println(RTD)
        case <-serviceA.QuitChan:
            atomic.StoreInt32(&serviceA.Listeners, 0)
            bufService.Quit<-struct{}{}
        }
    }
}

Running on Go 1.17. When running the example, it should print the following every 10 seconds:

Time to record...
Sending data to Data buffer...
Data sent.

But then Data buffer never goes into the inData := <-in case.

Solution

To diagnose this I changed fmt.Println("Sending data to Data buffer...") to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans) and the output was:

Time to record...
Sending data to Data buffer... []

So you are not actually sending the data to any channels. The reason for this is:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

As the receiver is not a pointer when you do the s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) you are changing s.OutgoingBuffChans in a copy of the ServiceA which is discarded when the function exits. To fix this change:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

to

func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

and

BufferedServices = append(BufferedServices, *serviceA)

to

BufferedServices = append(BufferedServices, serviceA)

The amended version outputs:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA

So this resolves the reported issue (I would not be suprised if there are other issues but hopefully this points you in the right direction). I did notice that the code you originally posted does use a pointer receiver so that might have suffered from another issue (but its difficult to comment on code fragments in a case like this).

Answered By – Brits

Answer Checked By – Pedro (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.