[λ] thegeez blog index

Count aggregator in Cascalog

26 Aug 2012

At my current job we use the excellent Cascalog library. Cascalog is a declarative query language for "Big Data" processing on top of Hadoop. Cascalog queries get compiled into MapReduce tasks (through the underlying Cascading library). This approach is a huge win over writing MapReduce tasks yourself. The "Hello World" of MapReduce tasks is the word-count example. However this example is a bit deceptive as it consists of only one Map task and one Reduce task. Most queries that we run require multiple MapReduce tasks chained together. With Cascalog we write a query declaratively while the underlying libraries take care to create (efficient!) chains of MapReduce tasks.

In the beginning I struggled with writing Cascalog queries with aggregators. The best example of this is the count aggregator. The following code uses "count" in Cascalog and SQL.

 1 (ns cascalog-count.core-test
 2   (:use clojure.test
 3         cascalog.api
 4         cascalog.testing
 5         midje.cascalog)
 6   (:require [cascalog.ops :as c]
 7             [clojure.java.jdbc :as jdbc]))
 8 
 9 (def in [[5] [6] [7] [5] [6] [5]])
10 
11 (def db "postgresql://test:test@localhost:5432/testdb")
12 
13 (use-fixtures :once
14               (fn [f]
15                 (jdbc/with-connection db
16                   (jdbc/do-commands "CREATE TABLE integers
17                                      (n integer)")
18                   (apply jdbc/insert-values "integers" ["n"] in)
19                   (f)
20                   (jdbc/do-commands "DROP TABLE integers")
21                   )))
22 
23 (deftest count-all
24   (let [count-all-out [[6]]]
25     (testing "count all sql"
26       (jdbc/with-query-results res
27         ["SELECT count(*) as count
28           FROM integers;"]
29         (is (= count-all-out
30                (map (juxt :count) res)))))
31     (testing "count all cascalog"
32       (fact?- count-all-out
33               (<- [?count]
34                   (in _)
35                   (c/count ?count)
36                   )))))
37 
38 
39 (deftest count-group-by
40   (let [count-group-by-out [[5 3] [6 2] [7 1]]]
41     (testing "count group-by sql"
42       (jdbc/with-query-results res
43         ["SELECT n, count(*) as count
44           FROM integers
45           GROUP BY n;"]
46         (is (= count-group-by-out
47                (map (juxt :n :count) res)))))
48     (testing "count group-by cascalog"
49         (fact?- count-group-by-out
50                 (<- [?n ?count]
51                     (in ?n)
52                     (c/count ?count)
53                     )))))

SQL is also a declarative query, albeit more well known than Cascalog. The "GROUP BY" clause in the SQL query on line 45 is explicit. The Cascalog query on lines 50 through 52 has no group by clause, as this is done implicitly. The "GROUP BY" clause for the Cascalog query is derived from variables in query. All the variables that are not output from aggregates are used to group by. In this case that is only the "?n" variable. Compare this to the Cascalog on line 33 where all the tuples are counted.

Great resources about Cascalog are: