How do I specify multiple column conditions when joining two DataFrames? For example, I want to run the following:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
I want to join only when these columns match, but the syntax above is not valid because cols only takes a single string. So how do I get what I want?
This question is related to: apache-spark, apache-spark-sql, rdd
The === option gives me duplicated columns, so I use Seq instead:
val Lead_all = Leads.join(Utm_Master,
Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")
Of course, this only works when the names of the joining columns are the same.
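If the names differ, one workaround is to rename the key on one side first so the Seq-based join still applies. A minimal sketch, assuming a hypothetical Utm_Master column named Source that should match Utm_Source in Leads:
// "Source" is a hypothetical column name, used only for illustration.
val Utm_Master_renamed = Utm_Master.withColumnRenamed("Source", "Utm_Source")

val Lead_all = Leads.join(
  Utm_Master_renamed,
  Seq("Utm_Source", "Utm_Medium", "Utm_Campaign"),
  "left")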
Try this:
import org.apache.spark.sql.functions.col

val rccJoin = dfRccDeuda.as("dfdeuda")
  .join(dfRccCliente.as("dfcliente"),
    col("dfdeuda.etarcid") === col("dfcliente.etarcid")
      && col("dfdeuda.etarcid") === col("dfcliente.etarcid"), // both clauses compare the same pair; swap the second for the next key columns
    "inner")
In PySpark you can simply specify each condition separately:
Lead_all = Leads.join(Utm_Master,
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
Just be sure to use the operators and parentheses correctly: in PySpark, & combines Column conditions (the Python and keyword does not work on Columns), and each comparison needs its own parentheses because & binds more tightly than ==.
One thing you can do is to use raw SQL:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)
val bar = sqlContext.createDataFrame(sc.parallelize(
Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
Bar(3, 1, 2, "bar") :: Nil))
val foo = sqlContext.createDataFrame(sc.parallelize(
Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))
foo.registerTempTable("foo")
bar.registerTempTable("bar")
sqlContext.sql(
"SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
In PySpark, putting parentheses around each condition is the key to using multiple column names in the join condition.
joined_df = df1.join(df2,
(df1['name'] == df2['name']) &
(df1['phone'] == df2['phone'])
)
Scala:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
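Note that <=> is Spark's null-safe equality operator, so rows where both sides are NULL will also match; use === instead if you want standard SQL equality semantics.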
To make the comparison case insensitive, import org.apache.spark.sql.functions.{lower, upper} and wrap the column in lower(...) (or upper(...)) inside the join condition.
E.g.: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
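The same idea applied to a join condition; a minimal sketch using the Leads and Utm_Master frames from the question:
import org.apache.spark.sql.functions.lower

// Lower-case both sides of the key so the match is case insensitive.
val Lead_all = Leads.join(
  Utm_Master,
  lower(Leads("LeadSource")) === lower(Utm_Master("LeadSource")),
  "left")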
Spark SQL supports comparing a tuple of columns when they are wrapped in parentheses, like
... WHERE (list_of_columns1) = (list_of_columns2)
which is far shorter than writing a separate equality expression (=) for every pair of columns, combined by a chain of "AND"s.
For example:
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
instead of
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
)
The expanded form is also less readable, especially when the list of columns is long, and the tuple form makes it easier to deal with NULLs.
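A minimal sketch of wiring the tuple-style query up from Scala, assuming Spark 2.x (where NOT EXISTS subqueries are supported) and hypothetical input DataFrames tab1Df and t1ExceptT2Df carrying columns a, b, and c:
// tab1Df and t1ExceptT2Df are hypothetical inputs, used only for illustration.
tab1Df.createOrReplaceTempView("tab1")
t1ExceptT2Df.createOrReplaceTempView("t1_except_t2_df")

// Run the tuple-comparison anti-join quoted above.
val result = spark.sql("""
  SELECT a, b, c
  FROM tab1 t1
  WHERE NOT EXISTS (
    SELECT 1
    FROM t1_except_t2_df e
    WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c))
""")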
As of Spark version 1.5.0, you can join on multiple DataFrame columns. Refer to SPARK-7990: Add methods to facilitate equi-join on multiple join keys.
Python
Leads.join(
Utm_Master,
["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
"left_outer"
)
Scala
The question asked for a Scala answer, but I don't use Scala. Here is my best guess....
Leads.join(
Utm_Master,
Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left_outer"
)
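For what it's worth, that guess is correct: the Scala DataFrame API has a join(right, usingColumns: Seq[String], joinType: String) overload, so the snippet above works as written.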
Source: Stackoverflow.com