Monday, November 11, 2013

Hadoop: How to control how many map tasks can be executed in parallel

http://developer.yahoo.com/hadoop/tutorial/module4.html

The InputFormat defines the list of tasks that make up the mapping phase; each task corresponds to a single input split. The tasks are then assigned to the nodes in the system based on where the input file chunks are physically resident. An individual node may have several dozen tasks assigned to it. The node will begin working on the tasks, attempting to perform as many in parallel as it can. The on-node parallelism is controlled by the mapred.tasktracker.map.tasks.maximum parameter.
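As a quick sanity check (my own snippet, not from the tutorial), the property can be read back from a JobConf. The class name MapSlotCheck is made up, and the default of 2 slots per TaskTracker is the stock Hadoop 1.x value. In practice the limit is set in mapred-site.xml on each TaskTracker node, since it is a per-node cap rather than a per-job setting.

import org.apache.hadoop.mapred.JobConf;

public class MapSlotCheck {
    public static void main(String[] args) {
        // JobConf loads mapred-default.xml / mapred-site.xml from the classpath.
        JobConf conf = new JobConf();
        // Stock Hadoop 1.x default is 2 concurrent map tasks per TaskTracker.
        int maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
        System.out.println("Max parallel map tasks per TaskTracker: " + maxMapSlots);
    }
}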

[Figure 4.5: Detailed Hadoop MapReduce data flow]

Setting the number of map tasks and reduce tasks

http://stackoverflow.com/questions/6885441/setting-the-number-of-map-tasks-and-reduce-tasks


The number of map tasks for a given job is driven by the number of input splits and not by the mapred.map.tasks parameter. For each input split a map task is spawned, so over the lifetime of a MapReduce job the number of map tasks is equal to the number of input splits. mapred.map.tasks is just a hint to the InputFormat about the number of maps.
In your example Hadoop has determined there are 24 input splits and will spawn 24 map tasks in total. But you can control how many map tasks can be executed in parallel by each task tracker.
Also, removing a space after -D might solve the problem for reduce.
For more information on the number of map and reduce tasks, please look at the URL below.
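A small sketch of that point, using the old "mapred" API these answers are written against (the class name TaskCountHint and the values 24 and 10 are made up for illustration): the map count requested through the API is only a hint, while the reduce count is honored exactly.

import org.apache.hadoop.mapred.JobConf;

public class TaskCountHint {
    public static void main(String[] args) {
        JobConf conf = new JobConf(TaskCountHint.class);
        conf.setNumMapTasks(24);     // hint only: the real count is the number of input splits
        conf.setNumReduceTasks(10);  // honored: exactly 10 reduce tasks will run
        System.out.println("mapred.map.tasks    = " + conf.get("mapred.map.tasks"));
        System.out.println("mapred.reduce.tasks = " + conf.get("mapred.reduce.tasks"));
    }
}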
--------
As Praveen mentions above, when using the basic FileInputFormat classes the number of map tasks is just the number of input splits that constitute the data. The number of reducers is controlled by mapred.reduce.tasks specified in the way you have it: -D mapred.reduce.tasks=10 would specify 10 reducers. Note that the space after -D is required; if you omit the space, the configuration property is passed along to the relevant JVM, not to Hadoop.
Are you specifying 0 because there is no reduce work to do? In that case, if you're having trouble with the run-time parameter, you can also set the value directly in code. Given a JobConf instance job, call
job.setNumReduceTasks(0);
inside, say, your implementation of Tool.run. That should produce output directly from the mappers. If your job actually produces no output whatsoever (because you're using the framework just for side-effects like network calls or image processing, or if the results are entirely accounted for in Counter values), you can disable output by also calling
job.setOutputFormat(NullOutputFormat.class);
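
Putting the pieces of this answer together, here is a minimal map-only driver sketch in the old "mapred" API. The class name MapOnlyJob is made up, IdentityMapper stands in for a real mapper, and args[0] is assumed to be the input path; generic -D options such as -D mapred.reduce.tasks=10 are handled by ToolRunner in main.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapOnlyJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), MapOnlyJob.class);
        job.setJobName("map-only example");

        FileInputFormat.addInputPath(job, new Path(args[0])); // input path from the command line
        job.setMapperClass(IdentityMapper.class);             // stand-in for a real mapper

        job.setNumReduceTasks(0);                    // no reduce phase: mapper output is the job output
        job.setOutputFormat(NullOutputFormat.class); // discard output entirely (job run for side effects)

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Generic options such as "-D mapred.reduce.tasks=10" are stripped here by ToolRunner
        // before the remaining arguments reach run().
        System.exit(ToolRunner.run(new MapOnlyJob(), args));
    }
}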

Sunday, November 10, 2013

Hadoop: custom RecordReader of TextInputFormat

http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Problem: We want our mapper to receive 3 records (3 lines) from the source file at a time instead of the 1 line provided by default by TextInputFormat.
Approach:
  1. We will extend the TextInputFormat class to create our own NLinesInputFormat.
  2. We will also create our own RecordReader class called NLinesRecordReader, where we will implement the logic of feeding 3 lines/records at a time (see the sketch after this list).
  3. We will make a change in our driver program to use our new NLinesInputFormat class.
  4. To prove that we are really getting 3 lines at a time, instead of actually counting words (which we already know how to do), we will emit the number of lines we receive in the input at a time as the key and 1 as the value; after going through the reducer, this will give us the frequency of each unique number of lines fed to the mappers.
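
Here is a minimal sketch of steps 1 and 2, in the new "mapreduce" API. The class names NLinesInputFormat and NLinesRecordReader follow the post, but wrapping the stock LineRecordReader as done here is a simplification of the post's own implementation, which reads the lines by hand.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NLinesInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new NLinesRecordReader(3);   // 3 lines per record
    }

    public static class NLinesRecordReader extends RecordReader<LongWritable, Text> {
        private final LineRecordReader lineReader = new LineRecordReader();
        private final int linesPerRecord;
        private final LongWritable key = new LongWritable();
        private final Text value = new Text();

        public NLinesRecordReader(int linesPerRecord) {
            this.linesPerRecord = linesPerRecord;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            StringBuilder record = new StringBuilder();
            int linesRead = 0;
            // Pull up to linesPerRecord lines from the wrapped LineRecordReader
            // and join them into one value.
            while (linesRead < linesPerRecord && lineReader.nextKeyValue()) {
                if (linesRead == 0) {
                    key.set(lineReader.getCurrentKey().get()); // byte offset of the first line
                } else {
                    record.append("\n");
                }
                record.append(lineReader.getCurrentValue().toString());
                linesRead++;
            }
            if (linesRead == 0) {
                return false;                                  // nothing left in this split
            }
            value.set(record.toString());
            return true;
        }

        @Override
        public LongWritable getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() throws IOException { return lineReader.getProgress(); }

        @Override
        public void close() throws IOException { lineReader.close(); }
    }
}

In the driver (step 3), this would be wired in with job.setInputFormatClass(NLinesInputFormat.class).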

Hadoop: InputFormat and RecordReader

http://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/

The file is composed of 6 lines of 50MB each.
[Figure: InputSplit1]
  • The first Reader will start reading bytes from Block B1, position 0. The first two EOLs will be met at 50MB and 100MB respectively. Two lines (L1 & L2) will be read and sent as key / value pairs to the Mapper 1 instance. Then, starting from byte 100MB, we will reach the end of our split (128MB) before having found the third EOL. This incomplete line will be completed by reading the bytes in Block B2 up to position 150MB. The first part of line L3 will be read locally from Block B1, the second part will be read remotely from Block B2 (by means of FSDataInputStream), and the complete record will finally be sent as key / value to Mapper 1.
  • The second Reader starts on Block B2, at position 128MB. Because 128MB is not the start of the file, there is a strong chance our pointer is located somewhere in an existing record that has already been processed by the previous Reader. We need to skip this record by jumping ahead to the next available EOL, found at position 150MB. The actual start of RecordReader 2 will therefore be 150MB instead of 128MB.
  • We can wonder what happens in case a block starts exactly on an EOL. By jumping ahead to the next available record (through the readLine method), we might miss one record. Before jumping to the next EOL, we actually need to decrement the initial "start" value to "start - 1". Being located at least one byte before the EOL, we ensure no record is skipped!
    The remaining process follows the same logic, and everything is summarized in the table below.
    [Table image: InputSplit_meta1]
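
To make the "start - 1" trick concrete, here is a small self-contained simulation in plain Java (no Hadoop dependency; the file contents, the split start offset, and the class name SplitBoundaryDemo are all made up). It mimics the positioning logic described above rather than quoting Hadoop's actual LineRecordReader code.

import java.nio.charset.StandardCharsets;

public class SplitBoundaryDemo {
    // Scans from pos to just past the next '\n' (or to end of file).
    static int skipToNextLine(byte[] data, int pos) {
        while (pos < data.length && data[pos] != '\n') {
            pos++;
        }
        return Math.min(pos + 1, data.length);   // step past the '\n' if present
    }

    public static void main(String[] args) {
        byte[] file = "L1\nL2\nL3\nL4\n".getBytes(StandardCharsets.UTF_8);
        int splitStart = 6;                      // pretend split 2 starts exactly where line L3 begins

        int start = splitStart;
        if (start != 0) {
            start--;                             // the "start - 1" trick described above
            start = skipToNextLine(file, start); // discard whatever the previous reader already owns
        }
        // Because we backed up one byte first, a split whose boundary falls exactly on a
        // line boundary still starts reading at L3, so no record is skipped. Without the
        // decrement, skipToNextLine would have jumped past L3 entirely.
        System.out.println("Reader 2 actually starts at byte " + start);
    }
}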