At Grammarly, we have long used Amazon EMR with Hadoop and Pig in support of our big data processing needs. However, we were really excited about the improvements that the maturing Apache Spark offers over Hadoop and Pig, and so set about getting Spark to work with our petabyte text data set. In this post, we describe the challenges we had in the process and a scalable working setup of Spark that we have discovered as a result.

How Big Is Too Big?

At the core of our NLP technologies, we maintain a variety of language models. To build these models, we process and crunch very large text corpora such as Common Crawl (the part that we use is currently almost a petabyte in size). We de-duplicate, extract, split, filter, tally, and restructure this plain text in a variety of ways that involve data aggregation beyond simple text processing to build fast, efficient language models.

To apply these algorithms and transform our large corpora in reasonable timeframes, we utilize distributed systems to run the processing in parallel; indeed, this is an ideal application for a map/reduce system.

The Obvious Choice: EMR

Amazon Elastic MapReduce (EMR) is a service that automatically launches server instances and deploys Apache Hadoop to them. It is a great out-of-the-box solution, but it has its issues:

  • Cost: launching 1,000 spot medium-size instances (m3.2xlarge) currently costs $70 per hour, plus $140 for EMR. A 200% premium for automated deployment of open-source software is significant at scale.
  • Stability: Hadoop jobs sometimes fail without retries due to segmentation faults or the dreaded Out Of Memory (OOM) killer.
  • Performance: Hadoop is slower than it could be because its architecture moves data to disk and back several times.
  • Pipelining: there is no automatic way to pipe the output of one Hadoop job into another.

Given these challenges, we needed to find more performant and cost-effective solutions.

Alternatives

As an experiment, we boldly (and perhaps naïvely) implemented our own alternative solution. The initial benchmarks were compelling: it was an order of magnitude faster, and 36 times cheaper. But stability continued to be a challenge, and we rapidly realized that the engineering costs of developing a fully robust, recoverable pipeline would be more than we would be willing to invest.

Fortuitously, at about the same time, Apache Spark started gaining traction, along with a simple mechanism to deploy it to AWS. From the outset, Spark appeared to be perfect for us: benchmark tests showed it to be four times faster than Hadoop, it was far cheaper than EMR, and it even provided an aggregation pipeline. Additionally, our initial testing did not reveal the OOM instability we had encountered with EMR.
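
To make the appeal of the pipeline concrete, here is a minimal sketch, in Spark's Java API, of the kind of tally-and-aggregate job described above running as a single chain of transformations, with no intermediate job output written to disk between stages. The bucket paths, app name, and whitespace tokenization are illustrative placeholders, not our production pipeline.

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("token-tally")); // placeholder app name
    
    // split the corpus into tokens and count token frequencies; the whole chain
    // executes as one pipelined job rather than a sequence of separate map/reduce jobs
    JavaPairRDD<String, Long> counts = ctx
        .textFile("s3a://my-bucket/corpus/*.bz2")                         // placeholder input path
        .flatMap(line -> Arrays.asList(line.toLowerCase().split("\\s+"))) // naive tokenization, for illustration only
        .mapToPair(token -> new Tuple2<>(token, 1L))
        .reduceByKey((a, b) -> a + b);
    
    counts.saveAsTextFile("s3a://my-bucket/token-counts");                // placeholder output path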

Furthermore, another team within Grammarly, our data analytics team, migrated from Apache Pig to Spark after running into problems with debugging and testing Pig scripts, which in general emit cryptic error messages and offer only limited mechanisms for unit testing. Their impression of Spark was quite positive from the beginning because they found it much simpler to debug and test.
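
To give a flavour of what "simpler to test" means in practice, here is a hedged sketch of how a Spark transformation can be exercised in an ordinary JUnit test against a local master; the transformation itself is a made-up stand-in rather than one of our real jobs.

    import static org.junit.Assert.assertEquals;
    
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.junit.Test;
    
    public class NormalizeTest {
        @Test
        public void trimsAndLowercasesLines() {
            // run the job logic on an in-process local cluster, no EC2 or EMR required
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("unit-test");
            JavaSparkContext ctx = new JavaSparkContext(conf);
            try {
                // a toy transformation standing in for real pipeline logic
                List<String> result = ctx
                    .parallelize(Arrays.asList("  Hello ", "WORLD"))
                    .map(s -> s.trim().toLowerCase())
                    .collect();
                assertEquals(Arrays.asList("hello", "world"), result);
            } finally {
                ctx.stop();
            }
        }
    }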

As we began our adoption of Spark, we started hitting problems fairly quickly, caused primarily by the size of our data sets. The first challenge, for example, was a major-priority issue (SPARK-6246): Spark failed to launch a cluster with more than 100 nodes. Such issues were also difficult to track down: we had to launch a full-size cluster numerous times and wait for hours to reproduce some bugs or to notice the effect of a wrong configuration value. Our initial (and, again, perhaps naïve?) optimism rapidly faded.

Fixing Spark

We realized that Spark was our best hope, and made the choice to invest in stabilizing it for our needs. We ended up spending about four man-months to get it running. Here is an account of the problems we encountered, along with our solutions:

  • Driver OOM while using reduceByKey. Workaround: decrease spark.default.parallelism (a short configuration sketch follows this list).
  • Java killed by the OOM killer on a slave node. Workaround: change the formula in /root/spark-ec2/deploy_templates.py on the master node to spark_mb = system_ram_mb * 4 // 5.
  • SPARK-4808: Spark fails to spill with small number of large objects. Workaround: update Spark to 1.4.1.
  • SPARK-5077: Map output statuses can still exceed spark.akka.frameSize. Workaround: set("spark.akka.frameSize", "128").
  • SPARK-6246: spark-ec2 can't handle clusters with > 100 nodes. Workaround: apply the patch for the deploy script.
  • SPARK-6497: Class is not registered: scala.reflect.ManifestFactory$$anon$9. Workaround: don't force Kryo class registration.
  • HADOOP-6254: s3n fails with SocketTimeoutException. Workaround: use the S3A file system in Hadoop 2.7.1.
  • S3 HEAD request failed for ... - ResponseCode=403, ResponseMessage=Forbidden. Workaround: the same (use the S3A file system in Hadoop 2.7.1).
  • gzip input files are not splittable. (We need to save storage space by archiving data sets, and the default choice, the gzip format, doesn't allow random read access. This is not a bug, but it greatly increases the probability of failing with other issues and degrades performance.) Workaround: use bzip2 compression for input files.
  • HADOOP-7823: port HADOOP-4012 to branch-1 (splitting support for bzip2). Workaround: update to Hadoop 2.7.1.
  • HADOOP-10614: CBZip2InputStream is not threadsafe. Workaround: the same (update to Hadoop 2.7.1).
  • HADOOP-10400: Incorporate new S3A FileSystem implementation. Workaround: the same.
  • HADOOP-11571: Über-jira: S3a stabilisation phase I. Workaround: the same.
  • SPARK-6668: repeated asking to remove non-existent executor. Workaround: install the Hadoop 2.7.1 native libraries.
  • SPARK-5348: s3a:// protocol and hadoop-aws dependency. Workaround: build Spark with the patch.
  • Stack Overflow: How to access s3a:// files from Apache Spark? Workaround: pass --conf spark.hadoop.fs.s3a.access.key=... and --conf spark.hadoop.fs.s3a.secret.key=... when submitting the job.
  • HADOOP-9565: Add a Blobstore interface to add to blobstore FileSystems. Workaround: use DirectOutputCommitter.
  • Timeout waiting for connection from pool. Workaround: conf.setInt("fs.s3a.connection.maximum", 100).
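
The first workaround above is just a configuration change; here is a minimal sketch of it, with the value itself purely illustrative, since the right setting depends on cluster size and data volume.

    import org.apache.spark.SparkConf;
    
    // Illustrative only: if the driver OOMs during reduceByKey, lower spark.default.parallelism
    // until map output bookkeeping fits in driver memory; the value below is hypothetical.
    SparkConf sparkConf = new SparkConf()
        .set("spark.default.parallelism", "2000");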

Reproducing many of these problems is daunting; for some, it takes a cluster with 100+ nodes and 10+ TB of data. Also note that most of the issues still derive from Hadoop because Spark is built on top of it.

The Hardest Button

Of all the problems we tackled, the following error message from Spark was the biggest mystery:

S3 HEAD request failed for "file path" - ResponseCode=403, ResponseMessage=Forbidden.

The bug wasn’t always reproducible. Some files were read successfully while some were not. We checked S3 file access rights several times, but they were perfectly fine. We even made input files publicly accessible, but it didn’t help. According to the AWS documentation, there are about 30 different reasons for such a message. AWS also sends a more concrete reason in the response body, although, amusingly, this is an HTTP HEAD request and so the response body was, of course, nowhere to be found.

After reading the AWS docs carefully, we tried all sorts of odd ideas, including a wild theory that unsynchronized time on the cluster nodes might be the problem. We checked the time on all servers: it was within one second of real time, while the AWS threshold is 15 minutes. We also checked several other ideas that a Google search suggested, like URL-escaping the slashes in the S3 credentials and generating credentials without slashes. Still no luck.

Our next step was to crank up S3 logging; this revealed that AWS was indeed complaining that time was unsynchronized. Desperate and confused, we started inspecting the Spark source. It uses the Hadoop FileSystem API for reading input files, which in turn uses the JetS3t library, which uses Apache HttpClient. At first glance, the code seemed correct: the S3 request is sent as soon as the current time and authentication headers are appended. But then one more wild guess came to us: what if HttpClient puts the request in a queue instead of sending it? It turned out it does. For some reason, HttpClient had a default limit of four simultaneous requests, whereas Spark nodes were running eight simultaneous downloads that took more than 15 minutes. By the time a queued request was finally sent, its timestamp was already out of date.

To fix the problem, we just needed to change an HttpClient parameter. Unfortunately, it is in a library used by a library… (libraries all the way down.) After many evil hacks, it turned out that Hadoop 2.7.1 had a brand-new implementation of the S3 filesystem called S3A. In spite of using different libraries, S3A still had the same bug. Fortunately, its developers had been wise enough to expose the required limit as a configuration option, and we were able to end our long-running nightmare with a simple config update!
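
For the record, the limit in question surfaces in S3A as the connection maximum, the same property we raise in the recipe below; something along the following lines, with a value comfortably above the per-node download parallelism, was enough for us.

    // raise the S3A connection limit so requests are not queued behind a tiny default pool
    Configuration hadoopConf = ctx.hadoopConfiguration(); // ctx is the JavaSparkContext, as in the recipe below
    hadoopConf.setInt("fs.s3a.connection.maximum", 100);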

Our Ultimate Spark Recipe

In the end, this is our working setup:

  1. Build Spark 1.4.1 against Hadoop 2.7.1, with the SPARK-5348 patch applied.

  2. Apply the patch for the Amazon EC2 deploy script (SPARK-6246).

  3. Change the formula in /root/spark-ec2/deploy_templates.py on the master node to spark_mb = system_ram_mb * 4 // 5. You can do this by modifying the Amazon EC2 deploy script.

  4. Use bzip2 compression for input files.

  5. Use the S3A Hadoop filesystem.

  6. Specify S3 credentials during submit with --conf spark.hadoop.fs.s3a.access.key=... and --conf spark.hadoop.fs.s3a.secret.key=....

  7. Use the Spark configuration provided below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    
    SparkConf sparkConf = new SparkConf()
        .set("spark.task.maxFailures", "21") // increase task retries from 4 to 21
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // efficient serialization
        .registerKryoClasses(classes) // register the classes to be serialized (classes is your application's Class<?>[] array)
        .set("spark.akka.frameSize", "128"); // 128 MB, workaround for https://issues.apache.org/jira/browse/SPARK-5077
    
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = ctx.hadoopConfiguration();
    hadoopConf.setInt("fs.s3a.connection.maximum", 100); // S3 download parallelism limit
    hadoopConf.set("mapred.output.committer.class", DirectOutputCommitter.class.getName()); // don't use the _temporary folder
    hadoopConf.setBoolean("fs.s3a.fast.upload", true); // direct upload to S3 without temporary files
    int multipartSize = 10 * 1024 * 1024; // upload in 10 MB parts; with S3's 10,000-part limit this caps files at roughly 100 GB
    hadoopConf.setInt("fs.s3a.multipart.threshold", multipartSize);
    hadoopConf.setInt("fs.s3a.multipart.size", multipartSize);
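
With the context configured this way, a job can read the bzip2-compressed corpus over s3a:// and write results straight back to S3. A brief usage sketch follows; the bucket, paths, and filter are placeholders rather than our actual pipeline.

    ctx.textFile("s3a://my-bucket/corpus/*.bz2")        // bzip2 input is splittable, so files are read in parallel
       .filter(line -> !line.isEmpty())                 // stand-in for real processing
       .saveAsTextFile("s3a://my-bucket/output/run-1"); // placeholder output path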
    

Beyond this basic setup, we performed some extra optimization steps for our use cases. Spot prices differ among regions and availability zones, and AWS provides an API for requesting the spot pricing history; we wrote a script that searches for the cheapest zone, which easily reduces the overall cost by half. Another optimization targets Spark EC2 deployment: it takes approximately five hours to deploy 1,000 nodes with the default script, because the script downloads all the software to every node and visits each node via SSH many times. We created an AMI with pre-installed software and excluded unneeded startup actions, which sped up our deployment by an order of magnitude.
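
We are not reproducing that script here, but the idea is a straightforward use of the EC2 DescribeSpotPriceHistory API. The sketch below uses the AWS SDK for Java; the instance type, product description, and one-hour look-back window are assumptions for illustration, and pagination of results is omitted.

    import java.util.Date;
    import com.amazonaws.services.ec2.AmazonEC2;
    import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
    import com.amazonaws.services.ec2.model.DescribeSpotPriceHistoryRequest;
    import com.amazonaws.services.ec2.model.SpotPrice;
    
    AmazonEC2 ec2 = AmazonEC2ClientBuilder.defaultClient();
    
    // last hour of spot prices for the instance type we run (illustrative parameters)
    DescribeSpotPriceHistoryRequest request = new DescribeSpotPriceHistoryRequest()
        .withInstanceTypes("m3.2xlarge")
        .withProductDescriptions("Linux/UNIX")
        .withStartTime(new Date(System.currentTimeMillis() - 3600 * 1000L))
        .withEndTime(new Date());
    
    // pick the availability zone with the lowest observed price
    String cheapestZone = null;
    double cheapestPrice = Double.MAX_VALUE;
    for (SpotPrice price : ec2.describeSpotPriceHistory(request).getSpotPriceHistory()) {
        double p = Double.parseDouble(price.getSpotPrice());
        if (p < cheapestPrice) {
            cheapestPrice = p;
            cheapestZone = price.getAvailabilityZone();
        }
    }
    System.out.println("Cheapest zone: " + cheapestZone + " at $" + cheapestPrice + "/hour");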

Was It All Worth It?

Given the engineering efforts, the net cost to get Spark running at our scale ended up being about the same as just using EMR to perform the same tasks. However, over a longer term, we do expect substantially lower overall costs. We hope that our findings will help you avoid our initial investments and help bring Spark to wider adoption in crunching really big data sets!