[dataframe] Spark dataframe: collect() vs select()

Calling collect() on an RDD returns the entire dataset to the driver, which can cause an out-of-memory error, so it should generally be avoided.

Will collect() behave the same way if called on a dataframe?
What about the select() method?
Does it also work the same way as collect() if called on a dataframe?


The answer is


select is a transformation, not an action, so it is lazily evaluated (it won't actually do the computation, it just records the operation in the plan). collect is an action.

Try:

df.limit(20).collect()
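
Only the second line below actually runs a Spark job; the first returns instantly. (A minimal sketch, assuming an existing DataFrame df with a column col1; the names are illustrative.)

val projected = df.select("col1")   // transformation: returns a new DataFrame, no job runs
val sample = df.limit(20).collect() // action: runs a job, returns at most 20 Rows to the driver

Because limit(20) caps the result, this collect is safe even on a very large dataset.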


To answer the questions directly:

Will collect() behave the same way if called on a dataframe?

Yes, spark.DataFrame.collect is functionally the same as spark.RDD.collect. They serve the same purpose on these different objects.
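
As a quick illustration (a hedged sketch, assuming an existing DataFrame df small enough to fit on the driver):

val rowsFromDf = df.collect()      // DataFrame API: Array[Row] materialized on the driver
val rowsFromRdd = df.rdd.collect() // RDD API: the same rows, the same driver-side result

Both calls pull the full dataset to the driver, so both carry the same out-of-memory risk.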

What about the select() method?

There is no such thing as spark.RDD.select, so it cannot be the same as spark.DataFrame.select.

Does it also work the same way as collect() if called on a dataframe?

The only thing that is similar between select and collect is that they are both functions on a DataFrame. They have absolutely zero overlap in functionality.

Here's my own description: collect is the opposite of sc.parallelize. select is the same as the SELECT in any SQL statement.
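
Here is that description as a minimal Scala sketch (assuming a SparkSession spark and its SparkContext sc; the data and the view name t are illustrative):

import spark.implicits._

val rdd = sc.parallelize(Seq(1, 2, 3)) // local collection -> distributed RDD
val local = rdd.collect()              // distributed RDD -> local Array: the inverse of parallelize

val df = Seq(("a", 1), ("b", 2)).toDF("name", "value")
val names = df.select("name")                     // same projection as...
df.createOrReplaceTempView("t")
val namesViaSql = spark.sql("SELECT name FROM t") // ...the SELECT in a SQL statement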

If you are still having trouble understanding what collect actually does (for either an RDD or a DataFrame), then it is worth reading some articles about what Spark is doing behind the scenes.


Calling select results in lazy evaluation. For example:

val df1 = df.select("col1")       // transformation: nothing is computed yet
val df2 = df1.filter("col1 == 3") // transformation: still nothing is computed

Both statements above only build up a lazy execution plan; it is executed when you call an action on that DataFrame, such as show, collect, etc.

val rows = df2.collect() // action: executes the whole lazy plan and returns Array[Row] (not a DataFrame) to the driver

Use .explain at the end of your chain of transformations to inspect its plan. For more detailed background, look up Spark's documentation on transformations and actions.
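
For example, continuing with df2 from above (a small sketch; explain prints the plan without executing anything):

df2.explain()     // prints the physical plan to stdout; no job runs
df2.explain(true) // extended output: parsed, analyzed, optimized, and physical plans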


Short answer in bold:

  • collect is mainly for serializing the DataFrame to the driver
    (you lose parallelism but preserve all the other data characteristics of the DataFrame).
    For example, with a PrintWriter pw you can't do df.foreach( r => pw.write(r) ) directly,
    because pw exists only on the driver; you must collect before foreach,
    df.collect.foreach(...); see the sketch after this list.
    PS: the "loss of parallelism" is not a total loss, because after collecting,
    the data can be parallelized and distributed to executors again.

  • select is mainly for selecting columns, similar to projection in relational algebra
    (only similar in the framework's context, because Spark's select does not deduplicate rows).
    So it is also the column-wise complement of filter in the framework's context.
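
Here is the PrintWriter point as a minimal, self-contained sketch (the method name dumpToDriverFile and the path argument are hypothetical; this is only safe when the collected data fits in driver memory):

import java.io.PrintWriter
import org.apache.spark.sql.DataFrame

def dumpToDriverFile(df: DataFrame, path: String): Unit = {
  val pw = new PrintWriter(path) // exists only on the driver; not serializable
  try {
    // df.foreach(r => pw.write(r.toString)) would fail: pw cannot be shipped to executors
    df.collect().foreach(r => pw.write(r.toString + "\n")) // collect first, then write locally
  } finally {
    pw.close()
  }
}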


Commenting on the explanations in other answers: I like Jeff's classification of Spark operations into transformations (such as select) and actions (such as collect). It is also good to remember that transformations (including select) are lazily evaluated.


select is used for projecting some or all fields of a DataFrame. It won't give you a value as output but a new DataFrame. It's a transformation.
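
For instance (a hedged sketch, assuming a DataFrame df with columns name and age; the names are illustrative):

val projected = df.select("name", "age") // a new DataFrame, not a value; nothing executes yet
projected.show()                         // only this action triggers the projection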

