go-libp2p – receiving bytes from stream

Issue

I’m building my first go-libp2p application and trying to modify the echo example to read a []byte instead of a string as in the example.

In my code, I changed the doEcho function to run io.ReadAll(s) instead of bufio.NewReader(s) followed by ReadString('\n'):

// doEcho reads a line of data a stream and writes it back
func doEcho(s network.Stream) error {
     b, err := io.ReadAll(s)
     if err != nil {
         return err
     }
     log.Printf("Number of bytes received: %d", len(b))

     _, err = s.Write([]byte("thanks for the bytes"))
     return err
}

When I run this and send a message, I do see the listener received new stream log but the doEcho function gets stuck after the io.ReadAll(s) call and never executes the reply.

So my questions are:

  1. Why does my code not work and how can I make it work?
  2. How does io.ReadAll(s) and bufio‘s ReadString('\n') work under the hood so that they cause this difference in behavior?

Edit:
As per @Stephan Schlecht suggestion I changed my code to this, but it still remains blocked as before:

func doEcho(s network.Stream) error {
    buf := bufio.NewReader(s)
    var data []byte

    for {
        b, err := buf.ReadByte()
        if err != nil {
            break
        }
        data = append(data, b)
    }

    log.Printf("Number of bytes received: %d", len(data))

    _, err := s.Write([]byte("thanks for the bytes"))
    return err
}

Edit 2: I forgot to clarify this, but I don’t want to use ReadString('\n') or ReadBytes('\n') because I don’t know anything about the []byte I’m receiving, so it might not end with \n. I want to read any []byte from the stream and then write back to the stream.

Solution

ReadString('\n') reads until the first occurrence of \n in the input and returns the string.

io.ReadAll(s) reads until an error or EOF and returns the data it read. So unless an error or EOF occurs it does not return.

In principle, there is no natural size for a data structure to be received on stream-oriented connections.

It depends on the remote sender.

If the remote sender sends binary data and closes the stream after sending the last byte, then you can simply read all data up to the EOF on the receiver side.

If the stream is not to be closed immediately and the data size is variable, there are further possibilities: One first sends a header that has a defined size and in the simplest case simply transmits the length of the data. Once you have received the specified amount of data, you know that this round of reception is complete and you can continue.

Alternatively, you can define a special character that marks the end of the data structure to be transmitted. This will not work if you want to transmit arbitrary binary data without encoding.

There are other options that are a little more complicated, such as splitting the data into blocks.

In the example linked in the question, a \n is sent at the end of the data just sent, but this would not work if you want to send arbitrary binary data.

Adapted Echo Example

In order to minimally modify the echo example linked in the question to first send a 1-byte header with the length of the payload and only then the actual payload, it could look something like the following:

Sending

In the function runSender line one could replace the current sending of the payload from:

    log.Println("sender saying hello")
    _, err = s.Write([]byte("Hello, world!\n"))
    if err != nil {
        log.Println(err)
        return
    }

to

    log.Println("sender saying hello")
    payload := []byte("Hello, world!")
    header := []byte{byte(len(payload))}
    _, err = s.Write(header)
    if err != nil {
        log.Println(err)
        return
    }
    _, err = s.Write(payload)
    if err != nil {
        log.Println(err)
        return
    }

So we send one byte with the length of the payload before the actual payload.

Echo

The doEcho would then read the header first and afterwards the payload. It uses ReadFull, which reads exactly len(payload) bytes.

func doEcho(s network.Stream) error {
    buf := bufio.NewReader(s)
    header, err := buf.ReadByte()
    if err != nil {
        return err
    }

    payload := make([]byte, header)
    n, err := io.ReadFull(buf, payload)
    log.Printf("payload has %d bytes", n)
    if err != nil {
        return err
    }

    log.Printf("read: %s", payload)
    _, err = s.Write(payload)
    return err
}

Test

Terminal 1

2022/11/06 09:59:38 I am /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:38 listening for connections
2022/11/06 09:59:38 Now run "./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e" on a different terminal
2022/11/06 09:59:55 listener received new stream
2022/11/06 09:59:55 payload has 13 bytes
2022/11/06 09:59:55 read: Hello, world!

Terminal 2

stephan@mac echo % ./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:55 I am /ip4/127.0.0.1/tcp/8089/p2p/QmW6iSWiFBG5ugUUwBND14pDZzLDaqSNfxBG6yb8cmL3Di
2022/11/06 09:59:55 sender opening stream
2022/11/06 09:59:55 sender saying hello
2022/11/06 09:59:55 read reply: "Hello, world!"
s

This is certainly a fairly simple example and will certainly need to be customized to your actual requirements, but could perhaps be a first step in the right direction.

Answered By – Stephan Schlecht

Answer Checked By – Marilyn (GoLangFix Volunteer)

Leave a Reply

Your email address will not be published.