I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.
I've tried the following without any success:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Also got an error using this:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?
This question is related to: python, apache-spark, dataframe, pyspark, apache-spark-sql
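You cannot pass a column that belongs to a different DataFrame (or an RDD) to withColumn, which is why the attempts above fail. A common workaround is to give both the existing DataFrame and the list a row index and join on them. A minimal sketch, using the names from the question (randomed_hours, my_df_spark, sqlContext) and assuming the list order matches the DataFrame's current row order:
from pyspark.sql import Row

# build a small DataFrame of (idx, hours) pairs from the driver-side list
hours_df = sqlContext.createDataFrame(list(enumerate(randomed_hours)), ["idx", "hours"])

# give the existing DataFrame a matching row index
def add_idx(pair):
    row, idx = pair
    d = row.asDict()
    d["idx"] = idx
    return Row(**d)

df_with_idx = my_df_spark.rdd.zipWithIndex().map(add_idx).toDF()

# join on the index and drop it again
result = df_with_idx.join(hours_df, "idx").drop("idx")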
The simplest way to add a column is to use withColumn. Since the DataFrame is created with sqlContext, the schema must either be specified explicitly or inferred from the dataset; specifying it by hand becomes tedious when it changes every time.
Below is an example that you can consider:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)  # the SparkContext is available as sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header=True, inferSchema=True, sep=",")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so with withColumn:
from pyspark.sql.functions import lit
Data = Data.withColumn("col31", lit("default_value"))  # any Column expression works here
# Check the change
Data.printSchema()
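For a column computed from existing columns rather than a constant, the second argument to withColumn can be any Column expression built from the DataFrame itself. A small sketch, assuming col1 and col2 from the example above are numeric:
# col1 and col2 are assumed to be numeric columns in Data
Data = Data.withColumn("col32", Data["col1"] + Data["col2"])
Data.printSchema()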
To add a column using a UDF:
df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
    if value == 1: return 'cat1'
    elif value == 2: return 'cat2'
    ...
    else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val,  # do something to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))
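A minimal usage sketch of this pattern (the sample data, the SparkSession named spark, and the column names are placeholders, not from the original answer):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# hypothetical example data with a single string column
df = spark.createDataFrame([("a",), ("b",)], ["old_col"])

to_upper = udf(lambda val: val.upper(), StringType())
df.withColumn("new_col", to_upper(df.old_col)).show()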
To add a new column with a custom value, or a dynamically calculated value populated from the existing columns:
e.g.
|ColumnA | ColumnB |
|--------|---------|
| 10 | 15 |
| 10 | 20 |
| 10 | 30 |
and a new ColumnC computed as ColumnA + ColumnB:
|ColumnA | ColumnB | ColumnC|
|--------|---------|--------|
| 10 | 15 | 25 |
| 10 | 20 | 30 |
| 10 | 30 | 40 |
using
from pyspark.sql import Row

# to add a new column
def customColumnVal(row):
    rd = row.asDict()
    rd["ColumnC"] = row["ColumnA"] + row["ColumnB"]
    new_row = Row(**rd)
    return new_row

# convert the DataFrame to an RDD
df_rdd = input_dataframe.rdd

# apply the new function to the RDD and convert back to a DataFrame
output_dataframe = df_rdd.map(customColumnVal).toDF()
Here input_dataframe is the DataFrame that will get modified, and the customColumnVal function contains the code that adds the new column.
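For this particular case (ColumnC = ColumnA + ColumnB), a plain withColumn expression gives the same result without dropping to the RDD API; a minimal sketch:
output_dataframe = input_dataframe.withColumn(
    "ColumnC", input_dataframe["ColumnA"] + input_dataframe["ColumnB"])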
You can define a new udf when adding a column_name:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

u_f = F.udf(lambda: yourstring, StringType())
a.select(u_f().alias('column_name'))
For Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
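A runnable sketch of the same select/alias idea (the SparkSession and the sample rows are assumptions added for illustration):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("amy", 20), ("bob", 35)], ["name", "age"])

df.select('*', (df.age + 10).alias('agePlusTen')).show()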
We can add additional columns to a DataFrame directly with the steps below:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
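The same column can also be written as a SQL CASE expression via expr, equivalent to the when/otherwise chain above (a sketch):
from pyspark.sql.functions import expr

df = df.withColumn("profile", expr("CASE WHEN age >= 40 THEN 'Senior' ELSE 'Executive' END"))
df.show()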
There are multiple ways we can add a new column in pySpark.
Let's first create a simple DataFrame.
from pyspark.sql.types import IntegerType

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
Now let's try to double the column value and store it in a new column. Below are a few different approaches to achieve the same.
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
For more examples and explanation on spark DataFrame functions, you can visit my blog.
I hope this helps.
I would like to offer a generalized example for a very similar use case:
Use Case: I have a csv consisting of:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
I need to perform some transformations and the final csv needs to look like
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.
so:
1) I read the original csv using spark.read and call it "df".
2) I do something to the data.
3) I add the null columns using this script:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)
In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).
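A small end-to-end sketch of this script (MY_COLUMN_LIST and the sample row are hypothetical, chosen to mirror the First/Second/Third/Fourth/Fifth layout above):
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

MY_COLUMN_LIST = ["First", "Second", "Third", "Fourth", "Fifth"]
df = spark.createDataFrame([("a", "b", "c")], ["First", "Third", "Fifth"])

outcols = [c if c in df.columns
           else lit(None).cast(StringType()).alias(c)
           for c in MY_COLUMN_LIST]

# Second and Fourth come back as all-null string columns
df.select(outcols).show()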