Hadoop with Scala: hacking notes

by Sebastian Benthall

I am trying to learn how to use Hadoop. I am trying to learn to program in Scala. I mostly forget how to program in Java. In this post I will take notes on things that come up as I try to get my frickin’ code to compile so I can run a Hadoop job.

There was a brief window in my life when I was becoming a good programmer. It was around the end of my second year as a professional software engineer that I could write original code to accomplish novel tasks.

Since then, the tools and my tasks have changed. For the most part, my coding has been about projects for classes, really just trying to get a basic competence in commodity open tools. So, my “programming” consists largely of cargo-culting code snippets and trying to get them to run in a slightly modified environment.

Right now I’ve got an SBT project; I’m trying to write a MapReduce job in Scala that will compile as a .jar that I can run on Hadoop.

One problem I’m having is that there are apparently several different coding patterns for doing this, and several frameworks that are supposed to make my life easier. These include SMR, Shadoop, and Scalding. But since I’m doing this for a class and I actually want to learn something about how Hadoop works, I’m worried about working at too high a level of abstraction.

So I’m somewhat perversely taking the Scala WordCount example from jweslley’s Shadoop and making it dumber, i.e., not using Shadoop.

One thing that has been confusing as hell is that Hadoop has both a Mapper interface and a Mapper class, each with a map() function (1, 2), but those functions have different type signatures.

I started working with some other code that used the second map() function. One of the arguments to this function is of type Mapper.Context. I.e., the Context class is a nested member of the Mapper class. Unfortunately, referencing this class within Scala is super hairy. I saw a code snippet that did this:

override def map(key:Object, value:Text, context:Mapper[Object,Text,Text,IntWritable]#Context) = {
    for (t <-  value.toString().split("\\s")) {
      word.set(t)
      context.write(word, one)
    }
  }

But I couldn’t get this thing to compile. Kept getting this awesome error:

type Context is not a member of org.apache.hadoop.mapred.Mapper[java.lang.Object,org.apache.hadoop.io.Text,org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable]

Note the gnarliness here. It’s not super clear whether or how Context is parameterized by the type parameters of Mapper. The docs for the Mapper class make it seem like you can refer to Context without type parameterization within the code of the class extending Mapper. But I didn’t see that until I had deleted everything and tried a different tack, which was to use the Mapper interface in a class extending MapReduceBase.

Oddly, this interface hides the Context mechanic and instead introduces the Reporter class as a final argument to map(). I find this less intimidating for some reason. Probably because after years of working in Python and JavaScript my savviness around the Java type hierarchy is both rusty and obsolete. With Scala’s type magic adding complexity to the mix, I think I’ve got to steer towards the dumbest implementation possible. And at the level I’m at, it looks like I don’t ever have to touch or think about this Reporter.

So, starting with the example from Shadoop, I just need to decode the Scala syntactic sugar that Shadoop provides to figure out what the hell is actually going on.

Consider:

  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {

    val one = 1

    def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter) =
      (value split " ") foreach (output collect (_, one))
  }

This is beautiful, concise code. But since I want to know something about the underlying program, I’m going to uglify it by removing the implicit conversions provided by Shadoop.

The Shadoop page provides a Java equivalent for this, but that’s not really what I want either. For some reason I demand the mildly more concise syntax of Scala over Java, but not the kind of condensed, beautiful syntax Scala makes possible with additional libraries.

This compiles at least:

  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {

    val one = new IntWritable(1)

    def map(key: LongWritable, value: Text,
        output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
      val line = value.toString()

      for (word <- line.split(" ")) {
        output.collect(new Text(word), one)
      }
    }
  }

What I find a little counterintuitive about this is that the OutputCollector doesn’t act like a dictionary, overwriting the key-value pair with each call to collect(). I guess since I’m making a new Text object with each new entry, that makes sense even if the collector is implemented as a hash map of some kind. (Shadoop hides this mechanism with implicit conversions, which is rad of course.)
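
To make that concrete for myself (this is my own gloss on the semantics, not anything lifted from the Hadoop docs): collect() just emits another record downstream, and the grouping of values by key happens later, in the shuffle, before reduce() ever runs.

  // each call to collect() emits a separate (key, value) record; nothing gets overwritten
  output.collect(new Text("the"), one)   // emits ("the", 1)
  output.collect(new Text("the"), one)   // emits another ("the", 1)
  // after the shuffle, the reducer sees the grouped values, e.g. ("the", [1, 1])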

Next comes the reducer. The Shadoop code is this:

def reduce(key: Text, values: Iterator[IntWritable],
            output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
  val sum = values reduceLeft ((a: Int, b: Int) => a + b)
  output collect (key, sum)
}

Ok, so there’s a problem here. The whole point of using Scala to code a MapReduce job is so that you can use Scala’s built-in reduceLeft function inside the reduce method of the Reducer. Because functional programming is awesome. By which I mean using built-in functions for things like map and reduce operations is awesome. And Scala supports functional programming, at the very least in that sense. And MapReduce as a computing framework is at least analogous to that paradigm in functional programming, and even has the same name. So, OMG. The problem: reduceLeft lives on Scala’s Iterator, but Hadoop’s old-API reducer hands you a java.util.Iterator, and the sugar that bridges the two is exactly the Shadoop magic I’ve just sworn off.

Point being, no way in hell am I going to budge on this minor aesthetic point in my toy code. Instead, I’m going to brazenly pillage jweslley’s source code for the necessary implicit type conversion.

  implicit def javaIterator2Iterator[A](value: java.util.Iterator[A]) = new Iterator[A] {
    def hasNext = value.hasNext
    def next = value.next
  }

But not the other implicit conversions that would make my life easier. That would be too much.

Unfortunately, I couldn’t get this conversion to work right. Attempting to run the code gives me the following error:

[error] /home/cc/cs294/sp13/class/cs294-bg/hw3/wikilearn/src/main/scala/testIt/WordCount.scala:33: type mismatch;
[error]  found   : java.util.Iterator[org.apache.hadoop.io.IntWritable]
[error]  required: Iterator[org.apache.hadoop.io.IntWritable]
[error]       val sum = (values : scala.collection.Iterator[IntWritable]).reduceLeft (
[error]                  ^

It beats me why this doesn’t work. In my mental model of how implicit conversion is supposed to work, the java.util.Iterator[IntWritable] should be caught by the parameterized implicit conversion (which I defined within the Object scope) and converted no problemo.

I can’t find any easy explanation of this on-line at the moment. I suspect it’s a scoping issue or a limit to the parameterization of implicit conversions. Or maybe because Iterator is a trait, not a class? Instead I’m going to do the conversion explicitly in the method code.
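
(For what it’s worth, if it really is a scoping issue, the kind of fix I’d expect to work looks something like the sketch below; the Conversions object is my own naming and I haven’t tested this. The idea is just to put the conversion somewhere it can be imported into the class whose method needs it.)

  import org.apache.hadoop.io.{IntWritable, Text}
  import org.apache.hadoop.mapred.{MapReduceBase, OutputCollector, Reducer, Reporter}

  object Conversions {
    implicit def javaIterator2Iterator[A](value: java.util.Iterator[A]): Iterator[A] =
      new Iterator[A] {
        def hasNext = value.hasNext
        def next = value.next
      }
  }

  class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
    import Conversions._  // bring the implicit view into scope where reduce() lives

    def reduce(key: Text, values: java.util.Iterator[IntWritable],
        output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
      // reduceLeft exists only on the Scala Iterator, so this call triggers the conversion
      val sum = values.reduceLeft((a: IntWritable, b: IntWritable) =>
        new IntWritable(a.get() + b.get()))
      output.collect(key, sum)
    }
  }

But for now, explicit conversion it is.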

After fussing around for a bit, I got:

    def reduce(key: Text, values: java.util.Iterator[IntWritable],
      output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
      val svals = new scala.collection.Iterator[IntWritable] {
        def hasNext = values.hasNext
        def next = values.next
      }
      val sum = (svals: scala.collection.Iterator[IntWritable]).reduceLeft(
        (a: IntWritable, b: IntWritable) => new IntWritable(a.get() + b.get())
      )
      output collect (key, sum)
    }

…or, equivalently and more cleanly:

    def reduce(key: Text, values: java.util.Iterator[IntWritable],
      output: OutputCollector[Text, IntWritable], reporter: Reporter) = {

      val svals = new scala.collection.Iterator[Int]{
        def hasNext = values.hasNext
        def next = values.next.get
      }

      val sum = (svals : scala.collection.Iterator[Int]).reduceLeft (
        (a: Int, b: Int) => a + b
      )
      output collect (key, new IntWritable(sum))
    }
  }

I find the Scala syntax for filling in the methods of an abstract type here pretty great (I hadn’t encountered it before). Since Iterator[A] is a trait with abstract methods, you define next and hasNext inside the curly braces and get an anonymous subclass on the spot. What an elegant way to let people subclass abstract types in an ad hoc way!
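
Here’s a throwaway illustration of the same idiom outside of Hadoop (my own toy example): any trait with abstract members can be instantiated on the spot by filling those members in between the braces.

  val countdown = new Iterator[Int] {
    private var i = 3
    def hasNext = i > 0
    def next = { i -= 1; i + 1 }   // yields 3, then 2, then 1
  }
  // countdown.toList == List(3, 2, 1)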

There’s one more compile error I had to bust around. This line was giving me noise:

conf setOutputFormat classOf[TextOutputFormat[_ <: WritableComparable, _ <: Writable]]

It was complaining that WritableComparable needed a type parameter. Not confident I could figure out exactly which parameter to set, I just made the signature tighter.

conf setOutputFormat classOf[TextOutputFormat[Text, IntWritable]]

Only then did I learn that JobConf is a deprecated way of defining jobs. So I rewrote the WordCount object into a class implementing the Tool interface, using this Java snippet as an example to work from. To do that, I had to learn that to write a class that extends two interfaces in Scala, you need to use the “extends X with Y” syntax. Also, for trivial conditionals Scala dispenses with Java’s ternary X ? Y : Z operator in favor of a single-line if (X) Y else Z. Though I will miss the evocative use of punctuation in the ternary construct, I’ve got to admit that Scala is keeping it real classy here.
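
The general shape I ended up with is something like this (a schematic sketch rather than my actual code; the class name is my own and the job setup is elided):

  import org.apache.hadoop.conf.{Configuration, Configured}
  import org.apache.hadoop.util.{Tool, ToolRunner}

  class WordCountDriver extends Configured with Tool {
    def run(args: Array[String]): Int = {
      // ... configure and submit the job here ...
      val success = true  // placeholder for whatever the job submission reports
      if (success) 0 else 1  // Scala's if/else expression instead of `success ? 0 : 1`
    }
  }

  object WordCountDriver {
    def main(args: Array[String]) {
      val exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args)
      System.exit(exitCode)
    }
  }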

Wait…ok, so I just learned that most of the code I was cargo culting was part of the deprecated coding pattern, which means I now have to switch it over to the new API. I learned this from somebody helpful in the #hadoop IRC channel.

[23:31] <sbenthall> what's the deal with org.apache.hadoop.mapreduce.Mapper and org.apache.hadoop.mapred.Mapper ??
[23:31] <sbenthall> is one of them deprecated?
[23:31] <sbenthall> which should I be using?
[23:32] <QwertyM> sbenthall: Use the new API
[23:32] <QwertyM> sbenthall: (i.e. mapreduce.*) both are currently supported but eventually the (mapred.*) may get deprecated
[23:32] <sbenthall> ok thanks QwertyM
[23:33] <QwertyM> sbenthall: as a reference, HBase uses mapreduce.* APIs completely for its provided MR jobs; and I believe Pig too uses the new APIs
[23:33] <sbenthall> is MapReduceBase part of the old API?
[23:33] <QwertyM> sbenthall: yes, its under the mapred.* package
[23:33] <sbenthall> ok, thanks.

Parachuting into the middle of a project has its drawbacks, but it’s always nice when a helpful community member can get you up to speed. Even if you’re asking near midnight on a Sunday.

Wait. I realize now that I’ve come full circle.

See, I’ve been writing these notes over the course of several days. Only just now am I realizing that I’m going back to where I started, with the Mapper class that takes the Context parameter that was giving me noise.

Looking back at the original error, it looks like that too was a result of mixing two API’s. So maybe I can now safely shift everything BACK to the new API, drawing heavily on this code.

It occurs to me that this is one of those humbling programming experiences when you discover that the reason why your thing was broken was not the profound complexity of the tool you were working with, but your own stupidity over something trivial. This happens to me all the time.

Thankfully, I can’t ponder that much now, since it’s become clear that the instructional Hadoop cluster on which we’ve been encouraged to do our work is highly unstable. So I’m going to take the bet that it will be more productive for me to work locally, even if that means installing Hadoop locally on my Ubuntu machine.

I thought I was doing pretty good with the installation until I got to the point of hitting the “ON” switch on Hadoop. I got this:

sb@lebensvelt:~$ /usr/local/hadoop/bin/start-all.sh 
Warning: $HADOOP_HOME is deprecated.

starting namenode, logging to /usr/local/hadoop/libexec/../logs/hadoop-sb-namenode-lebensvelt.out
localhost: ssh: connect to host localhost port 22: Connection refused
localhost: ssh: connect to host localhost port 22: Connection refused
starting jobtracker, logging to /usr/local/hadoop/libexec/../logs/hadoop-sb-jobtracker-lebensvelt.out
localhost: ssh: connect to host localhost port 22: Connection refused

I googled around and it looks like this problem is due to not having an SSH server running locally. Since I’m running Ubuntu, I went ahead and followed these instructions. In the process I managed to convince my computer that I was undergoing a man-in-the-middle attack between myself and myself.

I fixed that with

$ ssh-keygen -R localhost

and successfully got Hadoop running with

$ /usr/local/hadoop/bin/start-all.sh 

only to be hung up on this error

$ hadoop fs -ls
Warning: $HADOOP_HOME is deprecated.

ls: Cannot access .: No such file or directory.

which somebody who runs an Indian matrimony search engine had run into and documented the fix for. (The right way to write it is

hadoop fs -ls .

With an extra dot.)

There’s a point to me writing all this out, by the way. An important part of participation in open source software, or the hacker ethic in general, is documenting one’s steps so that others who follow the same paths can benefit from what you’ve gone through. I’m going into a bit more detail about this process than is really helpful because in my academic role I’m dealing with a lot of social scientist types who really don’t know what this kind of work entails. Let’s face it: programming is a widely misunderstood discipline which seems like an utter mystery to those who aren’t deeply involved in it. Much of this has to do with the technical opacity of the work. But another part of why it’s misunderstood is that problem solving in the course of development depends on a vast and counter-intuitive cyberspace of documentation (often generated from code comments, so written by some core developer), random blog posts, chat room conversations, and forum threads. Easily 80% of the work when starting out on a new project like this is wrestling with all the minutiae of configuration on a particular system (operating system and hardware contingent, in many cases) and the idioms of the programming language and environment.

The amount of time one has to invest in any particular language or toolkit necessarily creates a tribalism among developers, because their identities wind up being intertwined with the tools they use. As I hack on this thing, however incompetently, I’m becoming a Scala developer. That’s similar to saying that I’m becoming a German speaker. My conceptual vocabulary, once I’ve learned how to get things done in Scala, is going to be different than it was before. In fact, that’s one of the reasons why I’m insisting on teaching myself Scala in the first place–because I know that it is a conceptually deep and rigorous language which will have something to teach me about the Nature of Things.

Some folks in my department are puzzled at the idea that technical choices in software development might be construed as ethical choices by the developers themselves. Maybe it’s easier to understand that if you see that in choosing a programming language you are in many ways choosing an ontology or theoretical framework through which to conceive of problem-solving. Of course, choice of ontology will influence one’s ethical principles, right?

But I digress.

So I have Hadoop running on my laptop now, and a .jar file that compiles in SBT. So now all I need to do is run the .jar using the hadoop jar command, right?

Nope, not yet…

Exception in thread "main" java.lang.NoClassDefFoundError: scala/ScalaObject

OK, so the problem is that I haven’t included scala-library.jar on my Hadoop runtime classpath.

I solved this by making a symbolic link from the Hadoop /lib directory to the .jar in my Scala installation.

ln -s /usr/local/share/scala-2.9.2/lib/scala-library.jar /usr/local/hadoop/lib/scala-library.jar

That seemed to work, only now I have the most mundane and inscrutable of Java errors to deal with:

Exception in thread "main" java.lang.NullPointerException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:601)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

I had no idea how to proceed from here. This time, no helpful folks in the #hadoop channel helped me either.

So once again I switched to a new hunk of code to work from, this time the WordCount.scala file from Derrick Cheng’s ScalaOnHadoop project. Derrick posted a link to this project on our course’s Piazza earlier, which was awesome.

Another digression. There’s a lot of talk now about on-line education. People within the university context feel vaguely threatened by MOOCs, believing that there will be a superstar effect that advantages first movers, but many are uncomfortable making that first move. Taek Lim in my department is starting to study user interfaces to support collaboration in on-line learning.

My own two cents on this are that the open source software model is about as dynamic a collaborative environment as you can get, and at the point when people started to use our course discussion management system, Piazza, to discuss the assignment almost as if it were an open source mailing list, we started to get a lot more out of it and learned a lot from each other. I’m not the first person to see this potential of the open source model for education, of course. I’m excited to be attending the POSSE workshop, which is about that intersection, this summer. At this rate, it looks like I will be co-teaching a course along these lines in the Fall targeted at the I School Masters students, which is exciting!

So, anyway, I’m patching Derrick Cheng’s code. I’m not working with BIDMat yet, so I’m leaving it out of the build; I remove all references to it and get the thing to compile…and run! I got somebody else’s Scala WordCount to run!

This seems like such a triumph. It’s taken a lot of tinkering and beating my head against a wall to get this far. (Not all at once–I’ve written this post over the course of several days. I realize that it’s a little absurd.)

But wait! I’m not out of the woods yet. I check the output of my MapReduce job with

hadoop fs -cat test-output/part-r-00000

and the end of my output file looks like this:

§	1
§	1
§	1
§	1
§	1
§	1
§	1
§	1
§	1
Æschylus	1
Æsop	1
Æsop	1
Æsop	1
É	1
Élus,_	1
à	1
à	1
à	1
æons	1
æsthetic	1
è	1
è	1
è	1
état_.	1
The	1
The	1

What’s going on here? Well, it looks like I successfully mapped each occurrence of each word in the original text to a key-value pair. But something went wrong in the reduce step, which was supposed to combine all the occurrences into a single count for each word.

That is just fine for me, because I’d rather be using my own Reduce function anyway: it uses Scala’s functional reduceLeft, which is sweet! Why even write a MapReduce job in a functional programming language if you can’t use a built-in language reduce in the Reduce step?

Ok, mine doesn’t work either.

Apparently, the reason for this is that the type signature I’ve been using for the Reducer’s reduce method has been wrong all along. And when that happens, the code compiles, but the Reducer runs its default reduce function, which is the identity function.
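
Concretely, my understanding of the trap is this (the code below is a sketch to illustrate the point, not my final code): the new-API Reducer hands you a java.lang.Iterable, not an Iterator, and if the parameter types don’t match exactly you’ve written an overload rather than an override, so Hadoop quietly falls back to the inherited identity reduce().

  import org.apache.hadoop.io.{IntWritable, Text}
  import org.apache.hadoop.mapreduce.Reducer
  import scala.collection.JavaConversions._

  class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] {
    // the parameter types have to match the new API exactly: java.lang.Iterable, not Iterator
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
        context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      val sum = values.map(_.get).sum  // JavaConversions lets the Iterable act like a Scala one
      context.write(key, new IntWritable(sum))
    }
  }

One nice thing about spelling out the override modifier is that the compiler complains if the signature doesn’t actually match anything, instead of silently accepting a new overload.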

It’s almost (almost!) as if it would have made more sense to start by just reading the docs and following the instructions.

Now I have edited the map and reduce functions so that they have the right type signatures. To get this exactly right, I looked at a different file. I also tinkered.

At last, it works.

Now, at last, I understand how this Context member class works. The problem was that I was trying to use it with the mapred.Mapper class from the old API. So much of my goose chase was due to not reading things carefully enough.
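
For the record, a new-API mapper in Scala looks something like this (a sketch in the same spirit as the snippet that started all this, with a class name of my own invention, not my actual final code):

  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.Mapper

  class TokenizerMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: LongWritable, value: Text,
        context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      for (t <- value.toString.split("\\s")) {
        word.set(t)
        context.write(word, one)
      }
    }
  }

The type projection Mapper[...]#Context compiles here because mapreduce.Mapper, unlike mapred.Mapper, really does have a Context member.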

On the other hand, I feel enriched by the whole meandering process. Realizing that my programming faults were mine and not due to the complexity of the tools I was using paradoxically gives me more confidence in my understanding of the tools moving forward. And engaging with the distributed expertise on the subject–through StackOverflow, documentation generated from the original coders, helpful folks on IRC, blog posts, and so on–is much more compelling when one is driven by concrete problem-solving goals, even when those goals are somewhat arbitrary. Had I learned to use Hadoop in a less circuitous way, my understanding would probably be much more brittle. I am integrating new knowledge of Hadoop, Scala, and Java (it’s been a long time) with existing background knowledge. After a good night’s sleep, with any luck it will be part of my lifeworld!

This is the code I wound up with, by the way. I don’t suggest you use it.