Simple wordcount and basics of Apache Beam

Apache Beam is a unified programming model for data processing built around pipelines. The same pipeline can run in both batch and streaming mode.

Beam provides SDKs in various programming languages for creating a pipeline, and runners for executing your pipeline on different engines, such as

  • Apache Apex
  • Apache Flink (Will have a demo on this later)
  • Apache Spark
  • Google Cloud Dataflow (Will have a demo on this later)

For this demo, we will use Java as the programming language. The latest stable Beam Java SDK version available at the time of writing this blog is 2.16.0.

Why am I writing this blog when Beam already has well-written documentation?

Good question!

I would recommend every developer visit the official quick start for Java and its end-to-end demo.

The Problem:

In the quick start demo, they suggest creating a project using the Maven archetype:

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.16.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples

This creates a great many files in your source directory, plus a pom.xml with many options.
Such files are tough for a newbie to understand when all they want is the very basics of Beam.

The Solution:

I would recommend creating a very basic sample project instead, and I use this approach to give newbies a clearer picture.

Follow the steps below to finish your first Beam Java sample.

  • Create a basic Maven project. At this point you will have a plain pom.xml without any dependencies
  • Add the beam-sdks-java-core and beam-runners-direct-java dependencies to your pom.
    Your whole pom will look like

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns=""
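The snippet above was cut off, so here is a minimal pom sketch for this demo. The project's groupId, artifactId, and version are my assumptions; only the two Beam dependencies and the 2.16.0 version come from the text, so adjust the rest to your setup:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates: assumed names, not from the original post -->
    <groupId>com.kode12</groupId>
    <artifactId>beam-first-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Core Beam Java SDK -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.16.0</version>
        </dependency>
        <!-- Direct Runner, to run the pipeline locally -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.16.0</version>
        </dependency>
    </dependencies>
</project>
```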
  • Create a class with a main method; here I have created one called FirstDemo:

    package com.kode12;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.FlatMapElements;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;

    import java.util.Arrays;

    public class FirstDemo {
        public static void main(String[] args) {
            PipelineOptions options = PipelineOptionsFactory.create();
            Pipeline p = Pipeline.create(options);

            p.apply("ReadLines", TextIO.read().from("F:\\Beam\\input.txt"))
                    .apply("ExtractWords", FlatMapElements
                            .into(TypeDescriptors.strings())
                            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
                    .apply("CountWords", Count.perElement())
                    .apply("FormatResults", MapElements
                            .into(TypeDescriptors.strings())
                            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                    .apply("WriteCounts", TextIO.write().to("wordcounts"));

            p.run().waitUntilFinish();
        }
    }
  • Create an input file. I have placed mine on my Windows machine at F:\\Beam\\input.txt; it contains a few lines of plain text
  • Here you go! Execute your main method and see the result. It will create some files named wordcounts (the prefix passed to TextIO.write().to()) in your project directory, as we have not provided an absolute output path.



The two dependencies we have used:
beam-sdks-java-core: provides the core interfaces and classes of the Java SDK.
beam-runners-direct-java: provides the Direct Runner, which runs a pipeline locally in standalone mode. We use it because we are not using any other backend like Flink or Google Dataflow.

The above code looks straightforward to a Java developer. It performs the following steps:

  • Basic initialization (i.e. creating the Pipeline and PipelineOptions)
  • Reading the given file
  • Extracting the words
  • Counting the words
  • Formatting the results (mapping each word/count pair to a printable string)
  • Writing the output files

In my case, I see 3 files created in my project: wordcounts-00000-of-00003, wordcounts-00001-of-00003 and wordcounts-00002-of-00003, which contain the word counts.

Now, the question is: why were 3 files created, and what is the logic behind it?
Beam uses sharding (parallelism, in other words) to split the output, and applies a simple rule to decide the number of shards (you can check the rule in the runner's source code): it takes the maximum of the number of processors available on your machine and 3. I have a 2-core machine, so 3 is greater, and Beam creates 3 files for me.
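The shard-count rule described above can be sketched in a few lines. This mirrors the behaviour as described (max of available processors and 3); it is not Beam's actual source, and the method name defaultShardCount is my own:

```java
public class ShardCountDemo {
    // Sketch of the described default shard-count rule:
    // take the larger of the machine's processor count and 3.
    static int defaultShardCount(int availableProcessors) {
        return Math.max(availableProcessors, 3);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // On a 2-core machine this prints 3, matching the 3 output files above.
        System.out.println("Cores: " + cores + ", shards: " + defaultShardCount(cores));
    }
}
```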

Cheers :)