\documentclass[11pt]{article}
\usepackage{amsmath}
\usepackage{amssymb}
\usepackage{fancyhdr}
\usepackage{enumerate}
\oddsidemargin0cm
\topmargin-2cm
\textwidth16.5cm
\textheight23.5cm
\newcommand{\question}[2] {\vspace{.25in} \hrule\vspace{0.5em}
\noindent{\bf #1: #2} \vspace{0.5em}
\hrule \vspace{.10in}}
\renewcommand{\part}[1] {\vspace{.10in} {\bf (#1)}}
\newcommand{\qed}{\hfill \ensuremath{\Box}}
\setlength{\parindent}{0pt}
\setlength{\parskip}{5pt plus 1pt}
\begin{document}
\medskip
\thispagestyle{plain}
\begin{center}
{\Large 15-440 Lab 3: MapReduce} \\
Neil Batlivala and Neha Rathi \\
November 16, 2013 \\
\end{center}
\question{I} {Design}
A map reduce cluster is started from the master machine and configured with a config file. Once launched, the master ssh's into each participant machine, starts a FacilityManager there, and waits until every participant has connected. The master then accepts commands typed at its command prompt. This makes it simple for the application programmer to start a map reduce cluster without having to ssh into each participant by hand to execute a script.

To use the map reduce facility, a programmer must write a Java class that extends MapReduce440 and contains two classes: one that extends Mapper440 and another that extends Reducer440. Each has a map or reduce function, respectively, that must be implemented. The facility only supports Strings as input, where each record is a single line of the input file, so the map function of Mapper440 takes a single String record and outputs a list of KeyValue pairs. All intermediate results are also handled as Strings, and the programmer is restricted to this as well. However, the programmer can always parse to and from Strings, so this should not be too inhibiting. The programmer's MapReduce440 class must also define two functions that instantiate the mapper and the reducer, respectively.

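
As an illustration only, a word-count program in this style might look like the sketch below. The types \texttt{KeyValueSketch}, \texttt{MapperSketch}, and \texttt{ReducerSketch} are hypothetical stand-ins that mirror the shape described above; the actual signatures of Mapper440, Reducer440, and KeyValue may differ. The sketch sticks to Java 6 syntax.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the facility's KeyValue pair (Strings only).
class KeyValueSketch {
    final String key;
    final String value;

    KeyValueSketch(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-ins for Mapper440 and Reducer440; real signatures may differ.
abstract class MapperSketch {
    abstract List<KeyValueSketch> map(String record);
}

abstract class ReducerSketch {
    abstract KeyValueSketch reduce(String key, List<String> values);
}

// Word count: the mapper emits (word, "1"), the reducer sums the counts per word.
class WordCountSketch {
    static class WordMapper extends MapperSketch {
        @Override
        List<KeyValueSketch> map(String record) {
            List<KeyValueSketch> out = new ArrayList<KeyValueSketch>();
            for (String word : record.trim().split("\\s+")) {
                if (!word.isEmpty()) { // a blank line yields one empty token
                    out.add(new KeyValueSketch(word, "1"));
                }
            }
            return out;
        }
    }

    static class WordReducer extends ReducerSketch {
        @Override
        KeyValueSketch reduce(String key, List<String> values) {
            int sum = 0;
            for (String v : values) {
                sum += Integer.parseInt(v); // values travel as Strings
            }
            return new KeyValueSketch(key, Integer.toString(sum));
        }
    }

    // Mirrors the two functions that instantiate the mapper and the reducer.
    static MapperSketch createMapper() {
        return new WordMapper();
    }

    static ReducerSketch createReducer() {
        return new WordReducer();
    }
}
```

Calling \texttt{WordCountSketch.createMapper().map(line)} on one input line returns one pair per word, and the reducer is then called once per distinct key with all of that key's values.
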
\textbf{Distributed File System}
\\The distributed file system consists of two socket servers on each machine: a Read server and a Write server. The Read server lets remote machines read files from the local machine, whereas the Write server lets remote machines write files to the local machine. We made this decision because some parts of the map reduce flow are simpler under a push model and others under a pull model. For example, when uploading a file, the machine holding the file pushes its data to other machines via their Write servers, whereas when a Reducer needs a partition file, it pulls the data from another machine via that machine's Read server. This was our attempt at creating a distributed IO facility. The distributed file system also creates its local directories under /tmp/data440 on each machine (to ensure that the data is not shared over AFS). Care was also taken during upload: if a node cannot be reached, the block that was supposed to go to that node is redistributed to another node. Apart from this, the distributed file system offers limited functionality to the programmer. He/she can upload files, which are replicated and made available to map reduce programs, and later download a file (which moves its blocks to the local machine and concatenates them into the originally uploaded file).

A single MapReduce program is carried out by four types of jobs: MapJob, MapCombineJob, ReduceJob, and ReduceCombineJob. A MapJob runs a map across a single data block. A MapCombineJob combines all the mapped blocks on a single node. A ReduceJob pulls all the partitions from the mappers onto the local machine, merges them, and then runs a reduce on the merged partition. The ReduceCombineJob gathers all the reduce outputs onto a single machine and concatenates them.

The MapJob was deliberately kept separate from the MapCombineJob to support better partitioning using a global maximum and a global minimum hash key. Before a combine can partition the mapped key-value pairs into the appropriate partition files, the global maximum hash key (the hash of a String key) and the global minimum hash key must be obtained. We did this despite the added difficulty (it would have been easier to keep the map and combine phases synchronized on a single machine) because partitioning over the global hash range should give a more even distribution across partitions.

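
One way such range-based partitioning could work is sketched below. The linear scaling formula and the class name \texttt{RangePartitionerSketch} are assumptions made for illustration; the text above does not specify the exact mapping from hash key to partition.

```java
// Maps a key to a partition index by scaling its hash linearly within the
// global [minHash, maxHash] range. The formula is an illustrative assumption.
class RangePartitionerSketch {
    private final long minHash;
    private final long maxHash;
    private final int numPartitions;

    RangePartitionerSketch(int minHash, int maxHash, int numPartitions) {
        if (numPartitions <= 0 || minHash > maxHash) {
            throw new IllegalArgumentException("need minHash <= maxHash and numPartitions > 0");
        }
        this.minHash = minHash;
        this.maxHash = maxHash;
        this.numPartitions = numPartitions;
    }

    int partitionFor(String key) {
        return partitionForHash(key.hashCode());
    }

    int partitionForHash(int hash) {
        if (hash < minHash || hash > maxHash) {
            throw new IllegalArgumentException("hash outside the global range");
        }
        // long arithmetic: the span of the int range does not fit in an int
        long span = maxHash - minHash + 1;
        long offset = hash - minHash;
        return (int) (offset * numPartitions / span);
    }
}
```

With a global range of 0 to 99 and four partitions, hashes 0 to 24 land in partition 0 and hashes 75 to 99 in partition 3, so each partition covers an equal slice of the observed hash range.
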
\textbf{Config:}
\\ The config file allows the following parameters of the MapReduce cluster to be customized:
\\ - cluster name (the name given to the cluster so that multiple clusters can share machines)
\\ - master ip address
\\ - participants ip addresses (must contain master ip address)
\\ - file system read port (the port on which the read server will listen)
\\ - file system write port (the port on which the write server will listen)
\\ - rmi port (the port on which the RMI registry will be created)
\\ - maximum number of maps per host
\\ - maximum number of reduces per host
\\ - replication factor
\\ - block size (because each record is a single line of text, the block size is the number of lines in each block; this does not keep the size of each block in memory consistent, but our assumption of String line records required this simplification).

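
To make the line-based block size concrete, the following sketch splits a list of lines into blocks of at most \texttt{blockSize} lines. The class and method names are hypothetical, and the actual facility presumably streams from a file rather than holding every line in memory.

```java
import java.util.ArrayList;
import java.util.List;

// Splits input lines into blocks of at most blockSize lines each.
class LineBlockSplitterSketch {
    static List<List<String>> split(List<String> lines, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<List<String>> blocks = new ArrayList<List<String>>();
        for (int start = 0; start < lines.size(); start += blockSize) {
            int end = Math.min(start + blockSize, lines.size());
            // copy the view so each block is independent of the input list
            blocks.add(new ArrayList<String>(lines.subList(start, end)));
        }
        return blocks;
    }
}
```

Five lines with a block size of 2 give three blocks, the last holding a single line. Since lines vary in length, blocks with the same line count can differ widely in bytes.
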
\textbf{FacilityManager and FacilityManagerMaster}
\\The MapReduce facility is run by a FacilityManager, which can be either a regular FacilityManager or a FacilityManagerMaster. We used RMI to provide remote access to each facility manager across the network, and leveraged it to send simple messages back and forth between managers. Although the overhead may be slightly higher than with direct socket connections, we believe that the ease of code comprehension with RMI (i.e. FacilityManagers look and behave as if they were local) outweighs having slightly more efficient code (with respect to the amount of data sent over the wire). From the application programmer's perspective, the master can be started and can then reach out to the registries of the other participants to look up their remote facility managers. RMI is also a robust, enterprise-tested tool.

\textbf{Health Checker}
\\ To ensure that everything is running smoothly, a health checker on the master pings every participant every 2 seconds to confirm that it is alive. If a participant does not respond, it is considered dead and the jobs running on that node are reassigned.

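
A minimal sketch of such a periodic check is given below. The interfaces \texttt{PingableSketch} and \texttt{DeadNodeHandlerSketch} are hypothetical; in an RMI-based system the ping would typically be a remote method that throws \texttt{java.rmi.RemoteException} when the node is unreachable. Timeouts and job reassignment are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical handle to a participant; ping() throws if the node is unreachable.
interface PingableSketch {
    void ping() throws Exception;
}

// Hypothetical callback invoked for each participant found dead.
interface DeadNodeHandlerSketch {
    void onDead(String nodeName);
}

class HealthCheckerSketch {
    private final Map<String, PingableSketch> participants;

    HealthCheckerSketch(Map<String, PingableSketch> participants) {
        this.participants = participants;
    }

    // One sweep: returns the names of participants that failed to answer.
    List<String> findDeadNodes() {
        List<String> dead = new ArrayList<String>();
        for (Map.Entry<String, PingableSketch> entry : participants.entrySet()) {
            try {
                entry.getValue().ping();
            } catch (Exception unreachable) {
                dead.add(entry.getKey());
            }
        }
        return dead;
    }

    // Runs a sweep every 2 seconds; the caller shuts the returned executor down.
    ScheduledExecutorService start(final DeadNodeHandlerSketch handler) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(new Runnable() {
            public void run() {
                for (String node : findDeadNodes()) {
                    handler.onDead(node);
                }
            }
        }, 0, 2, TimeUnit.SECONDS);
        return timer;
    }
}
```

A real checker would also remove a dead node from the map after reporting it, so that the same failure is not handled on every sweep.
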
\textbf{MapReduceProgram}
\\ A MapReduceProgram is an object that is instantiated to hold the state of a single map reduce program run by a user. Only this object can instantiate new Map/Reduce/MapCombine/ReduceCombine jobs, and it keeps track of which nodes are running which of the jobs required for the program's completion. In this way, the MapReduceProgram serves as the source of truth on the master for a map reduce program. This was an important concept that we used to ensure that jobs are completed, or re-dispatched in case of failure. The MapReduceProgram is responsible for its own state as well as the state of each of its NodeJobs.

\textbf{Job Dispatcher}
\\ To keep control over the jobs running in the system, all created jobs (MapJob, ReduceJob, etc.) are added to a blocking queue in the Job Dispatcher. When a job is available, the dispatcher attempts to find a worker to run it (see Job Scheduler below) and dispatches the job to the appropriate manager. A lot of care was taken in the interaction between the Job Dispatcher and the Job Scheduler to ensure that jobs are scheduled to nodes based on data locality first, and that the MAX-REDUCES and MAX-MAPS per host are not exceeded. Essentially, when the Map Reduce facility is operating at full capacity, jobs migrate between the dispatcher and the scheduler until they can be assigned a valid participant worker.

\textbf{Job Scheduler}
\\ The job scheduler handles all the scheduling of jobs. For MapJobs, it first tries to find a participant that holds the required data block, and it ensures that the chosen worker is operating under the maximum map task capacity. For ReduceJobs, only the maximum reduce task capacity is enforced when choosing a participant worker. If no participant can be found, the job is simply re-enqueued into the Job Dispatcher's blocking queue (thus, at full capacity, a job will cycle between the dispatcher and the scheduler). One major simplification with respect to the reduce phase is that the number of reducers for any user program is equal to the total number of participants. This may not be ideal, and the number of reducers should really depend on the utilization of the system. However, we decided to forgo this and assume that every participant will eventually become a reducer for every user program.

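
The locality-first choice for MapJobs could be sketched as follows. The class \texttt{MapSchedulerSketch} and its methods are hypothetical, and the fallback to a node that does not hold the block is an assumption based on the word ``first'' above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Picks a worker for a map job: prefer a node that holds the data block,
// never exceed the per-host map limit, and return null when all are full.
class MapSchedulerSketch {
    private final int maxMapsPerHost;
    private final Map<String, Integer> runningMaps = new HashMap<String, Integer>();

    MapSchedulerSketch(int maxMapsPerHost) {
        this.maxMapsPerHost = maxMapsPerHost;
    }

    // A null result means the caller should re-enqueue the job and retry later.
    String pickWorker(List<String> participants, Set<String> holdersOfBlock) {
        String fallback = null;
        for (String node : participants) {
            if (load(node) >= maxMapsPerHost) {
                continue; // node is at capacity
            }
            if (holdersOfBlock.contains(node)) {
                return assign(node); // data-local worker found
            }
            if (fallback == null) {
                fallback = node; // first non-local node with spare capacity
            }
        }
        return fallback == null ? null : assign(fallback);
    }

    // Frees one map slot on the node when a job completes.
    void jobFinished(String node) {
        int current = load(node);
        if (current > 0) {
            runningMaps.put(node, current - 1);
        }
    }

    private int load(String node) {
        Integer count = runningMaps.get(node);
        return count == null ? 0 : count;
    }

    private String assign(String node) {
        runningMaps.put(node, load(node) + 1);
        return node;
    }
}
```

With a limit of one map per host, the first job goes to the node holding the block, later jobs spill to other nodes, and once every node is busy the method returns null until \texttt{jobFinished} frees a slot.
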
\textbf{Record Reader and Record Writer}
\\ These classes handle the reading and writing of the intermediate files. A key-value pair is written using a newline as the delimiter. When reading and writing keys with multiple values, the key, the number of values, and the list of values are all written using a newline as the delimiter. These classes are fairly straightforward, since the facility only handles String types.

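
The multi-value layout described above (key, number of values, then each value, all newline-delimited) can be sketched like this. The class \texttt{GroupedRecordCodecSketch} is hypothetical and is not the facility's actual Record Reader or Record Writer; it assumes that keys and values never contain a newline.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Encodes a key with multiple values as: key, value count, then one value per line.
class GroupedRecordCodecSketch {
    static String write(String key, List<String> values) {
        StringBuilder sb = new StringBuilder();
        sb.append(key).append('\n');
        sb.append(values.size()).append('\n');
        for (String value : values) {
            sb.append(value).append('\n');
        }
        return sb.toString();
    }

    // Reads one record; element 0 is the key, the rest are its values.
    // Returns null at end of input.
    static List<String> read(BufferedReader in) throws IOException {
        String key = in.readLine();
        if (key == null) {
            return null;
        }
        // a truncated record makes parseInt throw NumberFormatException
        int count = Integer.parseInt(in.readLine());
        List<String> record = new ArrayList<String>();
        record.add(key);
        for (int i = 0; i < count; i++) {
            record.add(in.readLine());
        }
        return record;
    }
}
```

Writing the count before the values lets the reader know where one key's values end without a separate end marker.
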
\question{II} {Implementation}
% Describe the portions of the design that are correctly implemented, that have bugs, and that remain unimplemented.
The Map Reduce facility boots up properly and reaches out to all participants via ssh to execute scripts on them. The map reduce facility itself works properly: an application programmer can write a program following our String-only specification, and the facility will run it correctly across the system. Commands can be issued from any participant in the system.

As said above, the facility managers themselves are Remote objects using RMI while the distributed file system operates off of server sockets.
The information that is passed throughout the system is in the form of serializable objects. For example, the different types of NodeJobs (MapJob, ReduceJob, MapCombineJob, and ReduceCombineJob) are all serializable objects that are created on the master by the MapReduceProgram object and then sent to the appropriate facility manager via RMI.

Things that remain unimplemented are:
\\ - the replication factor is not maintained if a node crashes
\\ - if a MapCombine job fails, the associated map jobs should be re-run on a new node (this is the case where a mapper node crashes, so all of its map tasks need to be redone).
\\ - testing on very large inputs (something like Shakespeare's texts).
\\ - downloading files does not work
\question{III} {Build, Deploy, and Run}
% Tell us how to cleanly build, deploy, and run your project.
We have provided a sample config file, \texttt{config.txt}, for your convenience. This config file specifies the cluster name, master IP, participant IPs (which include the master IP), the ports for file system communication, the port for RMI, the maximum number of mappers and reducers per host, the replication factor, and the block size.

To run the master, use the machine specified in the config file and run the following command:
\begin{center}\texttt{java -jar MapReduceFacility.jar -m <config-file>}\end{center}
To execute programs from a participant, use a participant machine specified in the config file and run the following command:
\begin{center}\texttt{java -jar MapReduceFacility.jar -s <rmi-port> <cluster-name>}\end{center}
Here, \texttt{rmi-port} and \texttt{cluster-name} must match the values specified in the config file.
\question{IV} {Dependencies}
% Highlight any dependencies and software or system requirements.
Our project depends on Java 1.6.0 or higher.
\question{V} {Running and Testing}
See Section III above for how to start the MapReduce facility.

% Tell us how to run and test your framework
In general, the way to run and test a program is to:
\\ - upload a file: \texttt{upload <filepath> <filename>}
\\ - run a map reduce program: \texttt{mapreduce <path-to-class> <filename>}

While map reduce programs are running, one can run the \texttt{ps} or \texttt{ps -a} command to get a list of all the active/completed jobs in the facility.

The first test program we wrote is WordCountTest, which counts the number of times each word appears in the input file. We tested it on input files that were 10 and 100 lines long, and it produced correct results. We also tested it with multiple block sizes, ranging from 1 to 100.

Another test that we wrote is MutualFriendsTest.class, which takes an input file of records, where each record is a person's name followed by all of that person's friends, and outputs a file with all pairwise combinations of people and the intersection of their friends. We have provided both the test program and an input file called friends.txt.
\end{document}