This workflow counts the occurrences of words in a text corpus. It implements a Cuneiform example workflow first published in Bux et al. 2015.

Introduction

The canonical word count is a basic workflow often used to exemplify the fundamental concepts of a data analysis platform. Similar examples have been published for platforms like Hadoop, Spark, and Flink.

The workflow takes a text corpus and produces a table that associates each word occurring in the corpus with a count denoting the word's absolute frequency. While the example in Bux et al. 2015 uses Python task definitions to perform the counting and the aggregation of counts, here we use only standard Unix tools like uniq and awk.
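To make the goal concrete, the whole computation can be sketched as a single shell pipeline, assuming the corpus is available as a plain text file corpus.txt (a hypothetical name). The workflow below decomposes this pipeline into tasks that can be scheduled in parallel.

tr -s ' ' '\n' < corpus.txt | sort -f | uniq -c -i

Each output line pairs a count with a word.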

Task Definitions

Utility Tasks

unzip

The unzip task consumes a single zip file and extracts it into a local subdirectory dir. The extracted files are returned as a list.

deftask unzip( <out( File )> : zip( File ) ) in bash *{
  unzip -d dir $zip
  out=`ls dir | awk '{print "dir/" $0}'`
}*
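The backquoted command lists the extracted files, one path per line, and this newline-separated output becomes the list bound to out. Assuming the archive contains the single corpus file used below, it would print something like:

$ ls dir | awk '{print "dir/" $0}'
dir/stateoftheunion1790-2014.txt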

split

The split task consumes a single text file and splits it into partitions of at most 1024 lines each. The partitions are returned as a list.

deftask split( <out( File )> : file( File ) ) in bash *{
  split -l 1024 $file txt
  out=txt*
}*
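Note that out=txt* stores the glob literally; it expands to the partition files when Cuneiform dereferences out unquoted. As an illustration of the naming scheme of split, assume a hypothetical input file of 3000 lines:

$ split -l 1024 example.txt txt
$ ls txt*
txtaa  txtab  txtac

Here txtaa and txtab hold 1024 lines each and txtac holds the remaining 952.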

Tasks for Counting Words

count

The count task consumes a single text file and creates from it a table that associates each word occurring in the file with a count. The resulting csv table is returned.

deftask count( csv( File ) : txt( File ) ) in bash *{
  csv=count.csv
  # -s squeezes runs of spaces so they do not yield empty words;
  # sort -f folds case so that uniq -i sees case variants of a word
  # on adjacent lines
  tr -s ' ' '\n' < $txt | sort -f | uniq -c -i > $csv
}*
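The output format of uniq -c, a count followed by the word, is what the join task below relies on. A minimal illustration:

$ printf 'The\ncat\nthe\n' | sort -f | uniq -c -i
      1 cat
      2 The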

join

The join task consumes a list of csv table files and aggregates them into a single table by summing the counts of each word. The resulting csv table is returned.

deftask join( ret( File ) : <csv( File )> ) in bash *{
  ret=ret.csv
  cat ${csv[@]} | awk '{a[$2]+=$1}END{for(i in a) print a[i],i}' > $ret
}*
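To see what the awk program does, consider a minimal sketch with two hypothetical partial tables. The program sums the counts in column one, keyed by the word in column two; note that awk's for(i in a) traverses the array in no particular order.

$ printf '2 the\n1 cat\n' > a.csv
$ printf '3 the\n' > b.csv
$ cat a.csv b.csv | awk '{a[$2]+=$1}END{for(i in a) print a[i],i}'
1 cat
5 the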

Workflow Definition

The workflow definition introduces a variable sotu denoting a zip file that contains a single text file. This text file is the corpus on which we are going to perform word counting.

sotu = "sotu/stateoftheunion1790-2014.txt.zip";

After extracting the text file, we split it into smaller partitions. Since unzip returns a list while split expects a single file, Cuneiform automatically applies split to each list element. The partitioning enables us to perform word counting on each partition in parallel.

fileLst = split( file: unzip( zip: sotu ) );

Now word counting is performed for each partition; again, the single-file task count is automatically mapped over the list fileLst. Afterwards, the word counts of all partitions are joined.

result  = join( csv: count( txt: fileLst ) );

Query

Up to now we have stated only task definitions and assignments. Neither of these, in and of itself, triggers any computation. By querying an expression we define what the eventual goal of the workflow is. Only computations contributing to that goal are actually performed.
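For instance, an assignment whose value does not contribute to the queried expression is never evaluated. In the following hypothetical sketch, the count application is never run, because result does not depend on it:

unused = count( txt: "no-such-file.txt" );
result;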

Here, we are interested in the value of the variable result, which we defined in the workflow definition above. Thus, we query that variable.

result;