A blog about security, privacy, algorithms, and email in the enterprise. 

How to Partition a Slave Node’s Disks

In a typical Hadoop slave node (a node running both a datanode and a tasktracker), it is common to provide 75% of the disk for HDFS storage and the remaining 25% for MapReduce intermediate data. The MapReduce intermediate data is the data created after a map task has run over an input split, typically an HDFS block. Given a single disk, this is a simple task, but with multiple slave node disks the decision becomes more complicated. We want to choose the disk configuration that makes the best use of all available resources.

Assume we have 4x 1TB disks in our example slave node.

The logical assumption would be to assign 3x 1TB disks for HDFS storage and 1x 1TB disk for MapReduce intermediate data storage. The problem with this approach is that we sacrifice potential HDFS throughput by assigning one full volume to only store MapReduce intermediate data.

The better approach is to store both HDFS and MapReduce intermediate data on each disk on the slave node. This can be accomplished a few different ways. One way would be to use separate partitions, but this method would leave you stuck if you ever needed to change the percentage split (e.g. allocate more storage for HDFS or for MapReduce intermediate data). Another way is to use the dfs.datanode.du.reserved configuration property in hdfs-site.xml to control the split, with separate directories for each kind of data on the same volume. This would allow you to change the capacity available to HDFS after restarting the datanode, without repartitioning.
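To make the reservation concrete, here is a minimal sketch of the arithmetic for one volume. It assumes the property takes a per-volume value in bytes and that a "1TB" disk holds 10^12 bytes; the class and method names are mine for illustration, not part of Hadoop.

```java
class DiskSplit {
    // Bytes to leave free for non-HDFS use (here: MapReduce intermediate data)
    // on a single volume, given the share of the volume meant for HDFS.
    // The result is rounded down to a whole number of bytes.
    static long reservedBytes(long volumeBytes, int hdfsPercent) {
        if (volumeBytes < 0 || hdfsPercent < 0 || hdfsPercent > 100) {
            throw new IllegalArgumentException("invalid volume size or percentage");
        }
        return volumeBytes / 100 * (100 - hdfsPercent);
    }

    public static void main(String[] args) {
        long oneTb = 1_000_000_000_000L; // assumption: decimal terabyte
        // 75% for HDFS leaves 25% reserved on each of the four disks.
        System.out.println(reservedBytes(oneTb, 75)); // prints 250000000000
    }
}
```

With the 75/25 split from the example, each of the four disks would carry the same reserved value, so HDFS still gets to spread its I/O across all four spindles.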

Another interesting solution, which I heard from a classmate, is to use the native Linux file system user disk quota system. This should work because the MapReduce and HDFS daemons run under separate user accounts, but I’m not sure how thoroughly it has been tested.



FSDataInputStream from a byte array

We're in the process of adding Hive support to Timberwolf, which involves writing files into HDFS so that they can get loaded into Hive tables. Writing to HDFS involves FSDataOutputStreams and FSDataInputStreams, which are all fine and good until you want to start writing tests. My normal approach when testing something that writes to a stream is to create it with a stream that's ultimately backed by a byte array (generally through ByteArrayOutputStream), then pull those bytes out and verify that they're all what I expect them to be. In this case, I was writing a sequence file, so I figured I could use SequenceFile.Reader to pull out my key/value pairs and check that they're correct. That is, until I tried constructing an FSDataInputStream with a ByteArrayInputStream.

Turns out, FSDataInputStream imposes requirements on its backing streams that aren't reflected in the constructor's type signature: FSDataInputStream#FSDataInputStream. So I needed to get a stream that I could construct from a byte array that also implemented PositionedReadable and Seekable. As it turns out, there isn't one of those in the org.apache.hadoop.fs namespace, so I went ahead and rolled my own: SeekablePositionedReadableByteArrayInputStream. It's not complete, since I wasn't sure what exactly seekToNewSource should do and I didn't need it for my tests, but it gets enough of the job done. Maybe it'll help you, too?
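For readers who want the general shape of such a stream, here is a rough sketch, not the Timberwolf class itself. To stay runnable without Hadoop on the classpath it does not declare the Hadoop interfaces; with Hadoop available, the class would add "implements Seekable, PositionedReadable" from org.apache.hadoop.fs. The class name is made up, and returning false from seekToNewSource is my assumption, since a byte array has no alternate source to switch to.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;

class SeekableByteArrayStream extends ByteArrayInputStream {
    SeekableByteArrayStream(byte[] data) {
        super(data);
    }

    // Seekable: move the read cursor to an absolute offset.
    public synchronized void seek(long newPos) throws IOException {
        if (newPos < 0 || newPos > count) {
            throw new EOFException("seek out of range: " + newPos);
        }
        pos = (int) newPos;
    }

    // Seekable: report the current cursor offset.
    public synchronized long getPos() {
        return pos;
    }

    // Seekable: assumption, there is only one copy of the data in memory.
    public boolean seekToNewSource(long targetPos) {
        return false;
    }

    // PositionedReadable: read from an absolute offset without moving the cursor.
    public synchronized int read(long position, byte[] buffer, int offset, int length) {
        if (position < 0 || position >= count) {
            return -1;
        }
        int n = (int) Math.min(length, count - position);
        System.arraycopy(buf, (int) position, buffer, offset, n);
        return n;
    }

    // PositionedReadable: like read, but fails unless every byte is available.
    public synchronized void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException {
        if (position < 0 || position + length > count) {
            throw new EOFException("not enough bytes at position " + position);
        }
        System.arraycopy(buf, (int) position, buffer, offset, length);
    }

    public static void main(String[] args) throws IOException {
        SeekableByteArrayStream in = new SeekableByteArrayStream(new byte[] {10, 20, 30, 40});
        in.seek(2);
        System.out.println(in.read()); // prints 30
    }
}
```

The sketch leans on the protected buf, pos, and count fields that ByteArrayInputStream exposes to subclasses, which is what keeps it this short.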


Cloudera Administrator Training for Apache Hadoop Recap: Part 3

I recently attended Cloudera’s Hadoop Training for Administrators course in Columbia, MD. You can read my recap of the first and second days here and here. On the third day, we covered cluster maintenance, monitoring, benchmarking, job logging, and data importing.

Cluster Maintenance

  • Common tasks include checking HDFS status, copying data between clusters, adding and removing nodes, rebalancing the cluster, namenode metadata backup and cluster upgrading.
  • HDFS clusters can become unbalanced when new nodes are added to the cluster, leading to performance issues.
  • Clusters can be rebalanced using the balancer command, which moves blocks between nodes until each node's utilization is within a set threshold of the cluster average. The balancer command should only be used after adding new nodes to a cluster.
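As a rough illustration of what the threshold means, the sketch below flags nodes whose disk utilization differs from the cluster-wide average by more than a given number of percentage points. This is my simplified reading of the idea, not the balancer's actual code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BalanceCheck {
    // Returns the indexes of nodes whose utilization (used / capacity, in percent)
    // is more than thresholdPercent away from the cluster-wide average utilization.
    static List<Integer> outOfBalance(long[] used, long[] capacity, double thresholdPercent) {
        if (used.length != capacity.length || used.length == 0) {
            throw new IllegalArgumentException("need one used/capacity pair per node");
        }
        long totalUsed = 0;
        long totalCapacity = 0;
        for (int i = 0; i < used.length; i++) {
            totalUsed += used[i];
            totalCapacity += capacity[i];
        }
        double average = 100.0 * totalUsed / totalCapacity;

        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < used.length; i++) {
            double utilization = 100.0 * used[i] / capacity[i];
            if (Math.abs(utilization - average) > thresholdPercent) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three nodes at 80% plus one freshly added, empty node: average is 60%.
        long[] used = {800, 800, 800, 0};
        long[] capacity = {1000, 1000, 1000, 1000};
        System.out.println(outOfBalance(used, capacity, 10.0)); // prints [0, 1, 2, 3]
    }
}
```

The example shows why adding a node triggers rebalancing: the new node drags the average down, so the old nodes count as over-utilized and the new one as under-utilized.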

Namenode backup

  • The namenode is a single point of failure (at this point in time).
  • If namenode metadata is lost then the cluster is lost.
  • The fsimage and edits files are the two primary files that hold metadata on disk. The namenode doesn’t write every change to HDFS file metadata into fsimage; rather, it appends changes incrementally to the edits log.
  • At startup, the namenode loads the fsimage into RAM, then replays all entries from the edits log. The two files are merged at set intervals on the checkpoint node (aka secondary namenode). This node copies both files, loads them into RAM, merges them, then copies the result back to the namenode.
  • The checkpoint node does store a copy of these two files, but depending on the last merge the data could be stale. It’s not meant to be a failover node, but more of a housekeeper.
  • Wrigley recommends writing the namenode metadata to two local directories on different physical volumes and to an NFS directory. You can also retrieve copies of the namenode metadata over HTTP:
    • fsimage: http://<namenode>:50070/getimage?getimage=1
    • edits: http://<namenode>:50070/getimage?getedit=1
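The load-then-replay step can be sketched in a few lines. This is a toy model under loud assumptions: the real fsimage and edits files are binary formats with many more operation types, while here the image is just a set of paths and an edit is either "ADD" or "DELETE". All names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class CheckpointSketch {
    // One logged change to the namespace (toy model: only ADD and DELETE).
    static class Edit {
        final String op;
        final String path;

        Edit(String op, String path) {
            this.op = op;
            this.path = path;
        }
    }

    // Start from the last snapshot and apply every logged edit in order.
    // The result is what a checkpoint would write out as the new snapshot.
    static Set<String> replay(Set<String> fsimage, List<Edit> edits) {
        Set<String> merged = new TreeSet<>(fsimage);
        for (Edit e : edits) {
            if (e.op.equals("ADD")) {
                merged.add(e.path);
            } else if (e.op.equals("DELETE")) {
                merged.remove(e.path);
            } else {
                throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Set<String> image = new TreeSet<>(Arrays.asList("/a", "/b"));
        List<Edit> edits = Arrays.asList(new Edit("ADD", "/c"), new Edit("DELETE", "/a"));
        System.out.println(replay(image, edits)); // prints [/b, /c]
    }
}
```

It also shows why a stale checkpoint copy is risky as a backup: any edits logged after the last merge are simply missing from it.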

Cluster monitoring and Troubleshooting

  • Use a general system monitoring tool like Nagios, Cacti, etc. to monitor your cluster.
  • Monitor Hadoop daemons, disks and disk partitions, CPU usage, swap, and network transfer speeds.


  • Log location is controlled in
  • Typically set to /var/log/hadoop
  • Each daemon writes to logs
  • .log is the first port of call for diagnosing issues. It uses log4j. The configuration for log4j is stored at conf/
  • Logs are rotated daily.
  • Old logs are not deleted.
  • .out is the combination of stdout and stderr, and doesn’t usually contain much output.
  • Appenders are the destination for log messages
  • The one that ships with Hadoop, DailyRollingFileAppender, is limited. It rotates the log file daily, but doesn’t limit the size of the log or the number of files, so the admin has to provide scripts to manage this.
  • CDH ships an alternate appender called RollingFileAppender that addresses the limitation of the default appender.
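Since old logs are never deleted by the default appender, the cleanup script mentioned above has to decide which rotated files are past their retention period. Here is a minimal sketch of that decision, assuming rotated files end in a ".yyyy-MM-dd" date suffix (an assumption about the appender's date pattern, so check your own log4j configuration). It only selects file names; deleting them is left to the caller. The class name and the file names in the example are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LogRetention {
    // Returns the names of rotated logs whose date suffix is older than keepDays
    // days before today. Files without a parseable date suffix are never selected.
    static List<String> expired(List<String> fileNames, LocalDate today, int keepDays) {
        LocalDate cutoff = today.minusDays(keepDays);
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                continue;
            }
            try {
                LocalDate rotatedOn = LocalDate.parse(name.substring(dot + 1));
                if (rotatedOn.isBefore(cutoff)) {
                    result.add(name);
                }
            } catch (DateTimeParseException e) {
                // Not a rotated log (for example the live .log or .out file): keep it.
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "datanode.log", "datanode.log.2012-01-15", "datanode.log.2012-01-30");
        System.out.println(expired(names, LocalDate.of(2012, 2, 1), 7));
        // prints [datanode.log.2012-01-15]
    }
}
```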


Job logs created by Hadoop

  • When a job runs, two files are created: the job XML configuration file and the job status file. These files are stored in multiple places on local disk and in HDFS:
    • Hadoop_log_dir/<job_id>_conf.xml (kept for 1 day by default)
    • Hadoop_log_dir/history (kept for 30 days by default)
    • <job_output_dir_in_HDFS>/_logs/history (kept forever by default)
    • Jobtracker also keeps them in memory for a limited time
  • Developer logs are stored in Hadoop_log_dir/userlogs (location is hardcoded). You should be wary of large developer log files, as they can cause slave nodes to run out of space. By default, dev logs are deleted every 24 hours.


Monitoring the Cluster with Ganglia

  • We discussed several general system monitoring tools, but none of them integrate with Hadoop.
  • Ganglia is designed for clusters. It integrates with Hadoop’s metrics-collection system, but doesn’t provide alerts.


Benchmarking a cluster

  • Standard benchmark is Terasort
    • Example: Generate a 10,000,000-line file, each line containing 100 bytes, then sort the file:
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar teragen 10000000 input-dir
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar terasort input-dir output-dir
  • Benchmarks are predominantly used to test network and disk I/O.
  • You should test clusters before and after adding nodes to establish a baseline. It’s also good to do so before and after upgrades.
  • Cloudera is working on a benchmarking guide.
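When sizing a benchmark run, it helps to translate between the row count passed to teragen and the amount of data it produces. The sketch below only does that arithmetic, using the 100 bytes per line mentioned above; the class and method names are mine.

```java
class TeraGenSize {
    static final long ROW_BYTES = 100; // each generated line is 100 bytes

    // Total bytes produced for a given number of rows.
    static long dataBytes(long rows) {
        return Math.multiplyExact(rows, ROW_BYTES);
    }

    // Smallest row count that produces at least targetBytes of data.
    static long rowsFor(long targetBytes) {
        return (targetBytes + ROW_BYTES - 1) / ROW_BYTES;
    }

    public static void main(String[] args) {
        // The example above: 10,000,000 rows.
        System.out.println(dataBytes(10_000_000L)); // prints 1000000000
        // Rows needed for a decimal terabyte of input.
        System.out.println(rowsFor(1_000_000_000_000L)); // prints 10000000000
    }
}
```

So the example command generates about 1 GB, which is small for a cluster benchmark; a larger row count is needed to stress the network and disks.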

Populating HDFS from External Resources

  • Flume is a distributed, reliable, available service for moving large amounts of data as it is produced. It was created at Cloudera as a spinoff of Facebook’s Scribe.
    • Flume is ideally suited for gathering logs from multiple systems as they are generated.
    • It’s configurable through a web browser or CLI, and can be extended by adding connectors to existing storage layers or data platforms.
    • General sources already provided include data from files, syslog, and stdout from a process.
    • Wrigley said there were some latency issues with Flume that are being fixed in the next minor version.
  • Sqoop is the SQL-to-Hadoop database import tool. It was developed at Cloudera, is open source, and is included as part of CDH (it’s about to become a top-level Apache project).
    • Sqoop uses JDBC to connect to an RDBMS.
    • It examines each table and automatically generates a Java class to import into HDFS, then creates and runs a map-only MR job to import the data. (Aside: per Mike Olson, you would have to be crack-pipe crazy to run MapReduce 2 in production.)
    • By default, four mappers connect to the RDBMS, and each imports a quarter of the data.
    • Sqoop features:
      • Imports a single table, or all tables in a database
      • Can specify which rows to import with a WHERE clause
      • Can specify columns to import
      • Can provide an arbitrary SELECT statement
      • Can automatically create a Hive table based on imported data
      • Supports incremental imports of data
      • Can export data from HDFS back to a database table
    • Cloudera has partnered with third parties (Oracle, MicroStrategy, and Netezza) to create native Sqoop connectors that are free but not open source.
    • MicroStrategy has their own version of Sqoop for SQL Server, derived from the open-source Sqoop.
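To picture how "each mapper imports a quarter of the data" can work, the sketch below divides a numeric key range into even, half-open slices, one per mapper. It assumes the table is split on an integer key whose minimum and maximum are known, which is my simplification; Sqoop's real splitter handles more column types and may pick boundaries differently. The names are hypothetical.

```java
import java.util.Arrays;

class KeyRangeSplitter {
    // Returns mappers + 1 boundaries. Mapper i would import the rows whose key k
    // satisfies boundaries[i] <= k < boundaries[i + 1].
    // Note: no overflow protection, so keep the key range well inside long.
    static long[] boundaries(long min, long max, int mappers) {
        if (mappers < 1 || max < min) {
            throw new IllegalArgumentException("need max >= min and at least one mapper");
        }
        long span = max - min + 1; // number of possible key values
        long[] bounds = new long[mappers + 1];
        for (int i = 0; i < mappers; i++) {
            bounds[i] = min + span * i / mappers;
        }
        bounds[mappers] = max + 1; // exclusive upper bound covers the largest key
        return bounds;
    }

    public static void main(String[] args) {
        // Keys 0..1000 split across the default four mappers.
        System.out.println(Arrays.toString(boundaries(0, 1000, 4)));
        // prints [0, 250, 500, 750, 1001]
    }
}
```

One consequence of range-based splitting: if the key values are badly skewed, the slices hold very different numbers of rows and one mapper ends up doing most of the work.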


Best practices for importing data

  • Import data to an intermediate directory in HDFS; then once it’s completely uploaded in HDFS, move it to the final destination. This prevents other clients from believing the file is there until it is completely there and ready to be processed.
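The same stage-then-move pattern can be sketched on a local file system with java.nio, used here only as a stand-in so the example runs without a cluster. On HDFS you would write into the intermediate directory and then rename through the Hadoop FileSystem API instead. The directory and file names below are made up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class StagedImport {
    // Write the data into stagingDir first, then move it into finalDir in one step,
    // so readers of finalDir never see a half-written file.
    static Path publish(Path stagingDir, Path finalDir, String name, byte[] data)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(finalDir);
        Path staged = stagingDir.resolve(name);
        Files.write(staged, data);
        // ATOMIC_MOVE requires both directories to be on the same file system.
        return Files.move(staged, finalDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("staged-import");
        Path published = publish(root.resolve("incoming"), root.resolve("ready"),
                "part-00000", "hello".getBytes());
        System.out.println("published: " + published);
    }
}
```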


Installing and managing other Hadoop projects

  • The Hive metastore should be stored in an RDBMS such as MySQL. This is a simple configuration:
    • Create a user and database in the RDBMS
    • Modify hive-site.xml on each user’s machine to point to the shared metastore


Review: Karmasphere Analyst

This week, I took some time to evaluate Karmasphere Analyst. Particularly, I was interested in how it worked with Hadoop (as opposed to MapR, which it also supports).

Setting up

The setup for Karmasphere is rather painless: a simple installer on Windows and a shell script on Linux. However, the Windows version does require Cygwin. Once open, Karmasphere divides itself into three major steps.


This is where you set up connections to existing HDFS databases. Karmasphere only supports Hive, but it's pretty nice about it... kind of. It will go through the process of installing Hive for you through a rather nice GUI, which allows you to easily specify a Derby database, MySQL database, or whatever other database you have a Java connector for. The downside to this is you can't easily use an already-existing Hive installation. This was a major shortcoming for me, but I get the impression that it should be possible to import an existing Hive database. I'll let you know as soon as the Karmasphere rep gets back to me.


Once I decided to install a new Hive metastore (which was rather painless), importing new tables from sequence files was simple for all the steps that involved Karmasphere (making the sequence file was annoying though). I don't have a problem with how Karmasphere does this. My only real problem is that it seems to hide away the shell that interacts with the Hive cluster Karmasphere uses, which seems like it might be limiting. I could be wrong, but I don't see how you could ever import anything without working through Karmasphere.


Supposedly, this is where the magic happens. The interface here was much simpler compared to other analytic tools. But that may be because there is no fancy drag-and-drop interface or amazing visual features. It turns out Karmasphere is a glorified query writer. But in its defense, it's very glorified. I've written queries against Hive before, but I've never managed to write them as quickly or as painlessly as Karmasphere allows me to. The bells and whistles it brings to the table include:

  • immediate and clear feedback regarding any errors or warnings in your queries
  • one-click execution of any written queries
  • caching of past queries and results
  • effective sampling of data to test queries on smaller subsets
  • table, column, and function library indexes
  • a "Query Plan" which shows you exactly how your query will translate into Hadoop MapReduce jobs

Once you have your data, it's pretty simple to export that data into various useful mediums such as Excel files, SQL tables, or perhaps back into Hive. Also, there is some charting functionality that was relatively simple to use, although I didn't look too much into it since it wasn't of interest to me.


All this makes the tool worthwhile, but I'm not sure it's worth the price (we were unable to obtain pricing information at time of publication, but will update if they get back to us). Since you are ultimately just making queries, it doesn't add any analytic functionality that we didn't have before. Technically, once you make your query, you don't even need Karmasphere anymore. That said, once you have your data, it does let you do several things with it that would otherwise be difficult (export, graphing, etc.).

If you're looking to analyze your unstructured data, I would say Karmasphere is ill-suited for the task, as unstructured data tends to take more than just the SQL-like queries Hive offers. All in all, this product is useful. But once my trial runs out, I will discontinue use.


Mo' Data, Mo' Problems, E02: HDFS

In this episode of our big data series, we're talking Hadoop's distributed file system (HDFS), which kind of acts like a virtual singles bar for the client, namenode and datanodes that map and reduce your data.

Mo' Data, Mo' Problems, E01: HBase

In the inaugural episode of our big data series, we give you a high-level overview of HBase, the transactional database built on top of HDFS. Recommended for anyone who enjoys OLTP, random reads and writes and extended metaphors.