
In a typical Hadoop slave node (a node running as both a datanode and a tasktracker), it is common to provide 75% of the disk for HDFS storage and the remaining 25% for MapReduce intermediate data. MapReduce intermediate data is the data created after a map task has run over an input split, typically an HDFS block. Given a single disk, this is a simple task, but with multiple disks per slave node the decision becomes more complicated. We want to choose the disk configuration that makes the best use of all available resources.

Assume we have 4x 1TB disks in our example slave node.

The obvious assumption would be to assign 3x 1TB disks to HDFS storage and 1x 1TB disk to MapReduce intermediate data. The problem with this approach is that we sacrifice potential HDFS throughput by dedicating one full volume to MapReduce intermediate data.

The better approach is to store both HDFS and MapReduce intermediate data on each disk of the slave node. This can be accomplished in a few different ways. One way is to use separate partitions, but this would leave you stuck if you ever needed to change the percentage split (e.g. allocate more storage to HDFS or to MapReduce intermediate data). Another way is to specify separate directories for each on the same volume and use the dfs.datanode.du.reserved configuration property in hdfs-site.xml to control the split. This allows you to change the capacity available to HDFS with only a datanode restart.
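
As a rough sketch of the arithmetic behind that property, here is how the 75/25 split on one of the 1TB example disks might be turned into a reserved-space value. The function name is hypothetical, and the sketch assumes dfs.datanode.du.reserved takes a per-volume byte count and that "1TB" means 10^12 bytes.

```python
def reserved_bytes_per_volume(disk_bytes: int, non_hdfs_fraction: float) -> int:
    """Bytes to keep away from HDFS on one volume (e.g. for MapReduce intermediate data)."""
    if not 0.0 <= non_hdfs_fraction < 1.0:
        raise ValueError("non_hdfs_fraction must be in [0, 1)")
    return int(disk_bytes * non_hdfs_fraction)


TB = 10**12  # assumption: decimal terabyte, as disk vendors count it

# 25% of each 1TB disk is held back for MapReduce intermediate data.
reserve = reserved_bytes_per_volume(1 * TB, 0.25)
print(reserve)
```

The same value applies to every volume on the node, so all four disks keep serving HDFS reads and writes.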

Another interesting solution, which I heard a classmate mention, is to use the native Linux file system user disk quota system. This should work because the MapReduce and HDFS daemons run under separate user accounts, but I’m not sure how thoroughly it has been tested.

Posted
Author: Jim Stallings
Categories: Hadoop

I recently attended Cloudera’s Hadoop Training for Administrators course in Columbia, MD. You can read my recap of the first and second days here and here. On the third day, we covered cluster maintenance, monitoring, benchmarking, job logging, and data importing.

Cluster Maintenance

  • Common tasks include checking HDFS status, copying data between clusters, adding and removing nodes, rebalancing the cluster, backing up namenode metadata, and upgrading the cluster.
  • HDFS clusters can become unbalanced when new nodes are added to the cluster, leading to performance issues.
  • Clusters can be rebalanced using the balancer command, which adjusts block placement until every node is within a set threshold value. The balancer command should only be used after adding new nodes to a cluster.
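
To make the threshold idea concrete, here is a small sketch that flags nodes whose disk utilization is too far from the cluster-wide average. It is only an illustration of the concept, not the balancer's code; the function name and node names are made up, and the 10% default is my assumption.

```python
def nodes_outside_threshold(used, capacity, threshold_pct=10.0):
    """Return {node: deviation} for nodes whose utilization differs from the
    cluster average by more than threshold_pct percentage points.

    used and capacity map node name -> bytes (any consistent unit works).
    """
    cluster_util = 100.0 * sum(used.values()) / sum(capacity.values())
    outliers = {}
    for node in used:
        node_util = 100.0 * used[node] / capacity[node]
        deviation = node_util - cluster_util
        if abs(deviation) > threshold_pct:
            outliers[node] = round(deviation, 1)
    return outliers


# Two full old nodes and one freshly added, nearly empty node.
used = {"dn1": 800, "dn2": 800, "dn3": 100}
capacity = {"dn1": 1000, "dn2": 1000, "dn3": 1000}
print(nodes_outside_threshold(used, capacity))
```

A freshly added node shows up with a large negative deviation, which is exactly the situation rebalancing is meant to fix.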

Namenode backup

  • The namenode is a single point of failure (at this point in time).
  • If namenode metadata is lost, then the cluster is lost.
  • The fsimage and edits files are the two primary files that hold metadata on disk. The namenode doesn’t write every change to HDFS file metadata into fsimage; rather, it appends changes incrementally to the edits log.
  • At startup, the namenode loads the fsimage into RAM, then replays all entries from the edits log. The two files are merged at set intervals on the Checkpoint node (aka Secondary NameNode). This node copies both files, loads them into RAM, merges them, then copies the result back to the namenode.
  • The checkpoint node does store a copy of these two files, but depending on when the last merge happened the data could be stale. It’s not meant to be a failover node, but more of a housekeeper.
  • Wrigley recommends writing namenode metadata to two local directories on different physical volumes and to an NFS directory. You can also retrieve copies of the namenode metadata over HTTP:
    • fsimage: http://<namenode>:50070/getimage?getimage=1
    • edits: http://<namenode>:50070/getimage?getedit=1
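
The fsimage/edits relationship is easier to see in a toy model. The sketch below treats the fsimage as a dictionary snapshot and the edits log as a list of operations; the operation names and record layout are invented for illustration and have nothing to do with the real on-disk formats.

```python
def replay_edits(fsimage, edits):
    """Apply an edits log to a snapshot and return the up-to-date metadata.

    fsimage: dict mapping path -> metadata (the last checkpoint).
    edits: list of (op, path, value) tuples recorded since that checkpoint.
    """
    image = dict(fsimage)  # never mutate the on-disk snapshot in place
    for op, path, value in edits:
        if op == "create":
            image[path] = value
        elif op == "delete":
            image.pop(path, None)
        else:
            raise ValueError(f"unknown edit op: {op}")
    return image


def checkpoint(fsimage, edits):
    """Merge the two files: a new fsimage plus an empty edits log."""
    return replay_edits(fsimage, edits), []


old_image = {"/data/a": {"blocks": 2}}
log = [("create", "/data/b", {"blocks": 1}), ("delete", "/data/a", None)]
new_image, new_log = checkpoint(old_image, log)
print(new_image, new_log)
```

The longer the edits log grows between checkpoints, the more work the namenode has to replay at startup, which is why the periodic merge matters.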

Cluster monitoring and Troubleshooting

  • Use a general system monitoring tool like Nagios, Cacti, etc. to monitor your cluster.
  • Monitor Hadoop daemons, disks and disk partitions, CPU usage, swap, and network transfer speeds.

Logging

  • Log location is controlled in hadoop-env.sh
  • Typically set to /var/log/hadoop
  • Each daemon writes two logs
  • .log is the first port of call for diagnosing issues. It uses log4j. The configuration for log4j is stored at conf/log4j.properties.
  • Logs are rotated daily.
  • Old logs are not deleted.
  • .out is the combination of stdout and stderr, and doesn’t usually contain much output.
  • Appenders are the destination for log messages
  • The one that ships with Hadoop, DailyRollingFileAppender, is limited. It rotates the log file daily, but doesn’t limit the size of the log or the number of files; the admin has to provide scripts to manage this.
  • CDH ships an alternate appender called RollingFileAppender that addresses the limitation of the default appender.
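
Since old logs are never deleted by the default appender, a cleanup script is needed. Here is a minimal sketch of one. It assumes rotated files carry a date suffix after ".log." (for example a hypothetical name like namenode.log.2012-05-01) and that a 30-day retention is acceptable; both are my assumptions, so adjust to taste.

```python
import os
import time

SECONDS_PER_DAY = 86400


def select_expired(files, now, max_age_days):
    """Pick rotated logs older than max_age_days.

    files: iterable of (name, mtime_seconds) pairs.
    Only names containing ".log." count as rotated logs (assumption), so the
    live .log and .out files are never selected.
    """
    cutoff = now - max_age_days * SECONDS_PER_DAY
    return [name for name, mtime in files if ".log." in name and mtime < cutoff]


def prune_log_dir(log_dir, max_age_days=30):
    """Delete expired rotated logs in log_dir and return the removed names."""
    entries = []
    for name in os.listdir(log_dir):
        path = os.path.join(log_dir, name)
        if os.path.isfile(path):
            entries.append((name, os.path.getmtime(path)))
    expired = select_expired(entries, time.time(), max_age_days)
    for name in expired:
        os.remove(os.path.join(log_dir, name))
    return expired
```

Run from cron, something like prune_log_dir("/var/log/hadoop") would keep the log volume from filling up.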

 

Job logs created by Hadoop       

  • When a job runs, 2 files are created: the job XML configuration file and the job status file. These files are stored in multiple places on local disk and in HDFS:
    • Hadoop_log_dir/<job_id>_conf.xml (kept for 1 day by default)
    • Hadoop_log_dir/history (kept for 30 days by default)
    • <job_output_dir_in_HDFS>/_logs/history (kept forever by default)
    • The jobtracker also keeps them in memory for a limited time
  • Developer logs are stored in Hadoop_log_dir/userlogs (the location is hardcoded). You should be wary of large developer log files, as they can cause slave nodes to run out of space. By default, dev logs are deleted every 24 hours.

 

Monitoring the Cluster with Ganglia

  • We discussed several general system monitoring tools, but none of them integrate with Hadoop.
  • Ganglia is designed for clusters. It integrates with Hadoop’s metrics-collection system, but doesn’t provide alerts.

 

Benchmarking a cluster

  • Standard benchmark is Terasort
    • Example: Generate a 10,000,000 line file, each line containing 100 bytes, then sort the file:
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar teragen 10000000 input-dir
hadoop jar $HADOOP_HOME/hadoop-*-examples.jar terasort input-dir output-dir
  • Benchmarks are predominantly used to test network and disk I/O.
  • You should test clusters before and after adding nodes to establish a baseline. It’s also good to do before and after upgrades.
  • Cloudera is working on a benchmarking guide.
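
For sizing a Terasort run, the arithmetic from the example above (one row per line, 100 bytes per row) can be sketched as follows. The helper names are mine.

```python
TERAGEN_ROW_BYTES = 100  # each generated row is 100 bytes, per the example above


def teragen_bytes(rows):
    """Total size of the generated input for a given row count."""
    return rows * TERAGEN_ROW_BYTES


def teragen_rows(target_bytes):
    """Row count needed to generate at least target_bytes of input."""
    return -(-target_bytes // TERAGEN_ROW_BYTES)  # integer ceiling division


# The example run above: 10,000,000 rows.
print(teragen_bytes(10_000_000))

# Rows needed for a full (decimal) terabyte.
print(teragen_rows(10**12))
```

So the 10,000,000-row example is only about 1 GB of data, which is a smoke test rather than a real stress of disk and network.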

Populating HDFS from External Resources

  • Flume is a distributed, reliable, available service for moving large amounts of data as it is produced. It was created at Cloudera as a spinoff of Facebook’s Scribe.
    • Flume is ideally suited for gathering logs from multiple systems as they are generated.
    • It’s configurable through a web browser or CLI, and can be extended by adding connectors to existing storage layers or data platforms.
    • General sources already provided include data from files, syslog, and stdout from a process.
    • Wrigley said there were some latency issues with Flume that are being fixed in the next minor version.
  • Sqoop is the SQL-to-Hadoop database import tool. It was developed at Cloudera, is open source, and is included as part of CDH. (It’s about to become a top-level Apache project.)
    • Sqoop uses JDBC to connect to the RDBMS.
    • It examines each table and automatically generates a Java class to import it into HDFS, then creates and runs a map-only MapReduce job to import the data. (Aside: per Mike Olson, you would have to be crack-pipe crazy to run MapReduce 2 in production.)
    • By default, four mappers connect to the RDBMS, and each imports a quarter of the data.
    • Sqoop features:
      • Imports a single table, or all tables in a database
      • Can specify which rows to import with a WHERE clause
      • Can specify columns to import
      • Can provide an arbitrary SELECT statement
      • Can automatically create a Hive table based on imported data
      • Supports incremental imports of data
      • Can export data from HDFS back to a database table
    • Cloudera has partnered with third parties (Oracle, MicroStrategy, and Netezza) to create native Sqoop connectors that are free but not open source.
    • MicroStrategy has their own version of Sqoop for SQL Server, derived from the open-source Sqoop.
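
To picture how four mappers can each take a quarter of a table, here is a sketch that carves a numeric key range into contiguous slices. This is an illustration of the idea only, not Sqoop's actual splitting code; it assumes an integer key and a roughly even distribution of rows across the key range.

```python
def split_ranges(lo, hi, num_mappers=4):
    """Split the inclusive key range [lo, hi] into up to num_mappers
    contiguous inclusive ranges of near-equal size."""
    if hi < lo:
        return []
    total = hi - lo + 1
    base, extra = divmod(total, num_mappers)
    ranges = []
    start = lo
    for i in range(num_mappers):
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue  # fewer keys than mappers
        ranges.append((start, start + size - 1))
        start += size
    return ranges


for low, high in split_ranges(1, 1000):
    # Each mapper would run its own bounded query over one slice.
    print(f"WHERE id >= {low} AND id <= {high}")
```

If the key values are badly skewed, the slices will hold very different row counts and one mapper ends up doing most of the work.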

 

Best practices for importing data

  • Import data to an intermediate directory in HDFS; then, once it’s completely uploaded, move it to the final destination. This keeps other clients from seeing the file until it is complete and ready to be processed.
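
The same stage-then-move pattern can be shown on a local file system. The sketch below is a local analogue, not HDFS code: it writes into a staging directory and then renames the finished file into place. The function and directory names are hypothetical, and it assumes staging and destination are on the same file system so the rename is a single step.

```python
import os
import tempfile


def publish_atomically(data, final_path, staging_dir):
    """Write data to a temp file in staging_dir, then move it to final_path.

    Readers watching final_path never observe a partially written file,
    because the file only appears there once the write has finished.
    """
    os.makedirs(staging_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir)
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
    os.replace(tmp_path, final_path)  # rename into the final location
```

On a cluster the equivalent is to upload into the intermediate directory and then move the result with hadoop fs -mv.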

 

Installing and managing other Hadoop projects

  • The Hive metastore should be stored in an RDBMS such as MySQL. This is a simple configuration:
    • Create a user and database in the RDBMS
    • Modify hive-site.xml on each user’s machine to point to the shared Metastore
Posted
Author: Jim Stallings
Categories: Events, Hadoop

I recently attended Cloudera’s Hadoop Training for Administrators course in Columbia, MD. You can read my recap of the first day here. On the second day, we learned how to deploy a cluster, install and configure Hadoop on multiple machines, and manage and schedule MapReduce jobs.

1. Deploying your cluster:

  • There are 3 different deployment types:
    • Local (dev)
      • No daemons; data is stored on the local disk (no HDFS) and everything runs in a single JVM
    • Pseudo-distributed (dev/debugging before production)
      • All daemons run locally in individual JVMs
      • Can be thought of as a single-machine cluster
    • Cluster (production):
      • Hadoop daemons run on a cluster of machines.

2. Installing Hadoop on multiple machines:

  • Don’t manually install Linux on every node.
  • Use some kind of automated deployment (e.g. Red Hat Kickstart).
  • Build a standard slave machine image:
    • Reimage a machine rather than troubleshooting software issues
    • CDH is available in multiple formats (packages and tarballs). Packages are recommended because they include some features not in the tarball.
  • Startup scripts can also install the Hadoop native libraries. They provide better performance for some Hadoop components.
    • Hadoop includes scripts called start-all.sh and stop-all.sh. These connect to the tasktrackers (TTs) and datanodes (DNs) to start and stop daemons. Don’t use these scripts, as they require passwordless SSH access to slave nodes.
    • Hadoop does not use SSH for any internal communications.
    • You can verify installation with sample jobs shipped with Hadoop
    • Use the Cloudera Manager tool for easy deployment and configuration
  • Hadoop Configuration Files:
    • Each machine in the cluster has its own configuration files that reside in /etc/hadoop/conf
    • Primary files are written in XML
    • From 0.20 onwards, the configuration has been separated out based on functionality
      • Core properties: core-site.xml
      • HDFS properties: hdfs-site.xml
      • MapReduce properties: mapred-site.xml
      • hadoop-env.sh sets some environment variables.
    • A lot of the default values are stale from a performance standpoint because they are based on hardware specs that are a couple of years old.
    • Best configuration practices are still in the works because Hadoop is so young. Cloudera hopes to have a guide out sometime this year.
  • Rack awareness:
    • Distributes HDFS blocks based on a host’s location
    • Important for cluster reliability, scalability, and performance.
    • The default setting places all nodes in a single rack /default-rack
    • The rack-lookup script can use a flat file, database, etc.
    • A common scenario is to name your hosts based on rack location so a script can simply deconstruct the name to find the host’s location.
    • You can use IP addresses or names to identify nodes in Hadoop’s configuration file.
    • Most people use names rather than IPs
    • Cluster daemons generally need to be restarted to read in configuration file changes
  • Configuration Management tools:
    • Use configuration management software to roll out machine configuration changes to all nodes at once
    • Start early; retrofitting these changes is always a pain.
    • There are alternatives to Cloudera Manager, like Puppet and Chef.
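
The "deconstruct the host name" idea from the rack awareness notes above can be sketched as a small script. It assumes a made-up naming convention in which hosts are called something like r12-node03 (rack 12), and it follows the usual contract of receiving host names or IPs as arguments and printing one rack path per line. Anything it cannot parse falls back to /default-rack.

```python
import re
import sys

DEFAULT_RACK = "/default-rack"

# Hypothetical convention: host names start with "r<rack number>-".
_HOST_PATTERN = re.compile(r"^r(\d+)-")


def rack_for(host):
    """Derive a rack path from a host name, or fall back to the default rack."""
    match = _HOST_PATTERN.match(host)
    if match is None:
        return DEFAULT_RACK
    return f"/rack{int(match.group(1))}"


def main(argv):
    # One output line per argument, in the same order.
    for host in argv:
        print(rack_for(host))


if __name__ == "__main__":
    main(sys.argv[1:])
```

A naming scheme like this avoids maintaining a separate lookup file, at the cost of having to rename a host if it ever moves racks.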

3. Managing and Scheduling jobs

  • You can control MapReduce jobs at the command line or through the web interface. We went over two job schedulers:
    • FIFO is the default scheduler shipped with Hadoop. With FIFO, jobs run in the order submitted, which is generally a bad idea when multiple people are using the cluster. The only exception is when the developer specifies a priority, but if all jobs are given the same priority, they simply run in submission order again.
    • Fair Scheduler is shipped with CDH. Fair scheduler is designed for multiple users to share the cluster simultaneously. MapReduce tasks are split up into different pools and are allocated based on pool configuration parameters.
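
The FIFO-with-priority behavior described above is simple enough to model in a few lines. This is a toy ordering function, not scheduler code; the five priority names are the ones I believe Hadoop uses for job priority, so treat them as an assumption.

```python
# Lower rank runs first. Names assumed to match Hadoop's job priority levels.
PRIORITY_RANK = {"VERY_HIGH": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3, "VERY_LOW": 4}


def fifo_order(jobs):
    """Return job names in the order a priority-aware FIFO queue runs them.

    jobs: list of (name, priority) tuples in submission order.
    Ties on priority are broken by submission order.
    """
    keyed = [(PRIORITY_RANK[priority], index, name)
             for index, (name, priority) in enumerate(jobs)]
    return [name for _, _, name in sorted(keyed)]


submitted = [("nightly-etl", "NORMAL"), ("urgent-report", "HIGH"), ("adhoc", "NORMAL")]
print(fifo_order(submitted))
```

With every job at the same priority the result is plain submission order, so one long job at the head of the queue still blocks everyone behind it.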
Posted
Author: Jim Stallings
Categories: Events, Hadoop

I recently attended Cloudera’s Hadoop Training for Administrators course in Columbia, MD. Over the next few posts, I’ll discuss how it went and what I learned. All 28 chairs in the computer lab were full. There was another Cloudera employee sitting in to review and update the certification exam.

Our trainer was Ian Wrigley (@iwrigley). He has been with Cloudera for about 2 years and is the current manager for class curriculum and certification materials. He is originally from the UK (still has the accent) and currently lives in LA. Previously, he taught for about 20 years on several different technology topics: Oracle, Sun, MySQL, systems administration, Java, C, and PHP. He is the contributing editor for UNIX and open source for PC Pro in the UK.

The class was primarily made up of Linux systems administrators with some systems architects, DBAs, and developers sprinkled in. It seemed like a large number of the class worked for or contracted with the government (expected for the location). A couple of people mentioned using Hadoop in the cloud. Several mentioned the need to migrate data from an RDBMS into Hadoop. Everyone expressed the need to adequately manage their clusters. Cluster sizes ranged widely; I heard sizes of 13, 128, 270, and 2000 nodes. Ian introduced us to what Cloudera offers: CDH distributions, training, consulting, support, and enterprise management software.

I asked if they support the entire stack that is shipped with a CDH distribution. Ian told me that if a paying customer finds a bug, Cloudera’s engineers will fix the bug and submit it back to the community.

Why use Hadoop? It solves the fundamental problem of resource bottlenecks on a single machine by distributing the workload across a cluster of commodity machines. Hadoop provides the framework: HDFS for distributed file storage and MapReduce for distributing tasks across a cluster. There was a brief review of Hadoop history: Google, Doug Cutting, and Nutch.

Here are a few notes on Hadoop architecture and hardware I scribbled down during class.

HDFS

  • All metadata is held in RAM for fast response:
    • Each item consumes 150-200 bytes of RAM
      • 1 item for the file’s metadata (name, permissions, etc.)
      • 1 item for each file block.
      • Larger files = fewer items for the same amount of data = less RAM
  • HDFS blocks are named blk_xxxxx:
    • A checksum file is also created when a file block is written to disk
  • Dealing with data corruption:
    • As a DataNode reads a block, it also calculates a checksum.
      • Live checksum is compared to old checksum
        • If different, the client reads from the next DataNode in the list
        • The NameNode is informed of the corrupt block and re-replicates it to another DataNode
    • The DataNode also verifies the checksum for blocks every three weeks after the block was created.
  • File permissions are very similar to Unix:
    • X (execute) is required on directories, or you can’t access the files within
  • Few people actually implement Kerberos; they use firewalls or gateways
  • The Secondary NameNode is not an HA or failover NameNode:
    • It’s only used to periodically combine the fsimage and edits log.
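
The RAM rule of thumb at the top of this list is easy to turn into a back-of-the-envelope calculator. The sketch below counts one item per file plus one per block and uses the upper figure of 200 bytes per item; the 128 MB block size in the example is my assumption, not something from the class.

```python
def namenode_items(file_sizes, block_size):
    """Count metadata items: one per file plus one per block of that file."""
    items = 0
    for size in file_sizes:
        blocks = -(-size // block_size)  # integer ceiling division
        items += 1 + blocks
    return items


def namenode_ram_bytes(file_sizes, block_size, bytes_per_item=200):
    """Rough namenode RAM needed to hold metadata for the given files."""
    return namenode_items(file_sizes, block_size) * bytes_per_item


MB = 2**20
BLOCK = 128 * MB  # assumed block size for this example

one_big_file = [1024 * MB]          # 1 GB stored as a single file
many_small_files = [1 * MB] * 1024  # the same 1 GB as 1024 tiny files

print(namenode_ram_bytes(one_big_file, BLOCK))
print(namenode_ram_bytes(many_small_files, BLOCK))
```

The same gigabyte of data costs 9 items as one file but 2048 items as small files, which is why lots of tiny files hurt the namenode.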

There are a couple of Hadoop Namenode enhancements in the upcoming CDH4 distribution. CDH4, once released, will add a federated Namenode, which allows two name nodes to answer requests, and an HA Secondary Namenode to alleviate the current architecture, where the Namenode is a single point of failure.

MapReduce

  • In between the Map and Reduce is the shuffle and sort
    • This is the magic sauce; it sends data between Mappers and Reducers
  • Each Mapper processes a single input split from HDFS.
    • Often an HDFS block.
  • Intermediate data is written to local file system disk.
  • Output from Reducers is written back to HDFS.
  • If any task fails to report in within 10 minutes, it is assumed to have failed.
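
To see where the shuffle and sort sits, here is a single-process word count that mimics the three stages. It is a teaching toy under my own naming, with none of the distribution, local-disk spilling, or HDFS output that the real framework handles.

```python
from collections import defaultdict


def map_phase(split):
    """Mapper: emit (word, 1) for every word in one input split."""
    return [(word.lower(), 1) for word in split.split()]


def shuffle_and_sort(mapped_outputs):
    """Group all values by key across mappers and sort the keys."""
    groups = defaultdict(list)
    for output in mapped_outputs:
        for key, value in output:
            groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Reducer: sum the counts for one key."""
    return key, sum(values)


def run_job(splits):
    mapped = [map_phase(split) for split in splits]  # one mapper per split
    return [reduce_phase(key, values) for key, values in shuffle_and_sort(mapped)]


print(run_job(["a b a", "b c"]))
```

Everything a mapper emits has to be moved to whichever reducer owns that key, which is why the shuffle is the network-heavy part of a job.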

Planning your Hadoop cluster.

Ian is in the “Save the money, buy more nodes” camp, stressing quantity over quality of servers.

  • On a small 4-node cluster, the master daemons can run on a slave node
  • If the cluster has 10-20 nodes, the master daemons should be placed on a separate node
  • With more than 20 nodes, the master daemons should each be on separate boxes.
  • Typical base config for a slave node with 4 x 1TB HDs:
    • JBOD, no spanning; in fact, avoid RAID controllers altogether
    • 2 quad-core CPUs
    • 24-32GB RAM
      • The deciding factors are the number of HDFS blocks (for namenode RAM) and the number of MapReduce tasks that run on the slave nodes.
    • Multiples of 1 HD, 2 cores, and 6-8 GB of RAM work for many types of applications
  • Hadoop nodes are seldom CPU-bound
    • Typically disk and network bound
  • Each map or reduce task will take 1GB to 2GB of RAM:
    • Ensure you have enough RAM to run all tasks plus overhead for the DN and TT daemons
  • Rule of thumb:
    • Total number of MR tasks = 1.5x number of cores
      • Not definitive, you have to consider RAM
  • 3.5-inch disks, 7200 RPM
  • More disk spindles over larger spindles
  • Good practice is 24TB per slave max.
  • No raid, no virtualization, no blades
  • Master nodes (JT, NN) are single points of failure.
    • Spend money on making these nodes fully redundant.
  • Networking:
    • Hadoop is network intensive
    • Don’t cheap out on hardware
    • 1Gb/sec on nodes
    • Top of rack switches should be 10Gb/sec or faster
  • Choose an OS you’re comfortable with:
    • CentOS is widely used
    • RHEL
    • Cloudera often sees a mixture of the 2.
  • No Linux LVM.
  • Test disk I/O:
    • e.g. hdparm -t /dev/sda1
    • Looking for greater than 70 MB/sec
  • Use a Common directory structure:
    • e.g. /data/<n>/dfs/nn for the namenode.
  • Mount disks with the noatime:
    • Disables the access-time write that otherwise happens every time a file is read.
  • Reduce the swappiness of the system:
    • Set vm.swappiness to 0 or 5 in /etc/sysctl.conf
    • This keeps Linux from swapping daemon memory out to disk to make room for file system cache once all daemons have started.
  • Use ext3 or ext4 file systems
  • Increase the nofile ulimit for the mapred and hdfs users to at least 32k open files at a time:
    • Setting is in /etc/security/limits.conf
    • This increases the number of files a user may have open.
  • Disable IPv6, SELinux
  • Use NTP
  • Use Oracle JDK 1.6
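
The task-count rule of thumb and the RAM caveat from the list above can be combined into one small calculation. In this sketch the 2 GB per task is the upper end of the range from my notes, while the 2 GB set aside for the DN and TT daemons and the OS is purely my own placeholder.

```python
def max_task_slots(cores, ram_gb, ram_per_task_gb=2.0, daemon_overhead_gb=2.0):
    """Estimate how many map/reduce tasks a slave node can run at once.

    Takes the smaller of the CPU rule of thumb (1.5 x cores) and what fits
    in RAM after reserving daemon_overhead_gb (an assumed figure).
    """
    by_cpu = int(cores * 1.5)
    by_ram = int((ram_gb - daemon_overhead_gb) // ram_per_task_gb)
    return max(0, min(by_cpu, by_ram))


# The typical slave node above: 2 quad-core CPUs with 24 or 32 GB of RAM.
print(max_task_slots(cores=8, ram_gb=24))
print(max_task_slots(cores=8, ram_gb=32))
```

With 24 GB the node is RAM-limited to 11 tasks, while 32 GB lets it reach the full 12 that the CPU rule allows.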

 

Posted
Author: Jim Stallings
Categories: Events, Hadoop