Sunil S. Ranka's Weblog

Superior Data Analytics is the antidote to Business Failure

Posts Tagged ‘MapReduce’

Big Data – Tez, MR, Spark Execution Engines: Performance Comparison

Posted by sranka on February 25, 2016

There is no question that data is being generated in greater volumes than ever before. Along with traditional data sets, new data sources such as sensors, application logs, IoT devices, and social networks are adding to data growth. Unlike traditional ETL platforms such as Informatica, ODI, and DataStage, which are largely proprietary commercial products, the majority of big data ETL platforms are powered by open source.

With so many execution engines available, customers are always curious about their usage and relative performance.

To put this into perspective, in this post I run a set of queries against three key query engines, namely MapReduce (MR), Tez, and Spark, and compare the query execution timings.
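The execution engine is selected per Hive session with the hive.execution.engine property; each run below sets it before issuing the same test query (the mr and tez values appear in the run headers that follow, and spark additionally requires Hive on Spark to be configured on the cluster):

-- switch the engine, then run the identical query
set hive.execution.engine=mr;
select count(*) from sensordata_csv where ts = '2014-01-01';

set hive.execution.engine=tez;
select count(*) from sensordata_csv where ts = '2014-01-01';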

create external table sensordata_csv
(
ts string,
deviceid int,
sensorid int,
val double
)
row format delimited
fields terminated by '|'
stored as textfile
location '/user/sranka/MachineData/sensordata'
;

drop table sensordata_part;

create table sensordata_part
(
deviceid int,
sensorid int,
val double
)
partitioned by (ts string)
clustered by (deviceid) sorted by (deviceid) into 10 buckets
stored as orc
;
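The post does not show the step that populates sensordata_part from sensordata_csv; a minimal sketch, assuming dynamic partitioning on ts and Hive-enforced bucketing, would be:

-- assumed load step (not shown in the original post): fill the partitioned,
-- bucketed ORC table from the raw CSV-backed table
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.enforce.bucketing=true;

insert overwrite table sensordata_part partition (ts)
select deviceid, sensorid, val, ts
from sensordata_csv;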

"**********************************************"
"** 1) Baseline: Read a csv without Tez"
" set hive.execution.engine=mr"
" select count(*) from sensordata_csv where ts = '2014-01-01'"
"**********************************************"
2016-02-25 02:57:27,444 Stage-1 map = 0%,  reduce = 0%
2016-02-25 02:57:35,880 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.84 sec
2016-02-25 02:57:44,420 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.99 sec
MapReduce Total cumulative CPU time: 4 seconds 990 msec
Ended Job = job_1456183816302_0046
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 4.99 sec   HDFS Read: 3499156 HDFS Write: 6 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 990 msec
OK
16733
Time taken: 32.524 seconds, Fetched: 1 row(s)

"**********************************************"
"** 2) Read a csv with Tez"
" set hive.execution.engine=tez"
" select count(*) from sensordata_csv where ts = '2014-01-01'"
"**********************************************"
Total jobs = 1
Launching Job 1 out of 1

Status: Running (application id: application_1456183816302_0047)

Map 1: -/-    Reducer 2: 0/1
Map 1: 0/1    Reducer 2: 0/1
Map 1: 0/1    Reducer 2: 0/1
Map 1: 0/1    Reducer 2: 0/1
Map 1: 1/1    Reducer 2: 0/1
Map 1: 1/1    Reducer 2: 1/1
Status: Finished successfully
OK
16733
Time taken: 16.905 seconds, Fetched: 1 row(s)

"**********************************************"
"** 3) Read a partition with Tez"
" select count(*) from sensordata_part where ts = '2014-01-01'"
"**********************************************"
Total jobs = 1
Launching Job 1 out of 1
Status: Running (application id: application_1456183816302_0047)

Map 1: -/-    Reducer 2: 0/1
Map 1: 0/2    Reducer 2: 0/1
Map 1: 1/2    Reducer 2: 0/1
Map 1: 2/2    Reducer 2: 0/1
Map 1: 2/2    Reducer 2: 1/1
Status: Finished successfully
OK
16733
Time taken: 6.503 seconds, Fetched: 1 row(s)

"**********************************************"
"** 4) Read a partition with Spark"
" select count(*) from sensordata_part where ts = '2014-01-01'"
"**********************************************"

Time taken: 5.8 seconds

"**********************************************"
"** 5) Read a csv with Spark"
" select count(*) from sensordata_csv where ts = '2014-01-01'"
"**********************************************"
Time taken: 4.5 seconds
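The post does not show how runs 4 and 5 were launched; assuming Hive on Spark was available on the cluster, the same session-level switch would drive the identical queries through Spark (running them through Spark SQL against the Hive metastore would be another option):

-- assumed invocation for the Spark runs (not shown in the original post)
set hive.execution.engine=spark;
select count(*) from sensordata_part where ts = '2014-01-01';
select count(*) from sensordata_csv where ts = '2014-01-01';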

Query 1: select count(*) from sensordata_csv where ts = '2014-01-01'

Query 2: select count(*) from sensordata_part where ts = '2014-01-01'

The table below summarizes the execution timings from the runs above:

Engine      Query 1 (sensordata_csv)    Query 2 (sensordata_part)
MapReduce   32.5 sec                    (not shown)
Tez         16.9 sec                    6.5 sec
Spark       4.5 sec                     5.8 sec

Conclusion: Which engine is right?

Spark, being an in-memory execution engine, comes out as the clear winner, but in certain scenarios, notably the query against the partitioned table above, the Tez execution engine comes close to Spark.

Even so, you cannot conclude that Spark will solve every big data ETL problem. As a continuously evolving product, Spark has its own challenges when it comes to productionizing workloads, and the same holds true for Tez. MR, by contrast, has been around the longest and sits at the core of the Hadoop framework; for mission-critical workloads that are not time bound, MR could be the best choice.

Hope This Helps

Sunil S Ranka

About Spark : http://spark.apache.org/

About MapReduce : https://en.wikipedia.org/wiki/MapReduce

About Tez : https://tez.apache.org/

Posted in Hadoop | 1 Comment »

MapReduce: File Compression and Processing Cost

Posted by sranka on August 19, 2015

Recently while working with a customer we ran into an interesting situation concerning file compression and processing time. For a system like Hadoop, file compression has been always a good way to save on space, especially when Hadoop replicates the data multiple times.

All Hadoop compression algorithms exhibit a space/time trade-off: faster compression and decompression speeds usually come at the expense of space savings.  For more details about how compression is used, see https://documentation.altiscale.com/when-and-why-to-use-compression. There are many file compression formats, but below we only mention some of the commonly used compression methods in Hadoop. 

The type of compression plays an important role: the true power of MapReduce is realized only when the input can be split, and not all compression formats are splittable, which can lead to an unexpected number of map tasks.

In the case of a splittable format, the number of mappers corresponds to the number of block-sized chunks into which the file has been stored, whereas in the case of a non-splittable format a single map task processes all the blocks. Although the table below shows that LZO is not splittable, it is in fact possible to index LZO files so that performance can be greatly improved. At Altiscale, our experience has shown that indexing LZO files makes them splittable and yields a real performance gain. For more information on how to do it, see https://documentation.altiscale.com/compressing-and-indexing-your-data-with-lzo.

Format Codec Extension Splittable Hadoop HDInsight
DEFLATE org.apache.hadoop.io.compress.DefaultCodec .deflate N Y Y
Gzip org.apache.hadoop.io.compress.GzipCodec .gz N Y Y
Bzip2 org.apache.hadoop.io.compress.BZip2Codec .bz2 Y Y Y
LZO com.hadoop.compression.lzo.LzopCodec .lzo N Y N
LZ4 org.apache.hadoop.io.compress.Lz4Codec .lz4 N Y N
Snappy org.apache.hadoop.io.compress.SnappyCodec .snappy N Y N
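The codec class names in the table are what get plugged into Hadoop and Hive configuration properties. As a hedged illustration (standard property names, values picked purely for the example), a Hive session can be asked to compress its final job output with the splittable bzip2 codec listed above:

-- example only: compress Hive/MapReduce job output with the splittable bzip2 codec
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec;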

To measure the performance and processing cost of each compression option, we ran a simple wordcount example on Altiscale's in-house production cluster, following the steps in the link below. Although there are more detailed methodologies for measuring performance, we chose this example simply to demonstrate the benefit and performance trade-off of a splittable solution:

https://documentation.altiscale.com/wordcount-example

The following table shows the results.

#  File Name      Compression Option  Size (GB)  Comments                       # of Maps  # of Reducers  Processing Time
1  input.txt      n/a                 5.9        No compression                 24         1              1 min 16 sec
2  input.txt.gz   default             1.46       Normal gzip compression        1          1              11 min 19 sec
3  input1.txt.gz  -1                  1.42       gzip -1 (optimize for speed)   1          1              11 min 41 sec
4  input9.txt.gz  -9                  1.14       gzip -9 (optimize for space)   1          1              11 min 21 sec

The Resource Manager Web UI can be used to find the relevant job statistics, including the processing time and the number of mappers and reducers that were used.

[Screenshot: Resource Manager Web UI job view for the gzip-compressed input run]

Conclusion

In our test #1 scenario, the uncompressed file size was 5.9 GB when stored in HDFS. With an HDFS block size of 256 MB, the file was stored as ~24 blocks, and a MapReduce job using this file as input created 24 input splits, each processed independently by a separate map task; the whole job took only 1 min 16 sec.

In the remaining test scenarios, the gzip-compressed file could not be split, resulting in a single input split (and a single map task) and an average run time of approximately 11 minutes. Even the gzip -1 option, which optimizes for speed, and the -9 option, which optimizes for space, did not help much.

Gzip compression is an important part of the Hadoop ecosystem; it saves space at the cost of processing time. If data processing is time sensitive, a splittable compression format, or even uncompressed files, would be recommended.

Posted in Uncategorized | 1 Comment »

Behind the Scenes of a MapReduce Job

Posted by sranka on October 28, 2013

Hi All

Recently I have been spending most of my time on Big Data projects using CDH 4.x. Understanding the key components of the Hadoop infrastructure is essential, and MapReduce (MR) is the most important one for processing and aggregating data. To get the best performance, one needs to know the details of a MapReduce job. After reading several white papers and a few books, in my opinion the paragraphs below summarize MapReduce the best!

All About Map Reduce 

The execution of a MapReduce job is broken down into map tasks and reduce tasks. Subsequently, map task execution is divided into the phases: Read (reading map inputs), Map (map function processing), Collect (serializing to buffer and partitioning), Spill (sorting, combining, compressing, and writing map outputs to local disk), and Merge (merging sorted spill files). Reduce task execution is divided into the phases: Shuffle (transferring map outputs to reduce tasks, with decompression if needed), Merge (merging sorted map outputs), Reduce (reduce function processing), and Write (writing reduce outputs to the distributed file-system). Each phase represents an important part of the job’s overall execution in Hadoop.

In the MapReduce model, computation is divided into a map function and a reduce function. The map function takes a key/value pair and produces one or more intermediate key/value pairs. The reduce function then takes these intermediate key/value pairs and merges all values corresponding to a single key. The map function can run independently on each key/value pair, exposing enormous amounts of parallelism. Similarly, the reduce function can run independently on each intermediate key, also exposing significant parallelism. In Hadoop, a centralized JobTracker service is responsible for splitting the input data into pieces for processing by independent map and reduce tasks, scheduling each task on a cluster node for execution, and recovering from failures by re-running tasks. On each node, a TaskTracker service runs MapReduce tasks and periodically contacts the JobTracker to report task completions and request new tasks. By default, when a new task is received, a new JVM instance will be spawned to execute it.
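To make these phases concrete, here is a small illustrative example of my own (reusing the sensordata_csv table from the first post above, not taken from the papers quoted): a simple GROUP BY in Hive compiles into exactly one such MapReduce job.

-- illustration only: one Hive GROUP BY, one MapReduce job
-- map side: Read rows, Map emits (deviceid, 1) pairs (partial counts when
--           map-side aggregation is enabled), Collect/Spill/Merge sort them
-- reduce side: Shuffle/Merge gather each deviceid's pairs, Reduce sums them,
--              Write stores the final counts in HDFS
select deviceid, count(*)
from sensordata_csv
group by deviceid;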

The above text is taken from:

The Hadoop Distributed Filesystem: Balancing Portability and Performance, by Jeffrey Shafer, Scott Rixner, and Alan L. Cox (Rice University)

Hadoop Performance Models (technical report), by Herodotos Herodotou

Hope This helps

Sunil S Ranka

“Superior BI is the antidote to Business Failure”

Posted in Big Data, sunil s ranka | Leave a Comment »