[apache-spark] Spark specify multiple column conditions for dataframe join

How to give more column conditions when joining two dataframes. For example I want to run the following :

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

I want to join only when these columns match. But above syntax is not valid as cols only takes one string. So how do I get what I want.

This question is related to apache-spark apache-spark-sql rdd

The answer is


The === options give me duplicated columns. So I use Seq instead.

val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

Of course, this only works when the names of the joining columns are the same.


Try this:

val rccJoin=dfRccDeuda.as("dfdeuda")
.join(dfRccCliente.as("dfcliente")
,col("dfdeuda.etarcid")===col("dfcliente.etarcid") 
&& col("dfdeuda.etarcid")===col("dfcliente.etarcid"),"inner")

In Pyspark you can simply specify each condition separately:

val Lead_all = Leads.join(Utm_Master,  
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

Just be sure to use operators and parenthesis correctly.


One thing you can do is to use raw SQL:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
    Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")

In Pyspark, using parenthesis around each condition is the key to using multiple column names in the join condition.

joined_df = df1.join(df2, 
    (df1['name'] == df2['name']) &
    (df1['phone'] == df2['phone'])
)

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

To make it case insensitive,

import org.apache.spark.sql.functions.{lower, upper}

then just use lower(value) in the condition of the join method.

Eg: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))


Spark SQL supports join on tuple of columns when in parentheses, like

... WHERE (list_of_columns1) = (list_of_columns2)

which is a way shorter than specifying equal expressions (=) for each pair of columns combined by a set of "AND"s.

For example:

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
   )

instead of

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
   )

which is less readable too especially when list of columns is big and you want to deal with NULLs easily.


As of Spark version 1.5.0 (which is currently unreleased), you can join on multiple DataFrame columns. Refer to SPARK-7990: Add methods to facilitate equi-join on multiple join keys.

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)

Scala

The question asked for a Scala answer, but I don't use Scala. Here is my best guess....

Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)

Examples related to apache-spark

Select Specific Columns from Spark DataFrame Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey Filter df when values matches part of a string in pyspark Filtering a pyspark dataframe using isin by exclusion Convert date from String to Date format in Dataframes

Examples related to apache-spark-sql

Select Specific Columns from Spark DataFrame Pyspark: Filter dataframe based on multiple conditions Select columns in PySpark dataframe What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? How to find count of Null and Nan values for each column in a PySpark dataframe efficiently? Spark dataframe: collect () vs select () How does createOrReplaceTempView work in Spark? Filter df when values matches part of a string in pyspark Convert date from String to Date format in Dataframes Take n rows from a spark dataframe and pass to toPandas()

Examples related to rdd

How to create a DataFrame from a text file in Spark Spark - repartition() vs coalesce() Difference between DataFrame, Dataset, and RDD in Spark Spark specify multiple column conditions for dataframe join Spark read file from S3 using sc.textFile ("s3n://...) Spark: subtract two DataFrames How to convert rdd object to dataframe in spark What is the difference between cache and persist? Apache Spark: map vs mapPartitions?