Calling collect() on an RDD returns the entire dataset to the driver, which can cause out-of-memory errors, so we should avoid it. Will collect() behave the same way if called on a DataFrame? What about the select() method? Does it also work the same way as collect() if called on a DataFrame?
Tags: dataframe, apache-spark, apache-spark-sql
select is a transformation, not an action, so it is lazily evaluated (it won't actually do the calculations, it only records the operations to apply). collect is an action.
Try:
df.limit(20).collect()
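For instance, a minimal sketch of the difference (assuming df already exists; the column name col1 is just for illustration):

val projected = df.select("col1")          // transformation: only builds the plan, nothing runs yet
val rows = projected.limit(20).collect()   // action: executes the plan, returns at most 20 rows to the driver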
To answer the questions directly:
Will collect() behave the same way if called on a DataFrame?
Yes, spark.DataFrame.collect is functionally the same as spark.RDD.collect. They serve the same purpose on these different objects.
What about the select() method?
There is no such thing as spark.RDD.select, so it cannot be the same as spark.DataFrame.select.
Does it also work the same way as collect() if called on a DataFrame?
The only thing select and collect have in common is that they are both methods on a DataFrame; they have zero overlap in functionality.
Here's my own description: collect is the opposite of sc.parallelize, and select is the same as the SELECT in any SQL statement.
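As a rough sketch of both points (assuming a SparkContext sc and a SparkSession spark; the view name people and the column name are made up for illustration):

val rdd = sc.parallelize(Seq(1, 2, 3))   // local collection -> distributed RDD
val local = rdd.collect()                // distributed RDD -> local Array back on the driver

df.createOrReplaceTempView("people")     // register df so it can be queried with SQL
spark.sql("SELECT name FROM people")     // same projection as df.select("name")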
If you are still having trouble understanding what collect actually does (for either an RDD or a DataFrame), then you should look up some articles about what Spark is doing behind the scenes.
Calling select will result in lazy evaluation, for example:
val df1 = df.select("col1")
val df2 = df1.filter("col1 == 3")
Both statements above build a lazy execution plan that is only executed when you call an action on the resulting df, such as show, collect, etc.
val df3 = df2.collect()  // action: triggers execution and returns an Array[Row] to the driver, not a DataFrame
Use .explain at the end of your chain of transformations to inspect its plan.
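For example, with the df2 defined above:

df2.explain()      // prints the physical plan without executing it
df2.explain(true)  // also prints the parsed, analyzed, and optimized logical plans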
Here is more detailed info: Transformations and Actions.
Short answer:
collect mainly serializes the data to the driver (loss of parallelism, while preserving all the other data characteristics of the DataFrame).
For example, with a PrintWriter pw you can't directly do df.foreach( r => pw.println(r) ); you must use collect before foreach, as in df.collect.foreach(etc).
PS: the "loss of parallelism" is not a "total loss" because after serialization it can be distributed again to executors.
select is mainly to select columns, similar to a projection in relational algebra (only similar in the framework's context, because Spark's select does not deduplicate data). In that sense it is also the complement of filter in the framework's context: select picks columns, filter picks rows.
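For example (column names are made up for illustration):

df.select("name", "age")   // projection: keeps a subset of columns, all rows
df.filter("age > 21")      // selection: keeps a subset of rows, all columns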
Commenting on the other answers: I like Jeff's classification of Spark operations into transformations (such as select) and actions (such as collect). It is also good to remember that transformations (including select) are lazily evaluated.
select is used for projecting some or all fields of a DataFrame. It won't give you a value as an output, but a new DataFrame. It's a transformation.
Source: Stackoverflow.com