How to start & stop a heartbeat per session using context.WithCancel?

Issue

I’m currently implementing a Go client for TypeDB and am struggling with its session-based heartbeat convention. Usually you implement a heartbeat per client, which is relatively easy: just run a goroutine in the background and send a heartbeat every few seconds.

TypeDB, however, chose to implement the heartbeat (they call it a pulse) on a per-session basis, which means that every time a new session is created, I have to start monitoring that session with a separate goroutine. Conversely, if the client closes a session, I have to stop the monitoring. What’s particularly ugly is that I also have to check for stalled sessions every once in a while. There is a GitHub issue to switch over to a per-client heartbeat, but no ETA, so I have to make the session heartbeat work to prevent server-side session termination.

So far, my solution:

  1. Create a new session
  2. Open that session & check for error
  3. If no error, add session to a hashmap keyed by session ID

This seems to work for now. The code, just for context, is here:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session.go

For monitoring each session, I am mulling over two issues:

  1. Closing a channel across multiple goroutines is a bit tricky and may lead to race conditions.

  2. I would need some kind of error group to catch heartbeat failures, e.g. when the server shuts down or a network link fails.

With all that in mind, I believe context.WithCancel might be a safe and sane solution.

What I came up with so far is this:

  1. Pass the global context as parameter to the heartbeat function
  2. Create a new context WithCancel for each session calling heartbeat
  3. Run heartbeat in a goroutine until either cancel gets called (by stopMonitoring) or an error occurs

What’s not so clear to me is how to track all the cancel functions returned for each tracked session, so as to ensure I am stopping the right goroutine for the session being closed.

Thank you for any hint to solve this.

The code:


func (s SessionManager) startMonitorSession(sessionID []byte) {
    // How do I track each goRoutine per session

}

func (s SessionManager) stopMonitorSession(sessionID []byte) {
    // How do I call the correct cancel function to stop the GoRoutine matching the session?
}

func (s SessionManager) runHeartbeat(ctx context.Context, sessionID []byte) context.CancelFunc {

    // Create a new context, with its cancellation function from the original context
    ctx, cancel := context.WithCancel(ctx)
    go func() {
        select {
        case <-ctx.Done():
            fmt.Println("Stopped monitoring session: ")
        default:
            err := s.sendPulseRequest(sessionID)
            // If this operation returns an error
            // cancel all operations using this local context created above
            if err != nil {
                cancel()
            }
            fmt.Println("done")
        }
    }()

    // return cancel function for call site to close at a later stage
    return cancel
}

func (s SessionManager) sendPulseRequest(sessionID []byte) error {
    mtd := "sendPulse: "

    req := requests.GetSessionPulseReq(sessionID)
    res, pulseErr := s.client.client.SessionPulse(s.client.ctx, req)
    if pulseErr != nil {
        dbgPrint(mtd, "Heartbeat error. Close session")
        return pulseErr
    }
    if res.Alive == false {
        dbgPrint(mtd, "Server not alive anymore. Close session")
        closeErr := s.CloseSession(sessionID)
        if closeErr != nil {
            return closeErr
        }
    }

    // no error
    return nil
}

Update:

Thanks to the comment(s) I managed to solve the bulk of the issue by wrapping session & CancelFunc in a dedicated struct, called TypeDBSession.

That way, the stop function simply pulls the CancelFunc from the struct, calls it, and stops the monitoring goroutine. With some more tweaking, the tests seem to pass, although this is not concurrency safe for the time being.

That being said, this was a non-trivial issue to solve. Again, thanks for the comments!

If anyone is open to suggesting code improvements, especially with respect to making this concurrency safe, feel free to comment here or file a GH issue / PR.

SessionType:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_type.go

SessionMonitoring:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_monitor.go

Tests:

https://github.com/marvin-hansen/typedb-client-go/tree/main/test/client/session

Solution

My two cents:

  1. You may need to run the heartbeat repeatedly. Use a for loop with a time.Ticker around the select
  2. Store a map of session id -> func() to track all the cancel functions. Since a []byte cannot be a map key, you should probably convert the id to a string

Answered By – Tiago Peczenyj

Answer Checked By – Mary Flores (GoLangFix Volunteer)
