Can someone explain to me the difference between map and flatMap and what is a good use case for each?
What does "flatten the results" mean? What is it good for?
This question is related to
apache-spark
map returns an RDD with the same number of elements as its input, while flatMap may not.
An example use case for flatMap
Filter out missing or incorrect data.
An example use case for map
Use it in the wide variety of cases where the input and output have the same number of elements.
number.csv
1
2
3
-
4
-
5
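map.py adds all the numbers in number.csv, counting each missing value as 0.
from operator import add

# sc is the SparkContext that the pyspark shell provides
def f(row):
    try:
        return float(row)
    except ValueError:
        # missing value ("-"): keep the row, but count it as 0
        return 0

rdd = sc.textFile('number.csv').map(f)
print(rdd.count()) # 7
print(rdd.reduce(add)) # 15.0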
flatMap.py uses flatMap to filter out the missing data before the addition. Fewer numbers are added than in the previous version, but the sum is the same.
from operator import add

# sc is the SparkContext that the pyspark shell provides
def f(row):
    try:
        return [float(row)]
    except ValueError:
        # missing value ("-"): an empty list drops the row
        return []

rdd = sc.textFile('number.csv').flatMap(f)
print(rdd.count()) # 5
print(rdd.reduce(add)) # 15.0
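The same contrast can be reproduced without a Spark cluster. The following is a minimal sketch in plain Python that only emulates what map and flatMap do to the element count. It does not use the Spark API. The rows list is assumed to stand in for the lines of the CSV file, and parse_or_zero and parse_or_drop are hypothetical helper names.

```python
from itertools import chain

# Assumption: stand-in for the lines of the CSV file; "-" marks a missing value.
rows = ["1", "2", "3", "-", "4", "-", "5"]

def parse_or_zero(row):
    """map-style function: always returns exactly one value."""
    try:
        return float(row)
    except ValueError:
        return 0.0

def parse_or_drop(row):
    """flatMap-style function: returns a list holding zero or one value."""
    try:
        return [float(row)]
    except ValueError:
        return []

# map: one output per input, so the length is unchanged.
mapped = [parse_or_zero(r) for r in rows]

# flatMap: the per-row lists are concatenated, so empty lists vanish.
flat_mapped = list(chain.from_iterable(parse_or_drop(r) for r in rows))

print(len(mapped), sum(mapped))            # 7 15.0
print(len(flat_mapped), sum(flat_mapped))  # 5 15.0
```

The sums agree only because the placeholder chosen for missing rows is 0. An average over the two results would differ, which is why dropping bad rows is often the safer choice.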
It boils down to your initial question: what do you mean by flattening?
When you use flatMap, a "multi-dimensional" collection becomes a "one-dimensional" collection.
val array1d = Array ("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings
val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )
val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)
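One detail worth spelling out is that the flattening removes exactly one level of nesting. The sketch below uses plain Python lists rather than Spark, with itertools.chain.from_iterable playing the role of the flatten step. The variable names mirror the Scala snippet, and the nested list is an assumed illustration.

```python
from itertools import chain

array1d = ["1,2,3", "4,5,6", "7,8,9"]

# map-like step: each string becomes a list of strings (a list of lists overall).
array2d = [s.split(",") for s in array1d]

# flatMap-like result: the same map step followed by one level of flattening.
flat_array = list(chain.from_iterable(array2d))
print(flat_array)  # ['1', '2', '3', '4', '5', '6', '7', '8', '9']

# Only one level is removed: deeper nesting survives.
nested = [[[1, 2], [3]], [[4]]]
one_level = list(chain.from_iterable(nested))
print(one_level)  # [[1, 2], [3], [4]]
```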
You want to use a flatMap when,
Using test.md as an example:
➜ spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.
scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3
scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15
scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))
scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
With map, count() returns the number of lines in test.md (3); with flatMap, it returns the number of words (15).
map and flatMap are similar in that both return a new RDD. map is typically used to transform each element one-to-one, while flatMap is typically used when one element expands into several, such as splitting lines into words.
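The two counts can be checked by hand with a short plain-Python sketch. It assumes the three lines of test.md shown earlier and uses list comprehensions in place of the RDD operations, so it illustrates the semantics only.

```python
# Assumption: the contents of test.md, one string per line.
lines = [
    "This is the first line;",
    "This is the second line;",
    "This is the last line.",
]

# map-like: one list of words per line, so there are as many results as lines.
per_line = [line.split(" ") for line in lines]

# flatMap-like: all words from all lines in a single list.
words = [word for line in lines for word in line.split(" ")]

print(len(per_line))  # 3
print(len(words))     # 15
```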
All the examples are good. Here is a nice visual illustration (source courtesy: DataFlair training on Spark).
Map: map is a transformation operation in Apache Spark. It is applied to each element of an RDD and returns the result as a new RDD. In the map operation, the developer can define custom business logic; the same logic is applied to every element of the RDD.
The Spark RDD map function takes one element as input, processes it according to custom code (specified by the developer), and returns exactly one element. map therefore transforms an RDD of length N into another RDD of length N: the input and output RDDs always have the same number of records.
Example of map using Scala:
val x = spark.sparkContext.parallelize(List("spark", "map", "example", "sample", "example"), 3)
val y = x.map(x => (x, 1))
y.collect
// res0: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// RDD y can be rewritten with shorter syntax in Scala as
val y = x.map((_, 1))
y.collect
// res1: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// Another example: making a tuple of a string and its length
val y = x.map(x => (x, x.length))
y.collect
// res3: Array[(String, Int)] =
// Array((spark,5), (map,3), (example,7), (sample,6), (example,7))
FlatMap:
flatMap is a transformation operation. It is applied to each element of an RDD and returns the result as a new RDD. It is similar to map, but flatMap allows the function to return 0, 1, or more elements for each input. As with map, the developer can define custom business logic, and the same logic is applied to every element of the RDD.
What does "flatten the results" mean?
A flatMap function takes one element as input, processes it according to custom code (specified by the developer), and returns 0 or more elements. The per-element results are then concatenated, so flatMap() transforms an RDD of length N into another RDD of length M, where M may be smaller than, equal to, or larger than N.
Example of flatMap using Scala:
val x = spark.sparkContext.parallelize(List("spark flatmap example", "sample example"), 2)
// map operation will return Array of Arrays in following case : check type of res0
val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
y.collect
// res0: Array[Array[String]] =
// Array(Array(spark, flatmap, example), Array(sample, example))
// flatMap operation will return Array of words in following case : Check type of res1
val y = x.flatMap(x => x.split(" "))
y.collect
//res1: Array[String] =
// Array(spark, flatmap, example, sample, example)
// RDD y can be re written with shorter syntax in scala as
val y = x.flatMap(_.split(" "))
y.collect
//res2: Array[String] =
// Array(spark, flatmap, example, sample, example)
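The N-to-M behaviour described above can be sketched as a small generic helper. This is plain Python rather than Spark: flat_map and expand are hypothetical names chosen for illustration, and the helper only mimics the semantics on an ordinary list.

```python
def flat_map(func, items):
    """Apply func to each item and yield every element of each result."""
    for item in items:
        for out in func(item):
            yield out

def expand(n):
    """Return n copies of n: zero elements for 0, one for 1, and so on."""
    return [n] * n

source = [0, 1, 2, 3]                    # N = 4 inputs
result = list(flat_map(expand, source))  # M = 6 outputs

print(result)  # [1, 2, 2, 3, 3, 3]
```

Here the input 0 contributes nothing, 1 contributes one element, and 2 and 3 contribute several, so a single call covers all three cases.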
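map:
is a higher-order method that takes a function as input and applies it to each element in the source RDD, producing exactly one output element per input element.
flatMap:
is a higher-order method and transformation operation that takes an input function returning a collection for each element, and flattens those collections into a single RDD.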
Word count is the usual introductory example in Hadoop. I will take the same use case and apply both map and flatMap to see how each one processes the data.
Below is the sample data file.
hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome
The above file will be parsed using map and flatMap.
map
>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[[u'hadoop', u'is', u'fast'], [u'hive', u'is', u'sql', u'on', u'hdfs'], [u'spark', u'is', u'superfast'], [u'spark', u'is', u'awesome']]
Input has 4 lines and output size is 4 as well, i.e., N elements ==> N elements.
flatMap
>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']
The output is different from map: it is a single flat list of 14 words rather than 4 lists of words.
Let's assign 1 as value for each key to get the word count.
fm
: RDD created by using flatMap
wc
: RDD created using map
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]
Whereas flatMap
on RDD wc
will give the below undesired output:
>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]
You can't get the word count if map
is used instead of flatMap
.
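To make the remaining step concrete, here is a minimal plain-Python sketch of the whole word count, assuming the four-line sample file above. It emulates the flatMap and map steps with standard-library tools and then sums the 1s per word. The final loop stands in for a reduce-by-key step and is not Spark code.

```python
from collections import Counter
from itertools import chain

# Assumption: the sample data file, one string per line.
data = [
    "hadoop is fast",
    "hive is sql on hdfs",
    "spark is superfast",
    "spark is awesome",
]

# flatMap-like step: one flat stream of words.
words = chain.from_iterable(line.split(" ") for line in data)

# map-like step: pair every word with the value 1.
pairs = [(word, 1) for word in words]

# Sum the 1s per key to get the count of each word.
counts = Counter()
for word, one in pairs:
    counts[word] += one

print(len(pairs))       # 14
print(counts["is"])     # 4
print(counts["spark"])  # 2
```

Starting from the list-of-lists produced by map instead, the keys would be whole lists of words, which is why the flat list is needed first.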
As per the definition, the difference between map and flatMap is:
map: returns a new RDD by applying the given function to each element of the RDD. The function passed to map returns exactly one item.
flatMap: similar to map, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.
Also, the function passed to flatMap can return a list of elements (0 or more).
For example:
sc.parallelize([3,4,5]).map(lambda x: list(range(1,x))).collect()
Output: [[1, 2], [1, 2, 3], [1, 2, 3, 4]]
sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()
Output (note that it is flattened into a single list): [1, 2, 1, 2, 3, 1, 2, 3, 4]
Source:https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/
map and flatMap are similar in the sense that each takes an element from the input RDD and applies a function to it. They differ in that the function in map returns exactly one element, while the function in flatMap can return a list of elements (0 or more) as an iterator.
Also, the output of flatMap is flattened. Although the function in flatMap returns a list of elements, flatMap returns an RDD that contains all the elements from those lists directly, not the lists themselves.
The difference can be seen in the sample PySpark code below:
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]
rdd.map(lambda x: list(range(1, x))).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
RDD.map
returns the elements as an array of arrays
RDD.flatMap
returns all elements in a single array
Let's assume the file text.txt contains the following text:
Spark is an expressive framework
This text is to understand map and flatMap functions of Spark RDD
Using map
val text=sc.textFile("text.txt").map(_.split(" ")).collect
output:
text: Array[Array[String]] = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, flatMap, functions, of, Spark, RDD))
Using flatMap
val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect
output:
text: Array[String] = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, flatMap, functions, of, Spark, RDD)
flatMap and map both transform the collection.
Difference:
map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.
flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
The transformation function:
map: One element in -> one element out.
flatMap: One element in -> 0 or more elements out (a collection).
If you are asking about the difference between RDD.map and RDD.flatMap in Spark: map transforms an RDD of size N into another one of size N, e.g.
myRDD.map(x => x*2)
for example, if myRDD is composed of Doubles.
flatMap, in contrast, can transform the RDD into another one of a different size, e.g.
myRDD.flatMap(x => Seq(2*x, 3*x))
which will return an RDD of size 2*N, or
myRDD.flatMap(x => if (x < 10) Seq(2*x, 3*x) else Seq(x))
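The size arithmetic can be checked with a small plain-Python sketch. It is not Spark code: flat_map is a hypothetical helper that mimics the semantics on a list, and my_values is an assumed sample standing in for myRDD.

```python
def flat_map(func, items):
    """Apply func to each item and concatenate the resulting sequences."""
    return [out for item in items for out in func(item)]

# Assumption: sample values standing in for myRDD (N = 3).
my_values = [1.0, 5.0, 20.0]

# Every element yields two outputs, so the result has size 2 * N.
doubled_and_tripled = flat_map(lambda x: [2 * x, 3 * x], my_values)

# Elements below 10 yield two outputs, the rest yield one.
conditional = flat_map(lambda x: [2 * x, 3 * x] if x < 10 else [x], my_values)

print(len(doubled_and_tripled))  # 6
print(conditional)               # [2.0, 3.0, 10.0, 15.0, 20.0]
```

With the conditional function the output size depends on the data, so it can only be bounded in advance (here between N and 2*N), not computed from N alone.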
Difference in output of map and flatMap:
1.flatMap
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect()
Output:
1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2.map
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length).collect()
Output:
3 6 6 3 8
For those who want a PySpark example:
Example transformation: flatMap
>>> a="hello what are you doing"
>>> a.split()
['hello', 'what', 'are', 'you', 'doing']
>>> b=["hello what are you doing","this is rak"]
>>> b.split()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'list' object has no attribute 'split'
>>> rline=sc.parallelize(b)
>>> type(rline)
<class 'pyspark.rdd.RDD'>
>>> def fwords(x):
...     return x.split()
...
>>> rword=rline.map(fwords)
>>> rword.collect()
[['hello', 'what', 'are', 'you', 'doing'], ['this', 'is', 'rak']]
>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()
['hello', 'what', 'are', 'you', 'doing', 'this', 'is', 'rak']
Hope it helps :)
Source: Stackoverflow.com