Count aggregator in Cascalog
26 Aug 2012
At my current job we use the excellent Cascalog library. Cascalog is a declarative query language for "Big Data" processing on top of Hadoop. Cascalog queries are compiled into MapReduce jobs (through the underlying Cascading library). This approach is a huge win over writing MapReduce jobs by hand. The "Hello World" of MapReduce is the word-count example. However, this example is a bit deceptive, as it consists of only a single map phase and a single reduce phase. Most queries that we run require multiple MapReduce jobs chained together. With Cascalog we write a query declaratively, while the underlying libraries take care of creating (efficient!) chains of MapReduce jobs.
In the beginning I struggled with writing Cascalog queries that use aggregators. The count aggregator is the best example of this. The following tests compute the same counts with Cascalog and with SQL (against PostgreSQL).
```clojure
 1 (ns cascalog-count.core-test
 2   (:use clojure.test
 3         cascalog.api
 4         cascalog.testing
 5         midje.cascalog)
 6   (:require [cascalog.ops :as c]
 7             [clojure.java.jdbc :as jdbc]))
 8
 9 (def in [[5] [6] [7] [5] [6] [5]]) ; test tuples with a single field each
10
11 (def db "postgresql://test:test@localhost:5432/testdb")
12
13 (use-fixtures :once
14   (fn [f]
15     (jdbc/with-connection db
16       (jdbc/do-commands "CREATE TABLE integers
17                          (n integer)")
18       (apply jdbc/insert-values "integers" ["n"] in) ; load the same tuples into SQL
19       (f)
20       (jdbc/do-commands "DROP TABLE integers")
21       )))
22
23 (deftest count-all
24   (let [count-all-out [[6]]]
25     (testing "count all sql"
26       (jdbc/with-query-results res
27         ["SELECT count(*) as count
28           FROM integers;"]
29         (is (= count-all-out
30                (map (juxt :count) res)))))
31     (testing "count all cascalog"
32       (fact?- count-all-out
33               (<- [?count]
34                   (in _)            ; _ ignores the field
35                   (c/count ?count)  ; counts every tuple
36                   )))))
37
38
39 (deftest count-group-by
40   (let [count-group-by-out [[5 3] [6 2] [7 1]]]
41     (testing "count group-by sql"
42       (jdbc/with-query-results res
43         ["SELECT n, count(*) as count
44           FROM integers
45           GROUP BY n ORDER BY n;"] ; without ORDER BY the row order is not guaranteed
46         (is (= count-group-by-out
47                (map (juxt :n :count) res)))))
48     (testing "count group-by cascalog"
49       (fact?- count-group-by-out
50               (<- [?n ?count]
51                   (in ?n)           ; ?n is an output variable, so it is the group key
52                   (c/count ?count)  ; counts the tuples per value of ?n
53                   )))))
```
SQL is also a declarative query language, albeit a better-known one than Cascalog. The "GROUP BY" clause in the SQL query on line 45 is explicit. The Cascalog query on lines 50 through 52 has no group-by clause, because Cascalog groups implicitly: the grouping is derived from the variables in the query. Every variable in the output vector that is not the output of an aggregator is used to group by. In this case that is only the "?n" variable, so a count is produced for each distinct value of "?n". Compare this to the Cascalog query on line 33, whose only output variable is the aggregator output "?count" (the input field is ignored with "_"), so all the tuples are counted together.
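To make the implicit grouping rule concrete, here is a minimal sketch in plain Clojure (no Cascalog, Hadoop, or database) that mimics what the two Cascalog queries compute. The helper `count-by` is hypothetical and only illustrates the semantics: its key function plays the role of the non-aggregator output variables, and every distinct key gets its own count.

```clojure
;; Same input tuples as in the tests.
(def in [[5] [6] [7] [5] [6] [5]])

(defn count-by
  "Hypothetical helper, not part of Cascalog. Groups tuples by the vector
  returned by key-fn and emits one [key-values... count] tuple per group,
  sorted by key. A key-fn that always returns [] puts every tuple into a
  single group, i.e. it counts everything."
  [key-fn tuples]
  (->> tuples
       (group-by key-fn)
       (map (fn [[k group]] (conj k (count group))))
       sort))

;; No grouping variable: like (<- [?count] (in _) (c/count ?count))
(prn (count-by (constantly []) in))
;; prints ([6])

;; ?n is a non-aggregator output variable: like
;; (<- [?n ?count] (in ?n) (c/count ?count))
(prn (count-by (fn [[n]] [n]) in))
;; prints ([5 3] [6 2] [7 1])
```

In Cascalog the same grouping happens inside the generated MapReduce job rather than in a single in-memory map, so the sketch shows only what is computed, not how it is executed.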
Great resources about Cascalog are:
- Cascalog on GitHub
- Cascalog made easy - Explains aggregators and custom operators
- Big Data MEAP - Book on "Big Data" architectures by Cascalog author Nathan Marz; chapter 5 discusses the Java variant of Cascalog