[pyspark] Join two data frames, select all columns from one and some columns from the other

Let's say I have a spark data frame df1, with several columns (among which the column id) and data frame df2 with two columns, id and other.

Is there a way to replicate the following command

sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")

by using only pyspark functions such as join(), select() and the like?

I have to implement this join in a function and I don't want to be forced to have sqlContext as a function parameter.

Thanks!

This question is related to pyspark apache-spark-sql

The answer is


Here is the code snippet that does the inner join and select the columns from both dataframe and alias the same column to different column name.

emp_df  = spark.read.csv('Employees.csv', header =True);
dept_df = spark.read.csv('dept.csv', header =True)


emp_dept_df = emp_df.join(dept_df,'DeptID').select(emp_df['*'], dept_df['Name'].alias('DName'))
emp_df.show()
dept_df.show()
emp_dept_df.show()
Output  for 'emp_df.show()':

+---+---------+------+------+
| ID|     Name|Salary|DeptID|
+---+---------+------+------+
|  1|     John| 20000|     1|
|  2|    Rohit| 15000|     2|
|  3|    Parth| 14600|     3|
|  4|  Rishabh| 20500|     1|
|  5|    Daisy| 34000|     2|
|  6|    Annie| 23000|     1|
|  7| Sushmita| 50000|     3|
|  8| Kaivalya| 20000|     1|
|  9|    Varun| 70000|     3|
| 10|Shambhavi| 21500|     2|
| 11|  Johnson| 25500|     3|
| 12|     Riya| 17000|     2|
| 13|    Krish| 17000|     1|
| 14| Akanksha| 20000|     2|
| 15|   Rutuja| 21000|     3|
+---+---------+------+------+

Output  for 'dept_df.show()':
+------+----------+
|DeptID|      Name|
+------+----------+
|     1|     Sales|
|     2|Accounting|
|     3| Marketing|
+------+----------+

Join Output:
+---+---------+------+------+----------+
| ID|     Name|Salary|DeptID|     DName|
+---+---------+------+------+----------+
|  1|     John| 20000|     1|     Sales|
|  2|    Rohit| 15000|     2|Accounting|
|  3|    Parth| 14600|     3| Marketing|
|  4|  Rishabh| 20500|     1|     Sales|
|  5|    Daisy| 34000|     2|Accounting|
|  6|    Annie| 23000|     1|     Sales|
|  7| Sushmita| 50000|     3| Marketing|
|  8| Kaivalya| 20000|     1|     Sales|
|  9|    Varun| 70000|     3| Marketing|
| 10|Shambhavi| 21500|     2|Accounting|
| 11|  Johnson| 25500|     3| Marketing|
| 12|     Riya| 17000|     2|Accounting|
| 13|    Krish| 17000|     1|     Sales|
| 14| Akanksha| 20000|     2|Accounting|
| 15|   Rutuja| 21000|     3| Marketing|
+---+---------+------+------+----------+

I got an error: 'a not found' using the suggested code:

from pyspark.sql.functions import col df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])

I changed a.columns to df1.columns and it worked out.


Here is a solution that does not require a SQL context, but maintains the metadata of a DataFrame.

a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])

c = a.join(b, a.a_id == b.b_id)

Then, c.show() yields:

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   a|  foo|   p1|   a|
|   b|  hem|   p2|   b|
|   c|  haw|   p3|   c|
+----+-----+-----+----+


function to drop duplicate columns after joining.

check it

def dropDupeDfCols(df): newcols = [] dupcols = []

for i in range(len(df.columns)):
    if df.columns[i] not in newcols:
        newcols.append(df.columns[i])
    else:
        dupcols.append(i)

df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
    df = df.drop(str(dupcol))

return df.toDF(*newcols)

I believe that this would be the easiest and most intuitive way:

final = (df1.alias('df1').join(df2.alias('df2'),
                               on = df1['id'] == df2['id'],
                               how = 'inner')
                         .select('df1.*',
                                 'df2.other')
)

drop duplicate b_id

c = a.join(b, a.a_id == b.b_id).drop(b.b_id)

Without using alias.

df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])

Asterisk (*) works with alias. Ex:

from pyspark.sql.functions import *

df1 = df1.alias('df1')
df2 = df2.alias('df2')

df1.join(df2, df1.id == df2.id).select('df1.*')