Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?

Issue

I’m using the Apache Beam Go SDK and having a hard time getting a PCollection in the correct format for grouping/combining by key.

I have multiple records per key in a PCollection of strings that look like this:

Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse

I want to use GroupByKey and CombinePerKey so I can aggregate each person’s pets like this:

Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]

How do I convert a PCollection<string> to PCollection<KV<string, string>>?

They mention something similar here, but the code to aggregate the string values is not included.

I can use a ParDo to get the string key and string value as shown below, but I can’t figure out how to convert to the KV<string, string> or CoGBK<string, string> format required as input to GroupPerKey.

pcolOut := beam.ParDo(s, func(line string) (string, string) {
  cleanString := strings.TrimSpace(line)
  openingChar := ","
  iStart := strings.Index(cleanString, openingChar)
  key := cleanString[0:iStart]
  value := cleanString[iStart+1:]
        
// How to convert to PCollection<KV<string, string>> before returning?
  return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut) 

It fails with the following error. Any suggestions?

panic:  inserting ParDo in scope root
        creating new DoFn in scope root
        binding fn main.main.func2
        binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}

Solution

To map to KVs, you can apply MapElements and use into() to set KV types and in the via() logic, create a new KV.of(myKey, myValue), for example, to get a KV<String,String>, use something like this:

    PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
        TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.strings()))
        .via(
            linkpage -> KV.of(dataFile, linkpage)));

Answered By – denise

Answer Checked By – Cary Denson (GoLangFix Admin)

Leave a Reply

Your email address will not be published.