[scala] How to define partitioning of DataFrame?

I've started using Spark SQL and DataFrames in Spark 1.4.0. I want to define a custom partitioner on DataFrames, in Scala, but I don't see how to do this.

One of the data tables I'm working with contains a list of transactions, by account, similar to the following example.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.

But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.

The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition its data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each Account, so that didn't sound like a reasonable solution.

Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?

The answer is

So to start with some kind of answer : ) - You can't

I am not an expert, but as far as I understand DataFrames, they are not the same as RDDs, and a DataFrame has no such thing as a Partitioner.

Generally, the idea of a DataFrame is to provide another level of abstraction that handles such problems itself. Queries on a DataFrame are translated into a logical plan, which is then translated into operations on RDDs. The partitioning you suggested will probably be applied automatically, or at least it should be.

If you don't trust Spark SQL to produce an optimal job, you can always transform the DataFrame to an RDD[Row], as suggested in one of the comments.
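
To make the RDD[Row] route concrete, here is a minimal sketch, assuming a Spark 1.x-style SQLContext and a hypothetical sample DataFrame shaped like the table in the question. The object name, helper name, and the partition count of 4 are illustrative choices, not anything from the question. It keys each Row by its account and applies a plain HashPartitioner, then reports how many partitions each account ended up in.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PartitionRowsByAccount {
  // For each account, counts the distinct partitions that hold its rows.
  def partitionsPerAccount(sc: SparkContext): Map[Int, Long] = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Hypothetical sample data mirroring the question's table.
    val df = Seq(
      (1001, "2014-04-01", "Purchase", 100.00),
      (1001, "2014-04-05", "Purchase", 70.00),
      (1002, "2014-04-02", "Purchase", 22.00),
      (1003, "2014-04-03", "Purchase", 15.00)
    ).toDF("Account", "Date", "Type", "Amount")

    // Key each Row by its account (column 0), then use an RDD partitioner.
    val byAccount = df.rdd
      .map(row => (row.getInt(0), row))
      .partitionBy(new HashPartitioner(4))

    // Tag every account with the index of the partition it landed in.
    byAccount
      .mapPartitionsWithIndex((idx, rows) => rows.map { case (account, _) => (account, idx) })
      .distinct()
      .countByKey()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-rows-by-account").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(partitionsPerAccount(sc)) finally sc.stop()
  }
}
```

Note that the partitioner here is known only to the RDD API. If the rows are turned back into a DataFrame, Spark SQL will most likely not be aware of that layout when planning queries.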

In Spark < 1.6, if you create a HiveContext rather than the plain old SQLContext, you can use the HiveQL clauses DISTRIBUTE BY colX... (ensures that all rows with the same colX value go to the same reducer) and CLUSTER BY colX... (a shortcut for DISTRIBUTE BY and SORT BY on the same columns), for example:

hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

I'm not sure how this fits in with the Spark DataFrame API. These keywords aren't supported in the normal SQLContext (note that you don't need a Hive metastore to use the HiveContext).

EDIT: Spark 1.6+ now has this in the native DataFrame API
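
As an illustration of the native API mentioned in the edit, the following sketch uses DataFrame.repartition with a column expression, which is available from Spark 1.6. The sample data, object name, and helper names are hypothetical, and the column name Account is taken from the question's table rather than from this answer's accountId. Without an explicit count, the number of resulting partitions follows spark.sql.shuffle.partitions; repartition(n, col("Account")) sets it explicitly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col

object RepartitionByColumn {
  // Hash-partitions the rows on the Account column (Spark 1.6+).
  def byAccount(df: DataFrame): DataFrame = df.repartition(col("Account"))

  // Hypothetical sample rows mirroring the question's table.
  def sample(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    Seq(
      (1001, "2014-04-01", "Purchase", 100.00),
      (1001, "2014-04-01", "Payment", -150.00),
      (1002, "2014-04-02", "Purchase", 22.00),
      (1003, "2014-04-03", "Purchase", 15.00)
    ).toDF("Account", "Date", "Type", "Amount")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition-by-column").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val df = byAccount(sample(new SQLContext(sc)))
      // Print (partition index, account) pairs to inspect the layout.
      df.rdd
        .mapPartitionsWithIndex((idx, rows) => rows.map(r => (idx, r.getInt(0))))
        .distinct()
        .collect()
        .foreach(println)
    } finally sc.stop()
  }
}
```

Each account should appear next to exactly one partition index, although several accounts may share a partition.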

Use the DataFrame returned by:


There is no explicit way to use partitionBy on a DataFrame, only on a PairRDD, but when you sort a DataFrame, Spark will use that ordering in its LogicalPlan, and that will help when you need to make calculations on each Account.

I just stumbled upon the same exact issue, with a dataframe that I want to partition by account. I assume that when you say "want to have the data partitioned so that all of the transactions for an account are in the same Spark partition", you want it for scale and performance, but your code doesn't depend on it (like using mapPartitions() etc), right?

I was able to do this using an RDD, but I don't know if this is an acceptable solution for you. Once you have the DataFrame available as an RDD, you can apply repartitionAndSortWithinPartitions to perform custom repartitioning of the data.

Here is a sample I used:

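import org.apache.spark.Partitioner

// Routes each record to a partition derived from its Long timestamp key.
class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    // Hash the value itself: hashing a freshly created Array would use its
    // identity hash, so equal keys could land in different partitions.
    val mod = start_time.hashCode % partitions
    // Spark requires a partition index in [0, numPartitions).
    if (mod < 0) mod + partitions else mod
  }

  override def numPartitions: Int = partitions
}

// keyedRdd is a placeholder name for a pair RDD keyed by the Long timestamp.
keyedRdd
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }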

