Distributed Systems + Middleware
Hadoop

Alessandro Sivieri
Dipartimento di Elettronica, Informazione e Bioingegneria
Politecnico di Milano, Italy
[email protected]
http://corsi.dei.polimi.it/distsys

Contents
– Introduction to Hadoop
– History
– MapReduce
– HDFS
– Pig

INTRODUCTION TO HADOOP

Data
We live in a digital world that produces data at an impressive speed:
– As of 2012, 2.7 ZB of data exist (1 ZB = 10^21 bytes)
– The NYSE produces 1 TB of data per day
– The Internet Archive grows by 20 TB per month
– The LHC produces 15 PB of data per year
– AT&T has a 300 TB database
– 100 TB of data are uploaded to Facebook daily
– …

Data
Personal data is growing, too
– E.g., photos: a single photo taken with a commercial Nikon camera takes about 6 MB (default settings); a year of family photos takes about 8 GB of space
– … adding to the slices of personal stuff uploaded to social networks, video Websites, blogs and more
Machine-produced data is also growing
– Machine logs
– Sensor networks and monitored data

Data analysis
Main problem: disk read speed has not kept pace with disk capacity
– Solution: parallelize the storage and read less data from each disk
– Problems: hardware replication, data aggregation
Take, for example, an RDBMS (keeping in mind that disk seek time determines the latency of its operations):
– Updating records is fast: a B-tree structure is efficient
– Reading many records is slow: if access is dominated by seek time, it is faster to read the entire disk (which operates at transfer rate)
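To make the seek/transfer argument concrete, a back-of-the-envelope calculation (assuming a ballpark sustained transfer rate of 100 MB/s per disk, a figure not given in the slides):

  Reading 1 TB from a single disk:           10^6 MB / (100 MB/s) = 10^4 s ≈ 2.8 hours
  Reading 1 TB striped over 100 disks:       10^4 MB per disk, in parallel ≈ 100 s

This is the trade the slides describe: many disks (with the replication and aggregation problems they bring) in exchange for always reading at transfer rate.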
Hadoop
Reliable data storage: the Hadoop Distributed File System
Data analysis: a MapReduce implementation
Many tools for developers
– Easy cluster administration
– Query languages
  • Some are similar to SQL
– Column-oriented distributed databases on top of Hadoop
– Structured to unstructured repositories and back
– …

Hadoop vs. The (existing) World
RDBMS:
– Disk seek time
– Some types of data are not normalized (e.g., logs): MapReduce works well with unstructured data
– MapReduce scales linearly (while an RDBMS does not)
Volunteer computing (e.g., SETI@home):
– Similar model, but Hadoop works in a localized cluster sharing high-performance bandwidth, while volunteer computing works over the Internet, on untrusted computers that perform other operations in the meantime

Hadoop vs. The (existing) World
MPI:
– Works well for compute-intensive jobs, but the network becomes the bottleneck when hundreds of GB of data have to be analyzed
– Conversely, MapReduce does its best to exploit data locality by collocating the data with the compute node (network bandwidth is the most precious resource; it must not be wasted)
– MapReduce operates at a higher level than MPI: the data flow is already taken care of
– MapReduce implements failure recovery (in MPI the developer has to handle checkpoints and failure recovery)

HADOOP HISTORY

Brief history
In 2002, Mike Cafarella and Doug Cutting started working on Apache Nutch, a new Web search engine
In 2003, Google published a paper on the Google File System, a distributed filesystem, and Mike and Doug started working on a similar, open source project
In 2004, Google published another paper, on the MapReduce computation model, and yet again Mike and Doug implemented an open source version in Nutch

Brief history
In 2006, these two projects separated from Nutch and became Hadoop
In the same year, Doug Cutting started working for Yahoo! and began using Hadoop there
In 2008, Hadoop was used by Yahoo! (a 10000-core cluster), Last.fm, Facebook and the NYT
In 2009, Yahoo! broke the world record by sorting 1 TB of data in 62 seconds, using Hadoop
Since then, Hadoop has become mainstream in industry

Examples from the Real World
Last.fm
– Each user listening to a song (locally or in streaming) generates a trace
– Hadoop analyses these traces to produce charts
  • E.g., track statistics per user and per country, weekly top tracks…
Facebook
– Daily and hourly summaries over user logs
  • Product usage, ad campaigns…
– Ad-hoc jobs over historical data
– Long-term archival store
– Integrity checks

Examples from the Real World
Nutch search engine
– Link inversion: find the incoming links that point to a specific Web page
– URL fetching
– Producing Lucene indexes (for text searches)
Infochimps: explore network graphs
– Social networks: Twitter analysis, measuring communities
– Biology: neuron connections in roundworms
– Street connections: OpenStreetMap

Hadoop umbrella
HDFS: distributed filesystem
MapReduce: distributed data processing model
MRUnit: unit testing of MapReduce applications
Pig: data flow language to explore large datasets
Hive: distributed data warehouse
HBase: distributed, column-oriented database
ZooKeeper: distributed coordination service
Sqoop: efficient bulk transfers of data between structured datastores and HDFS
…

MAPREDUCE

MapReduce
Model for analyzing large amounts of data
Data has to be organized as a key–value dataset
Two phases:
– map(k1, v1) -> list(k2, v2), where the input domain is different from the output domain
– (shuffle: an intermediate phase that sorts the output of map and groups it by key)
– reduce(k2, list(v2)) -> list(v3), where the input and output domains are the same
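Before introducing the Hadoop machinery, here is a minimal single-JVM sketch of the model itself: plain Java with no Hadoop involved, using the classic word-count example (the class and the toy dataset are illustrative, not part of the course material):

import java.util.*;

public class MiniMapReduce {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("a rose is", "a rose");

        // Map phase: each input record emits a list of (k2, v2) pairs
        List<Map.Entry<String, Integer>> mapped = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : input) {
            for (String word : line.split("\\s+")) {
                mapped.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
            }
        }

        // Shuffle: sort by key and group together all the values of each key
        SortedMap<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> kv : mapped) {
            List<Integer> vals = grouped.get(kv.getKey());
            if (vals == null) {
                vals = new ArrayList<Integer>();
                grouped.put(kv.getKey(), vals);
            }
            vals.add(kv.getValue());
        }

        // Reduce phase: fold each (k2, list(v2)) into the final output
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int v : entry.getValue()) sum += v;
            System.out.println(entry.getKey() + "\t" + sum); // prints: a 2, is 1, rose 2
        }
    }
}

The shuffle is the only phase the programmer never writes: on Hadoop it is performed by the framework, between the two user-supplied functions.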
MapReduce on Hadoop
Job: unit of work to be performed by the system
– Input data
– Map and Reduce implementations
– Configuration
Map task and Reduce task: smaller pieces of a job
Jobtracker: node of the cluster coordinating the job
Tasktracker: runs a task and reports to the jobtracker
Split: part of the input

MapReduce on Hadoop
The jobtracker splits the input into parts, and a map task is run for each split
Splits are run in parallel on all nodes
– Hadoop tries hard to run the map task for a specific split on the same node where HDFS has saved that split
  • If not possible, at least in the same rack
Map task output is written to local disk (not to HDFS: intermediate data do not need replication)
Reduce tasks receive the map output through the network (no data locality here)
The final output is saved on HDFS

[figure: MapReduce data flow on Hadoop, single reduce task]
[figure: MapReduce data flow on Hadoop, multiple reduce tasks]
[figure: MapReduce data flow on Hadoop, no reduce task]

Intermediate data
Combiner function
– Aggregates data from several map outputs
  • To minimize network transfer
  • Hadoop decides if this is needed; it may not be executed at all
– In a way, it can be seen as a local instance of the reduce function
Shuffle
– Sorts each map output
– Transfers it to (one of the) reducers
– Merges it with other map outputs, maintaining the sorting
– Everything can be configured
  • Memory, buffer sizes, parallel copies… check the book!

MapReduce on Hadoop
Everything is configurable
– Number of map tasks
– Number of reduce tasks
– Data compression
– Custom serialization
– Memory management
Profilers to understand the performance in detail
Tools to stream data through chains of MapReduce runs, called workflows (Apache Oozie)
– For complex problems

MapReduce on Hadoop: versions
Version 1: the first developed version of Hadoop MR
– The runtime contains the previously mentioned job- and tasktrackers
– Still developed (latest version: 1.2)
Version 2: the new Hadoop MR
– The runtime, called YARN (Yet Another Resource Negotiator), substitutes (and generalizes) the previous trackers
– New HDFS features
– Different configurations and APIs
The book mixes the two versions here and there…
We will follow version 2 (=> Hadoop 2.2.0, the latest version)

MapReduce example
The main interface is written in Java
– Hadoop is written in Java
There are interfaces in many other languages
– Hadoop is able to work in “streaming” mode, using Unix pipes
– Other languages take advantage of this
  • Python
  • Ruby
  • C++
  • …

MapReduce example
A simple dataset
– Temperature and humidity measured through WSN motes in two rooms

Timestamp   Room  Temperature  Humidity  Battery
1341078338  18    3224         54        2999
1341078379  31    3186         49        2999
1341078398  18    3237         48        2999
1341078439  31    3184         49        2999
1341078458  18    3243         47        2999
1341078499  31    3180         49        2999
1341078518  18    3245         48        2999
1341078559  31    3178         51        2999

MapReduce example
Dataset
– Sampling time: 5 minutes
– Total dataset: about 566 days (more than 1.5 years)
– CSV (only a few megs… it doesn’t really exploit Hadoop, but it will help us understand how it works); a few sample lines are shown below
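For concreteness, the first records of the table above would look roughly like this in the file (the exact separator is an assumption: the Mapper below only requires whitespace-separated fields, and the Pig script at the end of the lecture loads the file as tab-separated):

1341078338	18	3224	54	2999
1341078379	31	3186	49	2999
1341078398	18	3237	48	2999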
Calculations
– Max temperature per day
– Mean temperature per day
– …

MapReduce example
Classes to be implemented
– Mapper
– Reducer
– Job handler (the “main” file, launching the execution and waiting for results) – this will handle the configuration, too
Configuration
– Default filesystem (HDFS, but Hadoop can read from standard filesystems)
– Default configuration for node and data managers

Hadoop configurations
Standalone (default configuration)
– Runs on a single JVM
– No parallelization
– Useful for debugging purposes
Pseudo-distributed (single-host cluster)
– Parallelization
– Runs daemons on a single host
Cluster
– Full-scale Hadoop

Hadoop configuration
We will see standalone and pseudo-distributed
– I will include the cluster configuration in the exercises, but it requires a certain amount of memory to be run
For standalone, you need to do almost nothing
– Just set the JAVA_HOME environment variable in hadoop-env.sh, before executing hadoop itself
Notice that Windows is NOT supported in production mode for Hadoop
– Examples have been tested on Linux and OS X

Mapper
public class MeanTemperatureMapper
        extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split("\\s+");
        // Skip malformed records
        if (parts.length < 3) {
            return;
        }
        // Timestamps are in seconds, Java dates in milliseconds
        long timestamp = Long.parseLong(parts[0]) * 1000;
        int mote = Integer.parseInt(parts[1]);
        // Keep only the measurements of mote 18
        if (mote != 18) {
            return;
        }
        // Temperatures are stored as integers: 3224 means 32.24
        int temperatureInt = Integer.parseInt(parts[2]);
        double temperature = temperatureInt / 100.0;
        Date date = new Date(timestamp);
        DateFormat df = new SimpleDateFormat("yyyyMMdd");
        String dateKey = df.format(date);
        context.write(new Text(dateKey), new DoubleWritable(temperature));
    }
}

Mapper
For mote 18, the map phase turns each input record into a (day, temperature) pair:

Day       Temperature
20120630  32.24
20120630  32.37
20120630  32.43
20120630  32.45

Reducer
public class MeanTemperatureReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Average all the temperatures grouped under the same day
        double sum = 0;
        int counter = 0;
        for (DoubleWritable dw : values) {
            sum += dw.get();
            ++counter;
        }
        context.write(key, new DoubleWritable(sum / counter));
    }
}

Reducer
Day       Temperature  Mean temperature
20120630  32.24        32.3725
20120630  32.37
20120630  32.43
20120630  32.45
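One remark connecting this example to the combiner function described earlier: the mean reducer above must NOT be registered as a combiner, because a mean of partial means is not the overall mean. The max-temperature calculation, on the other hand, is associative, so its reducer could double as a combiner. A hypothetical sketch (this class is not in the course material):

public class MaxTemperatureReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Max is associative and commutative, so partial maxima
        // computed on the map side do not change the final result
        double max = Double.NEGATIVE_INFINITY;
        for (DoubleWritable dw : values) {
            max = Math.max(max, dw.get());
        }
        context.write(key, new DoubleWritable(max));
    }
}

// In the job handler:
//   job.setReducerClass(MaxTemperatureReducer.class);
//   job.setCombinerClass(MaxTemperatureReducer.class); // safe only for associative reductions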
Job handler
public class MeanTemperatureJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = Job.getInstance(getConf(), "Mean temperature");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MeanTemperatureMapper.class);
        job.setReducerClass(MeanTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MeanTemperatureJob(), args);
        System.exit(exitCode);
    }
}

Run MapReduce
Java classes are compiled through Maven
– The example will contain a sample pom.xml file
To run the example:
– mvn package
– export HADOOP_CLASSPATH=somename.jar
– hadoop MeanTemperatureJob -fs file:/// input/file.csv output
The output directory MUST not exist
Notice the “-fs” option…

HDFS

HDFS
The previous example used the OS filesystem
– Hadoop does not have any problem with that
But, if we want to move to pseudo-distributed and cluster modes, we need to start using HDFS
The Hadoop Distributed File System is designed to store very large files and to serve streaming data access

Name and data nodes
HDFS has a single* namenode (master) and several datanodes (slaves)
The namenode manages the filesystem namespace (i.e., metadata)
– It maintains a namespace image and an edit log
– It has a list of all the datanodes and of the blocks they store
– It does not store any file content by itself
* From version 2, Hadoop added the concept of HDFS federation
– Backup namenodes!

File distribution
A datanode is a machine storing blocks of files, continuously communicating with the namenode
– Without a namenode, a datanode is useless
Each file uploaded to HDFS is split into blocks of 64 MB each
– Minimize seeks, maximize transfer rate
– If a file (or the last block of a file) is smaller than 64 MB, it does not occupy 64 MB
  • This is different from OS filesystems
A file can be bigger than a single disk in the network

[figure: blocks of a file spread across the datanodes of a cluster]

Interactions
Command-line interface
– ls, mkdir, copyFromLocal, cat…
Third-party tools
– Fuse
Java APIs

Writable datatypes
Mappers and Reducers used particular types
– Text, DoubleWritable, LongWritable…
Hadoop defines specific types “wrapping” Java types, optimizing network serialization
To add new types, you can implement Writable and WritableComparable
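As an aside on the Java APIs mentioned under “Interactions”, a minimal sketch of reading a file back from HDFS (FileSystem, Path and IOUtils are Hadoop library classes; the class name and the path are illustrative):

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Prints an HDFS file to stdout, like "hadoop fs -cat"
public class HdfsCat {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml, so fs.default.name points the client to HDFS
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        InputStream in = null;
        try {
            in = fs.open(new Path("/user/hadoop/source.csv")); // illustrative path
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}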
MapReduce example (reprise)
Let’s assume we want to output the mean temperature per day and per mote
We need to use as key in our key-value pairs the pair day – mote
Create a new class and extend WritableComparable
– Comparable is needed because Hadoop sorts keys before reducing

MapReduce example (reprise)
public class RoomDayWritable implements WritableComparable<RoomDayWritable> {

    private Text date;
    private IntWritable mote;

    // A no-argument constructor is required: Hadoop instantiates
    // Writables by reflection before calling readFields()
    public RoomDayWritable() {
        this.date = new Text();
        this.mote = new IntWritable();
    }

    public RoomDayWritable(String date, int mote) {
        this.date = new Text(date);
        this.mote = new IntWritable(mote);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.mote.write(out);
        this.date.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.mote.readFields(in);
        this.date.readFields(in);
    }

    @Override
    public int hashCode() { … }

    @Override
    public boolean equals(Object obj) { … }

    @Override
    public int compareTo(RoomDayWritable other) { … }
}

MapReduce example (reprise)
Day       Mote  Temperature
20120630  18    32.24
20120630  18    32.37
20120630  18    32.43
20120630  18    32.45
20120630  31    31.86
20120630  31    31.84
20120630  31    31.80
20120630  31    31.78

Day       Mote  Mean temperature
20120630  18    32.3725
20120630  31    31.82
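To wire the composite key in, the Mapper emits a RoomDayWritable instead of a Text day, and stops filtering on the mote id. A hypothetical sketch of the changed pieces (not shown in the slides):

public class RoomDayTemperatureMapper
        extends Mapper<LongWritable, Text, RoomDayWritable, DoubleWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\\s+");
        if (parts.length < 3) {
            return;
        }
        long timestamp = Long.parseLong(parts[0]) * 1000;  // seconds -> milliseconds
        int mote = Integer.parseInt(parts[1]);             // no filtering: keep every mote
        double temperature = Integer.parseInt(parts[2]) / 100.0;
        String dateKey = new SimpleDateFormat("yyyyMMdd").format(new Date(timestamp));
        context.write(new RoomDayWritable(dateKey, mote), new DoubleWritable(temperature));
    }
}

// The Reducer signature and the job wiring change accordingly:
//   Reducer<RoomDayWritable, DoubleWritable, RoomDayWritable, DoubleWritable>
//   job.setOutputKeyClass(RoomDayWritable.class);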
Single node
Configuration needs to be changed
– Location of all the daemons is localhost
– Number of replicas is 1
Format HDFS
Start the HDFS daemon
Start the YARN daemon
Run the demo

Configuration
core-site.xml
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>

Run the example
Format HDFS
– hdfs namenode -format
Start daemons (check the logs!)
– start-dfs.sh
– start-yarn.sh
– mr-jobhistory-daemon.sh start historyserver (*)
– jps
Copy the file on HDFS
– hadoop fs -mkdir -p /user/hadoop
– hadoop fs -copyFromLocal source.csv .

Run the example
Run
– hadoop jar somename.jar MeanTemperatureJob source.csv output
  • Again, the output directory (on HDFS!) MUST not exist
Check the output
– hadoop fs -cat output/part-r-00000

Cluster mode
Configuration needs to be changed
– See the example file on the course Website
A new file, called “slaves”, has to be created on the master node, containing the addresses of all the datanodes
Running the example works in the exact same way

PIG

Pig
Language and runtime to perform complex queries on richer data structures
– Pig Latin is the language to express data flows
– The runtime executes scripts on Hadoop clusters
A script is a sequence of transformations on the initial data
– They will be translated into MapReduce jobs
– Faster development: an analysis can be written in a few lines and executed on terabytes of data
User Defined Functions can extend Pig capabilities
Performance is comparable with native MR code*

Pig
Three execution modes
– Script
– Interactive shell (Grunt)
– Embedded in Java
IDE plugins
Scripts can be run in local mode (single JVM) or on a cluster (pseudo or real)
Pig is able to generate a (reasonably) complete and concise sample dataset for a script

MapReduce example with Pig
REGISTER ./hadooptests-1.0.jar;
raw = LOAD 'temperature-sorted.csv' USING PigStorage('\t')
      AS (timestamp:long, mote:int, temperature:int, humidity:int, battery:int);
clean = FILTER raw BY mote != 1;
day = FOREACH clean GENERATE me.sivieri.hadoop.pig.ExtractDate(timestamp) as date,
      mote, temperature / 100.00 as temperature;
grouped_date_mote = GROUP day BY (date, mote);
mean_temp = FOREACH grouped_date_mote GENERATE group, AVG(day.temperature);
DUMP mean_temp;

MapReduce example with Pig
public class ExtractDate extends EvalFunc<String> {

    @Override
    public String exec(Tuple arg0) throws IOException {
        if (arg0 == null || arg0.size() == 0) return null;
        try {
            Long timestamp = (Long) arg0.get(0);
            // Timestamps are in seconds, Java dates in milliseconds
            // (same conversion as in the Mapper)
            Date date = new Date(timestamp * 1000);
            DateFormat df = new SimpleDateFormat("yyyyMMdd");
            return df.format(date);
        } catch (Exception e) {
            System.err.println("ExtractDate: failed to process input; error - " + e.getMessage());
            return null;
        }
    }
}

Bibliography
Tom White, “Hadoop: The Definitive Guide”, 3rd Edition, O’Reilly
Sanjay Ghemawat, Howard Gobioff and Shun-Tak Leung, “The Google File System”, Google, 2003
Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters”, Google, 2004