[apache-spark] how to loop through each row of dataFrame in pyspark

E.g.

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

The above statement prints the entire table to the terminal, but I want to access each row in that table using a for or while loop to perform further calculations.

The answer is


Using list comprehensions in Python, you can collect an entire column of values into a list using just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of tables in database 'default', but the same can be adapted by replacing the query used in sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries, and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{'name': x["name"], 'age': x["age"], 'city': x["city"]}
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))

You simply cannot. DataFrames, like other distributed data structures, are not iterable and can only be accessed through dedicated higher-order functions and/or SQL methods.

You can of course collect

for row in df.rdd.collect():
    do_something(row)

or convert to a local iterator with toLocalIterator

for row in df.rdd.toLocalIterator():
    do_something(row)

and iterate locally as shown above, but doing so defeats the whole purpose of using Spark.
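If the per-row work does not need to come back to the driver, you can instead push it to the executors with foreach, which keeps the computation distributed. A minimal sketch, where do_something is the same placeholder for your own per-row logic:

def do_something(row):
    # placeholder: e.g. write the row to an external system
    pass

df.foreach(do_something)   # runs on the executors, returns nothing to the driver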


Give it a try like this:

result = spark.createDataFrame([('SpeciesId', 'int'), ('SpeciesName', 'string')], ["col_name", "data_type"])
for f in result.collect():
    print(f.col_name)
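If you need every column of each Row rather than a single field, Row objects can be converted to plain Python dictionaries with asDict(). A small sketch reusing the result DataFrame above:

for f in result.collect():
    d = f.asDict()                        # Row -> plain Python dict
    print(d["col_name"], d["data_type"])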

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):
    return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function is then applied to every row of the DataFrame. Note that sample2 will be an RDD, not a DataFrame.
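If you need a DataFrame again afterwards, the mapped RDD can be converted back with toDF(). A minimal sketch, assuming the column names from the question:

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))
sample2_df = sample2.toDF(["name", "age", "city"])   # RDD of tuples back to a DataFrame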

map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use withColumn, which returns a DataFrame.

sample3 = sample.withColumn('age2', sample.age + 2)
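For slightly more involved derived columns you can often stay in the DataFrame API as well, using the built-ins in pyspark.sql.functions. A sketch with a conditional label column (the age threshold of 18 is just an illustrative value):

from pyspark.sql import functions as F

sample4 = sample.withColumn(
    "age_group",
    F.when(sample.age < 18, "minor").otherwise("adult")   # derived label column
)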

Note that in the tupleList comprehension above, the dictionary keys must be written as string literals:

tupleList = [{'name': x["name"], 'age': x["age"], 'city': x["city"]}

because name, age, and city are not variables there but simply keys of the dictionary.


It might not be the best practice, but you can simply target a specific column using collect(), export it as a list of Rows, and loop through the list.

Assume this is your df:

+----------+----------+-------------------+-----------+-----------+------------------+
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03|             51148|
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04|            -35252|
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04|          20963548|
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21|          22173148|
+----------+----------+-------------------+-----------+-----------+------------------+

To loop through the rows in the Date column:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)
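The same result can also be written as a single list comprehension over the collected Rows:

final_list = [row["Date"] for row in df3.select("Date").collect()]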

If you want to do something to each row in a DataFrame object, use map on the underlying RDD. This will allow you to perform further calculations on each row. It's the equivalent of looping across the entire dataset from 0 to len(dataset) - 1.

Note that this will return a PipelinedRDD, not a DataFrame.
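As a quick illustration of that return type (using the column names from the question; the exact RDD class name may vary by Spark version):

mapped = sample.rdd.map(lambda row: (row.name, row.age + 1, row.city))
print(type(mapped))       # typically <class 'pyspark.rdd.PipelinedRDD'>
rows = mapped.collect()   # the data only reaches the driver when you collect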

