Issue
I’m using the Apache Beam Go SDK and having a hard time getting a PCollection in the correct format for grouping/combining by key.
I have multiple records per key in a PCollection of strings that look like this:
Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse
I want to use GroupByKey and CombinePerKey so I can aggregate each person’s pets like this:
Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]
How do I convert a PCollection<string> to PCollection<KV<string, string>>?
They mention something similar here, but the code to aggregate the string values is not included.
I can use a ParDo to get the string key and string value as shown below, but I can’t figure out how to convert to the KV<string, string> or CoGBK<string, string> format required as input to GroupByKey.
pcolOut := beam.ParDo(s, func(line string) (string, string) {
    cleanString := strings.TrimSpace(line)
    openingChar := ","
    iStart := strings.Index(cleanString, openingChar)
    key := cleanString[0:iStart]
    value := cleanString[iStart+1:]
    // How to convert to PCollection<KV<string, string>> before returning?
    return key, value
}, pcolIn)
groupedKV := beam.GroupByKey(s, pcolOut)
It fails with the following error. Any suggestions?
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn main.main.func2
binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}
Solution
To map to KVs in the Java SDK, you can apply MapElements: use into() to set the KV types and, in the via() logic, create a new KV.of(myKey, myValue). For example, to get a KV<String, String>, use something like this:
PCollection<KV<String, String>> kvPairs = linkpages.apply(
    MapElements.into(
            TypeDescriptors.kvs(
                TypeDescriptors.strings(),
                TypeDescriptors.strings()))
        .via(linkpage -> KV.of(dataFile, linkpage)));
Answered By – denise