Run Go asynchronous operations and write to map

Issue

I have this project that tries to run unlimited bigqueries at the same time in Go. The parent project is all Python. I need to be able to keep track of the query results, like in a map.

Input:

{
 'reports_portal': 'select * from reports_portal',
 'billing_portal': 'select * from billing_portal',
}

output:

{
 'reports_portal': [23, 123, 5234, 632],
 'billing_portal': [23, 123, 5234, 632],
}

and so on

these bigqueries need to be run asynchronously as they’re very slow (from a UI perspective, an SRE waiting 15-30 seconds for results.

I first try to asynchronously write items to a map:

package main

import (
    "fmt"
)


func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        go add_to_map(words_map, this_word)
    }
    fmt.Println(words_map)
}

blows up like:

$ go run try_asynchronous.go 
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
    /tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
    /tmp/golang-w-python/try_asynchronous.go:19 +0xc8

goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
    /tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2

based on needing to run many queries at once and trying to keep track of the results by their name, I expected to write to a map during asynchronous. But fatal error: concurrent map writes says you can’t.

I don’t understand

  1. why not
  2. what I should do the run these bigqueries simultaneously.

EDIT:

The closest thing I have, that returns results, is not asynchronous:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func random_sleep() {
    r := rand.Intn(3000)
    time.Sleep(time.Duration(r) * time.Millisecond)
}

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    mutex.Lock()
    defer mutex.Unlock()
    fmt.Println("Before sleep")
    random_sleep()
    m[word] = added_word
    fmt.Println("Added word %v", word)
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Results are wrong:

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

Results should be very fast, no longer than 3 seconds (the max of random I think):

Expectation - 

Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog

Solution

OK let me clarify some things and help you.

You don’t need to return a modified map from here because your
function gets a reference to map not copy of it. (Let’s ignore the fact
that you are completely ignoring return value)

func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}

Next thing is that you need to synchronize access to map. You can use
mutex for this.

import "sync"

var mutex sync.Mutex //glabal variable but can be created as local also

func add_to_map(m map[string] string, word string) {
    added_word := word + " plus more letters"
    // here you can do long to compute task and calculate result
    // calc here
    mutex.Lock() //result ready lock mutex
    defer mutex.Unlock() // unlock mutex when we return from function
    m[word] = added_word // result write to shared map
}

Note that in Go 1.9 there will be a Concurrent Map type.

Edit:
You need to wait for all go-routines to finish because your main() now finishes before them. You can do this by using WaitGroup

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    // do heavy work here
    //
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Answered By – Radosław Załuska

Answer Checked By – Gilberto Lyons (GoLangFix Admin)

Leave a Reply

Your email address will not be published.