Distributed Systems + Middleware
Hadoop
Alessandro Sivieri
Dipartimento di Elettronica, Informazione e Bioingegneria
Politecnico di Milano, Italy
[email protected]
http://corsi.dei.polimi.it/distsys
Contents
 Introduction to Hadoop
 History
 MapReduce
 HDFS
 Pig
INTRODUCTION TO HADOOP
Data
 We live in a digital world that produces data at an
impressive speed
– As of 2012, 2.7 ZB of data exist (1 ZB = 10^21 bytes)
– The NYSE produces 1 TB of data per day
– The Internet Archive grows by 20 TB per month
– The LHC produces 15 PB of data per year
– AT&T has a 300 TB database
– 100 TB of data are uploaded to Facebook daily
– …
Data
 Personal data is growing, too
– E.g., photos: a single photo taken with a Nikon
commercial camera takes about 6 MB (default settings); a
year of family photos takes about 8 GB of space
– … in addition to all the personal content uploaded to social networks, video Web sites, blogs and more
 Machine-produced data is also growing
– Machine logs
– Sensor networks and monitored data
Data analysis
 Main problem: disk read speed has not kept pace with disk capacity
– Solution: parallelize the storage and read less data from
each disk
– Problems: hardware replication, data aggregation
 Take, for example, an RDBMS (keeping in mind that disk seek time determines the latency of its operations):
– Updating records is fast: a B-Tree structure is efficient
– Reading many records is slow: when access is dominated by seek time, it is faster to read the entire disk (which operates at transfer rate)
Hadoop
 Reliable data storage: Hadoop Distributed File
System
 Data analysis: MapReduce implementation
 Many tools for the developers
– Easy cluster administration
– Query languages
• Some are similar to SQL
– Column-oriented distributed databases on top of Hadoop
– Structured to unstructured repositories and back
– …
Hadoop vs. The (existing) World
 RDBMS:
– Disk seek time is the limiting factor
– Some types of data are not normalized (e.g., logs): MapReduce works well with unstructured data
– MapReduce scales linearly (while an RDBMS does not)
 Volunteer computing (e.g., SETI@home)
– Similar model, but Hadoop works in a localized cluster with high-performance bandwidth, while volunteer computing works over the Internet, on untrusted computers that perform other work in the meantime
Hadoop vs. The (existing) World
 MPI:
– Works well for compute-intensive jobs, but the network becomes the bottleneck when hundreds of GB of data have to be analyzed
– Conversely, MapReduce does its best to exploit data locality by colocating the data with the compute nodes (network bandwidth is the most precious resource, it must not be wasted)
– MapReduce operates at a higher level than MPI: the data flow is already taken care of
– MapReduce implements failure recovery (in MPI the developer has to handle checkpoints and failure recovery)
HADOOP HISTORY
Brief history
 In 2002, Mike Cafarella and Doug Cutting started
working on Apache Nutch, a new Web search
engine
 In 2003, Google published a paper on the Google
File System, a distributed filesystem, and Mike and
Doug started working on a similar, open source,
project
 In 2004, Google published another paper, on the
MapReduce computation model, and yet again Mike
and Doug implemented an open source version in
Nutch
Brief history
 In 2006, these two projects separated from Nutch
and became Hadoop
 In the same year, Doug Cutting started working for
Yahoo! and started using Hadoop there
 In 2008 Hadoop was used by Yahoo! (10000-core
cluster), Last.fm, Facebook and the NYT
 In 2009, Yahoo! broke the world record for sorting
1 TB of data in 62 seconds, using Hadoop
 Since then, Hadoop became mainstream in industry
Examples from the Real World
 Last.fm
– Each user listening to a song (local or in streaming)
generates a trace
– Hadoop analyses these traces to produce charts
• E.g., track statistics per user and per country, weekly top tracks…
 Facebook
– Daily and hourly summaries over user logs
• Products usage, ads campaigns…
– Ad-hoc jobs over historical data
– Long term archival store
– Integrity checks
Examples from the Real World
 Nutch search engine
– Link inversion: for each Web page, find the incoming links that point to it
– URL fetching
– Produce Lucene indexes (for text searches)
 Infochimps: explore network graphs
– Social networks: Twitter analysis, measure communities
– Biology: neuron connections in roundworms
– Street connections: OpenStreetMap
Hadoop umbrella
 HDFS: distributed filesystem
 MapReduce: distributed data processing model
 MRUnit: unit testing of MapReduce applications
 Pig: data flow language to explore large datasets
 Hive: distributed data warehouse
 HBase: distributed, column-oriented db
 ZooKeeper: distributed coordination service
 Sqoop: efficient bulk transfers of data over HDFS
 …
MAPREDUCE
MapReduce
 Model for analyzing large amounts of data
 Data has to be organized as a key-value dataset
 Two phases:
– map(k1, v1) -> list(k2, v2), where the input domain is
different from the output domain
– (shuffle: intermediate phase to sort the output of map
and group by key)
– reduce(k2, list(v2)) -> list(v3), where the input and output
domain is the same
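As a quick illustration (plain Java on in-memory data, not Hadoop code; the names are ours), here is the classic word count instance of the model: map emits a (word, 1) pair per word, the shuffle groups the pairs by word, and reduce sums each group:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapReduceToy {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a", "b c");
        // map: each line -> (word, 1) pairs
        // shuffle: group the pairs by word
        // reduce: sum the ones of each group
        Map<String, Long> counts = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
        System.out.println(counts); // {a=2, b=2, c=1}
    }
}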
MapReduce on Hadoop
 Job: unit of work to be performed by the system
– Input data
– Map and Reduce implementations
– Configuration
 Map task and Reduce task: smaller pieces of a job
 Jobtracker: node of the cluster coordinating the job
 Tasktracker: runs a task and reports to the
jobtracker
 Split: part of the input
MapReduce on Hadoop
 The jobtracker splits the input into parts, and a map task is run for each split
 Splits are run in parallel on all nodes
– Hadoop tries hard to run the map task for a split on the same node where HDFS has saved that split
• If not possible, at least in the same rack
 Map task output is written on disk (not on HDFS:
intermediate data do not need replication)
 Reduce tasks receive map output over the network (no data locality here)
 Final output is saved on HDFS
MapReduce on Hadoop [figure]
MapReduce on Hadoop (with multiple reduce tasks) [figure]
MapReduce on Hadoop (no reduce) [figure]
Intermediate data
 Combiner function
– Aggregates data from several map outputs
• To minimize network transfer
• Hadoop decides whether it is needed; it may not be executed at all
– In a way, it can be seen as a local instance of the reduce
function
 Shuffle
– Sorts each map output
– Transfers it to (one of the) reducers
– Merges it with other map outputs, maintaining sorting
– Everything can be configured
• Memory, buffer sizes, parallel copies… check the book!
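A hedged sketch (imports as in the Reducer slide below; the MaxTemperature* names are hypothetical, not part of the example developed later): a max is associative and commutative, so the same reduce class can double as a combiner; the mean of our running example is not, unless (sum, count) pairs are propagated instead of partial means.

public class MaxTemperatureReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // the maximum of maxima is still the maximum: safe as a combiner
        double max = Double.NEGATIVE_INFINITY;
        for (DoubleWritable dw : values) {
            max = Math.max(max, dw.get());
        }
        context.write(key, new DoubleWritable(max));
    }
}

// in the job handler:
// job.setCombinerClass(MaxTemperatureReducer.class);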
MapReduce on Hadoop
 Everything is configurable (see the sketch below)
– Number of map tasks
– Number of reduce tasks
– Data compression
– Custom serialization
– Memory management
 Profilers to understand performance in detail
 Tools to chain subsequent MapReduce runs into workflows (Apache Oozie)
– For complex problems
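For instance, a couple of these knobs can be set in the job handler (a sketch; property names as in Hadoop 2, check the reference for your version):

// inside the job handler's run() method, before submitting the job
Job job = Job.getInstance(getConf(), "Mean temperature");
job.setNumReduceTasks(4); // number of reduce tasks
job.getConfiguration().setBoolean("mapreduce.map.output.compress", true); // compress map output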
MapReduce on Hadoop: versions
 Version 1: first developed version of Hadoop MR
– The runtime contains the previously mentioned jobtracker and tasktrackers
– Still developed (latest version: 1.2)
 Version 2: the new Hadoop MR
– The runtime, called YARN (Yet Another Resource Negotiator), replaces (and generalizes) the previous trackers
– New HDFS features
– Different configurations and APIs
 The book mixes the two versions here and there…
 We will follow version 2 (=> Hadoop 2.2.0, the latest
version)
MapReduce example
 Main interface is written in Java
– Hadoop is written in Java
 There are interfaces in many other languages
– Hadoop is able to work in “streaming” mode, using Unix
pipes
– Other languages take advantage of this
• Python
• Ruby
• C++
• …
MapReduce example
 A simple dataset
– Temperature and humidity measured through WSN
motes in two rooms
Timestamp     Room   Temperature   Humidity   Battery
1341078338     18        3224          54       2999
1341078379     31        3186          49       2999
1341078398     18        3237          48       2999
1341078439     31        3184          49       2999
1341078458     18        3243          47       2999
1341078499     31        3180          49       2999
1341078518     18        3245          48       2999
1341078559     31        3178          51       2999
MapReduce example
 Dataset
– Sampling time: 5 minutes
– Total dataset: about 566 days (more than 1.5 years)
– CSV (only a few megabytes… it does not really exploit Hadoop, but it will help us understand how it works)
 Calculations
– Max temperature per day
– Mean temperature per day
– …
MapReduce example
 Classes to be implemented
– Mapper
– Reducer
– Job handler (the “main” file, launching the execution and
waiting for results) – this will handle the configuration,
too
 Configuration
– Default filesystem (HDFS, but Hadoop can read from
standard filesystems)
– Default configuration for node and data managers
Hadoop configurations
 Standalone (default configuration)
– Runs on a single JVM
– No parallelization
– Useful for debugging purposes
 Pseudo-distributed (single-host cluster)
– Parallelization
– Runs daemons on a single host
 Cluster
– Full scale Hadoop
Hadoop configuration
 We will see standalone and pseudo-distributed
– I will include cluster configuration in the exercises, but it
requires a certain amount of memory to be run
 For standalone, you need to do almost nothing
– Just set the JAVA_HOME environment variable in hadoop-env.sh, before executing Hadoop itself
 Notice that Windows is NOT supported in
production mode for Hadoop
– Examples have been tested on Linux and OS X
Mapper
public class MeanTemperatureMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split("\\s+");
        if (parts.length < 3) {
            return; // skip malformed lines
        }
        long timestamp = Long.parseLong(parts[0]) * 1000; // seconds to milliseconds
        int mote = Integer.parseInt(parts[1]);
        if (mote != 18) {
            return; // keep only the mote in room 18
        }
        int temperatureInt = Integer.parseInt(parts[2]);
        double temperature = temperatureInt / 100.0; // e.g., 3224 -> 32.24
        Date date = new Date(timestamp);
        DateFormat df = new SimpleDateFormat("yyyyMMdd");
        String dateKey = df.format(date);
        // emit (day, temperature): the day is the grouping key for the reducer
        context.write(new Text(dateKey), new DoubleWritable(temperature));
    }
}
Mapper
Input (Timestamp, Room, Temperature, Humidity, Battery):
1341078338   18   3224   54   2999
1341078379   31   3186   49   2999
1341078398   18   3237   48   2999
1341078439   31   3184   49   2999
1341078458   18   3243   47   2999
1341078499   31   3180   49   2999
1341078518   18   3245   48   2999
1341078559   31   3178   51   2999

Output (Day, Temperature), keeping only mote 18:
20120630   32.24
20120630   32.37
20120630   32.43
20120630   32.45
Reducer
public class MeanTemperatureReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // average all the temperatures collected for this day
        double sum = 0;
        int counter = 0;
        for (DoubleWritable dw : values) {
            sum += dw.get();
            ++counter;
        }
        context.write(key, new DoubleWritable(sum / counter));
    }
}
Reducer
Input (Day, Temperature):
20120630   32.24
20120630   32.37
20120630   32.43
20120630   32.45

Output (Day, Mean temperature):
20120630   32.3725
Job handler
public class MeanTemperatureJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        // configure the job: input/output paths, mapper, reducer, output types
        Job job = Job.getInstance(getConf(), "Mean temperature");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MeanTemperatureMapper.class);
        job.setReducerClass(MeanTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // submit and wait for completion: 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MeanTemperatureJob(), args);
        System.exit(exitCode);
    }
}
Run MapReduce
 Java classes are compiled through Maven
– The example will contain a sample pom.xml file
 To run the example:
– mvn package
– export HADOOP_CLASSPATH=somename.jar
– hadoop MeanTemperatureJob -fs file:/// input/file.csv output
 Output directory MUST not exist
 Notice the “fs” option…
HDFS
HDFS
 The previous example used the OS filesystem
– Hadoop does not have any problem with that
 But, if we want to move to pseudo-distributed and
cluster modes, we need to start using HDFS
 The Hadoop Distributed File System is designed to store very large files, with streaming data access patterns
Name and data nodes
 HDFS has a single* namenode (master) and several
datanodes (slaves)
 The namenode manages the filesystem namespace
(i.e., metadata)
– It maintains a namespace image and an edit log
– It has a list of all the datanodes and which blocks they
store
– It does not maintain any file by itself
 * From version 2, Hadoop added the concept of
HDFS federation
– Backup namenodes!
File distribution
 A datanode is a machine storing blocks of files,
continuously communicating with the namenode
– Without a namenode, a datanode is useless
 Each file uploaded to HDFS is split into blocks of 64 MB each
– Minimize seek, maximize transfer rate
– If a file (or the last block of a file) is smaller than 64 MB,
it does not occupy 64 MB
• This is different from OS filesystems
 A file can be bigger than a single disk in the network
File distribution [figure]
Interactions
 Command-line interface
– ls, mkdir, copyFromLocal, cat…
 Third-party tools
– Fuse
 Java APIs
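A minimal sketch of the Java API (the path and the filesystem address are illustrative; the address matches the core-site.xml shown later):

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000"); // as in core-site.xml
        FileSystem fs = FileSystem.get(conf);
        InputStream in = null;
        try {
            // open a file stored on HDFS and copy it to standard output
            in = fs.open(new Path("/user/hadoop/source.csv"));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}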
Writable datatypes
 Mappers and Reducers use particular types
– Text, DoubleWritable, LongWritable…
 Hadoop defines specific types “wrapping” Java
types, optimizing network serialization
 To add new types, you can implement Writable and
WritableComparable
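A tiny, hedged example of these wrappers in isolation (runnable against the Hadoop libraries):

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Writables are mutable wrappers around Java values,
        // with a compact serialization format for network transfer
        IntWritable mote = new IntWritable(18);
        Text day = new Text("20120630");
        DoubleWritable temp = new DoubleWritable(32.24);
        System.out.println(day + " " + mote.get() + " " + temp.get());
    }
}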
MapReduce example (reprise)
 Let’s assume we want to output the mean
temperature per day and per mote
 We need to use the pair (day, mote) as the key of our key-value pairs
 Create a new class and extend WritableComparable
– Comparable is needed because Hadoop sorts keys before
reducing
MapReduce example (reprise)
public class RoomDayWritable implements WritableComparable<RoomDayWritable> {
    private Text date;
    private IntWritable mote;

    // Hadoop instantiates Writables through reflection:
    // a no-argument constructor is required
    public RoomDayWritable() {
        this.date = new Text();
        this.mote = new IntWritable();
    }

    public RoomDayWritable(String date, int mote) {
        this.date = new Text(date);
        this.mote = new IntWritable(mote);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.mote.write(out);
        this.date.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in the same order they were written
        this.mote.readFields(in);
        this.date.readFields(in);
    }

    @Override
    public int hashCode() {… }
    @Override
    public boolean equals(Object obj) {… }
    @Override
    public int compareTo(RoomDayWritable other) {… }
}
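The elided methods could look like this (a sketch: the ordering is a design choice, here mote first and then date, consistent with the serialization order above):

@Override
public int hashCode() {
    return 31 * mote.hashCode() + date.hashCode();
}

@Override
public boolean equals(Object obj) {
    if (!(obj instanceof RoomDayWritable)) return false;
    RoomDayWritable other = (RoomDayWritable) obj;
    return mote.equals(other.mote) && date.equals(other.date);
}

@Override
public int compareTo(RoomDayWritable other) {
    // Hadoop uses this ordering to sort keys before the reduce phase
    int cmp = this.mote.compareTo(other.mote);
    if (cmp != 0) return cmp;
    return this.date.compareTo(other.date);
}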
MapReduce example (reprise)
Mapper output (Day, Mote, Temperature):
20120630   18   32.24
20120630   18   32.37
20120630   18   32.43
20120630   18   32.45
20120630   31   31.86
20120630   31   31.84
20120630   31   31.80
20120630   31   31.78

Reducer output (Day, Mote, Mean temperature):
20120630   18   32.3725
20120630   31   31.82
Single node
 Configuration needs to be changed
– Location of all the daemons is localhost
– Number of replicas is 1
 Format HDFS
 Start the HDFS daemon
 Start the YARN daemon
 Run the demo
Configuration
core-site.xml
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>
Run the example
 Format HDFS
– hdfs namenode -format
 Start daemons (check the logs!)
– start-dfs.sh
– start-yarn.sh
– mr-jobhistory-daemon.sh start historyserver (*)
– jps
 Copy the file on HDFS
– hadoop fs -mkdir -p /user/hadoop
– hadoop fs -copyFromLocal source.csv .
Run the example
 Run
– hadoop jar somename.jar MeanTemperatureJob
source.csv output
• Again, the output directory (on HDFS!) MUST not exist
 Check the output
– hadoop fs -cat output/part-r-00000
Cluster mode
 Configuration needs to be changed
– See example file on the course Website
 A new file, called “slaves”, has to be created on the
master node, containing addresses of all the
datanodes
 Running the example works in the exact same way
PIG
Pig
 Language and runtime to perform complex queries
on richer data structures
– Pig Latin is the language to express data flows
– Runtime to execute scripts on Hadoop clusters
 A script is a sequence of transformations on initial
data
– They will be translated into MapReduce jobs
– Faster development: an analysis can be written in a few lines and executed over terabytes of data
 User Defined Functions can extend Pig capabilities
 Performance is comparable with native MR code*
Pig
 Three execution modes
– Script
– Interactive shell (Grunt)
– Embedded in Java
 IDE plugins
 Scripts can be run in local mode (single JVM) or on
a cluster (pseudo or real)
 Pig is able to generate a (reasonably) complete and
concise sample dataset for a script
MapReduce example with Pig
REGISTER ./hadooptests-1.0.jar;
-- load the raw CSV, declaring a schema
raw = LOAD 'temperature-sorted.csv' USING PigStorage('\t')
    AS (timestamp:long, mote:int, temperature:int, humidity:int, battery:int);
clean = FILTER raw BY mote != 1;
-- convert the timestamp to a day (via the UDF below) and the temperature to degrees
day = FOREACH clean GENERATE me.sivieri.hadoop.pig.ExtractDate(timestamp) AS date,
    mote, temperature / 100.00 AS temperature;
grouped_date_mote = GROUP day BY (date, mote);
mean_temp = FOREACH grouped_date_mote GENERATE group, AVG(day.temperature);
DUMP mean_temp;
MapReduce example with Pig
public class ExtractDate extends EvalFunc<String> {
    @Override
    public String exec(Tuple arg0) throws IOException {
        if (arg0 == null || arg0.size() == 0)
            return null;
        try {
            Long timestamp = (Long) arg0.get(0);
            // timestamps are in seconds, Date expects milliseconds
            Date date = new Date(timestamp * 1000);
            DateFormat df = new SimpleDateFormat("yyyyMMdd");
            return df.format(date);
        } catch (Exception e) {
            System.err.println("ExtractDate: failed to process input; error - " + e.getMessage());
            return null;
        }
    }
}
Bibliography
 Tom White, “Hadoop – The definitive guide”, 3rd
Edition, O’Reilly
 Sanjay Ghemawat, Howard Gobioff and Shun-Tak
Leung, “The Google File System”, Google 2003
 Jeffrey Dean and Sanjay Ghemawat, “MapReduce:
Simplified Data Processing on Large Clusters”,
Google 2004