[python] "Large data" workflows using pandas

I have tried to puzzle out an answer to this question for many months while learning pandas. I use SAS for my day-to-day work and it is great for its out-of-core support. However, SAS is horrible as a piece of software for numerous other reasons.

One day I hope to replace my use of SAS with python and pandas, but I currently lack an out-of-core workflow for large datasets. I'm not talking about "big data" that requires a distributed network, but rather files too large to fit in memory but small enough to fit on a hard-drive.

My first thought is to use HDFStore to hold large datasets on disk and pull only the pieces I need into dataframes for analysis. Others have mentioned MongoDB as an easier-to-use alternative. My question is this:

What are some best-practice workflows for accomplishing the following:

  1. Loading flat files into a permanent, on-disk database structure
  2. Querying that database to retrieve data to feed into a pandas data structure
  3. Updating the database after manipulating pieces in pandas

Real-world examples would be much appreciated, especially from anyone who uses pandas on "large data".

Edit -- an example of how I would like this to work:

  1. Iteratively import a large flat-file and store it in a permanent, on-disk database structure. These files are typically too large to fit in memory.
  2. In order to use Pandas, I would like to read subsets of this data (usually just a few columns at a time) that can fit in memory.
  3. I would create new columns by performing various operations on the selected columns.
  4. I would then have to append these new columns into the database structure.

I am trying to find a best-practice way of performing these steps. Reading links about pandas and PyTables, it seems that appending a new column could be a problem.
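
To make this concrete, here is a rough sketch of the workflow I have in mind using HDFStore, with made-up file and column names (I'm not sure this is even the right approach, which is part of what I'm asking):

import pandas as pd

store = pd.HDFStore('store.h5')

# 1. Iteratively import the flat file into a permanent on-disk table.
for chunk in pd.read_csv('flatfile.csv', chunksize=50000):
    store.append('df', chunk, data_columns=True)

# 2. Read back only the columns that fit in memory.
subset = store.select('df', columns=['var1', 'var2'])

# 3. Create a new column from the selected columns.
subset['newvar'] = subset['var1'] * subset['var2']

# 4. Append the new column back into the on-disk structure -- this is the
#    step that seems to be a problem.

store.close()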

Edit -- Responding to Jeff's questions specifically:

  1. I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc. The datasets I use every day typically have 1,000 to 2,000 fields of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns.
  2. Typical operations involve combining several columns into a new, compound column using conditional logic. For example: if var1 > 2 then newvar = 'A', elif var2 == 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset (see the small pandas sketch after this list).
  3. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics trying to find interesting, intuitive relationships to model.
  4. A typical project file is usually about 1GB. Files are organized such that each row is a record of consumer data. Every row has the same number of columns. This will always be the case.
  5. It's pretty rare that I would subset by rows when creating a new column. However, it's pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say Retail credit cards. To do this, I would select only those records where the line of business = retail in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations.
  6. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. I usually explore columns in small sets. For example, I will focus on a set of, say, 20 columns dealing just with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I'm doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.
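
Here is a small pandas sketch of the kind of compound-column operation described in point 2, with made-up data and column names (I assume something like np.select is the idiomatic way to express this):

import numpy as np
import pandas as pd

# Made-up data with the same shape of logic as the example in point 2.
df = pd.DataFrame({'var1': [1, 3, 0, 5], 'var2': [4, 1, 4, 2]})

# if var1 > 2 then newvar = 'A' elif var2 == 4 then newvar = 'B'
conditions = [df['var1'] > 2, df['var2'] == 4]
df['newvar'] = np.select(conditions, ['A', 'B'], default='')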

It is rare that I would ever add rows to the dataset. I will nearly always be creating new columns (variables or features in statistics/machine learning parlance).

Tags: python, mongodb, pandas, hdf5, large-data

Answers:


There is now, two years after the question, an 'out-of-core' pandas equivalent: dask. It is excellent! Though it does not support all of pandas' functionality, you can get really far with it. Update: in the two years since, it has been consistently maintained and there is a substantial user community working with Dask.

And now, four years after the question, there is another high-performance 'out-of-core' pandas equivalent: Vaex. It "uses memory mapping, zero memory copy policy and lazy computations for best performance (no memory wasted)." It can handle data sets of billions of rows and does not load them into memory (making it even possible to do analysis on suboptimal hardware).


As noted by others, after some years an 'out-of-core' pandas equivalent has emerged: dask. Though dask is not a drop-in replacement for pandas and does not cover all of its functionality, it stands out for several reasons:

Dask is a flexible parallel computing library for analytic computing that is optimized for dynamic task scheduling for interactive computational workloads of “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments and scales from laptops to clusters.

Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Scales up: Runs resiliently on clusters with 1000s of cores
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans

and to add a simple code sample:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

replaces some pandas code like this:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

and, especially noteworthy, it provides, through its concurrent.futures-style interface, general infrastructure for submitting custom tasks:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

I think the answers above are missing a simple approach that I've found very useful.

When I have a file that is too large to load into memory, I break up the file into multiple smaller files (either by rows or columns).

Example: in the case of 30 days' worth of trading data of ~30GB size, I break it into one file per day of ~1GB size. I subsequently process each file separately and aggregate the results at the end.

One of the biggest advantages is that it allows parallel processing of the files (either multiple threads or processes)

The other advantage is that file manipulation (like adding/removing dates in the example) can be accomplished with regular shell commands, which is not possible with more advanced/complicated file formats.

This approach doesn't cover all scenarios, but is very useful in a lot of them
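
A minimal sketch of this approach, assuming the large file has already been split into one CSV per day and that the per-file work reduces each file to a small summary (the file pattern and column names are made up):

import glob
from multiprocessing import Pool

import pandas as pd

def process_day(path):
    # Per-file computation on a ~1GB file that fits comfortably in memory.
    df = pd.read_csv(path)
    return df.groupby('symbol')['volume'].sum()

if __name__ == '__main__':
    files = sorted(glob.glob('trades_*.csv'))   # one file per day
    with Pool(processes=4) as pool:
        daily = pool.map(process_day, files)    # process files in parallel
    # Aggregate the per-day results at the end.
    result = pd.concat(daily).groupby(level=0).sum()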


I recently came across a similar issue. I found that simply reading the data in chunks and appending it in chunks to the same csv works well. My problem was adding a date column based on information in another table, using the values of certain columns as follows. This may help those confused by dask and hdf5 but more familiar with pandas, as I am.

import pandas as pd

# NOTE: pathlist and newyears are defined elsewhere in the original script.
def addDateColumn():
    """Adds time to the daily rainfall data. Reads the csv as chunks of 100k
    rows at a time and outputs them, appending as needed, to a single csv.
    Uses the column of the raster names to get the date.
    """
    df = pd.read_csv(pathlist[1] + "CHIRPS_tanz.csv", iterator=True,
                     chunksize=100000)  # read the csv file in 100k-row chunks

    # '''Do some stuff'''

    count = 1  # for indexing items in the time list
    for chunknum, chunk in enumerate(df):  # for each 100k rows
        newtime = []  # times to append, one per matching row
        toiterate = chunk[chunk.columns[2]]  # IDs of raster nums the time is based on
        while count <= toiterate.max():
            for i in toiterate:
                if i == count:
                    newtime.append(newyears[count])
            count += 1
        print("Finished", chunknum, "chunks")
        chunk["time"] = newtime  # create the new column based on the times
        outname = "CHIRPS_tanz_time2.csv"
        # append each chunk's output to the same csv, writing no header
        chunk.to_csv(pathlist[2] + outname, mode='a', header=False, index=False)

If your datasets are between 1 and 20GB, you should get a workstation with 48GB of RAM. Then pandas can hold the entire dataset in RAM. I know it's not the answer you're looking for here, but doing scientific computing on a notebook with 4GB of RAM isn't reasonable.


One more variation

Many of the operations done in pandas can also be done as a DB query (SQL, Mongo).

Using an RDBMS or MongoDB allows you to perform some of the aggregations in the DB query (which is optimized for large data and uses caches and indexes efficiently).

Later, you can perform post processing using pandas.

The advantage of this method is that you gain the DB optimizations for working with large data while still defining the logic in a high-level, declarative syntax - without having to deal with the details of deciding what to do in memory and what to do out of core.

And although the query language and pandas are different, it's usually not complicated to translate part of the logic from one to the other.
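
A minimal sketch of that split, with a made-up SQLite database and table: the heavy aggregation runs in the database, and only the small result comes back into pandas for post-processing.

import sqlite3

import pandas as pd

conn = sqlite3.connect('credit.db')        # hypothetical database
query = """
    SELECT line_of_business,
           COUNT(*)     AS n,
           AVG(balance) AS avg_balance
    FROM accounts                           -- hypothetical table
    GROUP BY line_of_business
"""
summary = pd.read_sql_query(query, conn)   # aggregation happens in the DB

# Post-processing in pandas on the already-small result.
summary['share'] = summary['n'] / summary['n'].sum()
conn.close()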


It is worth mentioning Ray here as well; it's a distributed computation framework that has its own implementation of pandas in a distributed way.

Just replace the pandas import, and the code should work as is:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

You can read more details here:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


One trick I found helpful for large data use cases is to reduce the volume of the data by reducing float precision to 32-bit. It's not applicable in all cases, but in many applications 64-bit precision is overkill and the 2x memory savings are worth it. To make an obvious point even more obvious:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

At the moment I am working "like" you, just on a smaller scale, which is why I don't have a proof of concept for my suggestion.

However, I seem to find success in using pickle as a caching system and outsourcing the execution of various functions into files, executing those files from my command/main file. For example, I use a prepare_use.py to convert object types and split a data set into test, validation and prediction data sets.

How does caching with pickle work? I use strings to access pickle files that are named dynamically, depending on which parameters and data sets were passed (with that I try to detect whether the program has already been run, using .shape for the data set and a dict for the passed parameters). With these measures, I get a string to look up and read a .pickle file and, if it is found, can skip the processing time and jump straight to the step I am working on right now.
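
A minimal sketch of what I mean by pickle caching, with made-up names; the cache key is derived from the function, the data set's shape and the passed parameters:

import hashlib
import os
import pickle

def cached_call(func, df, params, cache_dir='cache'):
    os.makedirs(cache_dir, exist_ok=True)
    # Build a key from the function name, the data set's shape and the parameters.
    raw_key = f"{func.__name__}|{df.shape}|{sorted(params.items())}"
    path = os.path.join(cache_dir, hashlib.md5(raw_key.encode()).hexdigest() + '.pickle')
    if os.path.exists(path):            # already computed: skip the processing time
        with open(path, 'rb') as fh:
            return pickle.load(fh)
    result = func(df, **params)         # otherwise compute and cache the result
    with open(path, 'wb') as fh:
        pickle.dump(result, fh)
    return result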

Using databases I encountered similar problems, which is why I found joy in using this solution. There are certainly many constraints - for example, storing huge pickle sets due to redundancy. Updating a table from before to after a transformation can be done with proper indexing; validating information opens up a whole other book (I tried consolidating crawled rent data and basically stopped using a database after 2 hours, as I would have liked to be able to jump back after every transformation step).

I hope my 2 cents help you in some way.

Greetings.


Consider Ruffus if you go down the simple path of creating a data pipeline broken into multiple smaller files.


I know this is an old thread but I think the Blaze library is worth checking out. It's built for these types of situations.

From the docs:

Blaze extends the usability of NumPy and Pandas to distributed and out-of-core computing. Blaze provides an interface similar to that of the NumPy ND-Array or Pandas DataFrame but maps these familiar interfaces onto a variety of other computational engines like Postgres or Spark.

Edit: By the way, it's supported by ContinuumIO and Travis Oliphant, author of NumPy.


This is the case for pymongo. I have also prototyped using SQL Server, SQLite, HDF and an ORM (SQLAlchemy) in Python. First and foremost, pymongo is a document-based DB, so each person would be a document (a dict of attributes). Many people form a collection, and you can have many collections (people, stock market, income).

pd.DataFrame -> pymongo. Note: I use chunksize in read_csv to keep it to 5 to 10k records (pymongo drops the socket if larger).

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

querying: gt = greater than...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() returns an iterator so I commonly use ichunked to chop into smaller iterators.

How about a join since I normally get 10 data sources to paste together:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

then (in my case, I sometimes have to aggregate on aJoinDF first before it's "mergeable"):

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

You can then write the new info to your main collection via the update method below (logical collection vs. physical data sources).

collection.update({primarykey:foo},{key:change})

For smaller lookups, just denormalize. For example, if you have a code in the document, you just add the code's text as a field and do a dict lookup as you create documents.

Now you have a nice dataset based around a person, and you can unleash your logic on each case and make more attributes. Finally, you can read your key indicators (from 3 up to as many as fit in memory) into pandas and do pivots/aggregation/data exploration. This works for me for 3 million records with numbers/big text/categories/codes/floats/...

You can also use the two methods built into MongoDB (MapReduce and the aggregation framework). See here for more info about the aggregation framework, as it seems to be easier than MapReduce and looks handy for quick aggregate work; a small sketch follows. Notice I didn't need to define my fields or relations, and I can add items to a document. At the current state of the rapidly changing numpy, pandas and python toolset, MongoDB helps me just get to work :)
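
For example, a minimal aggregation-framework sketch (the database, collection and field names are made up), pulling only the small aggregated result into pandas:

import pandas as pd
from pymongo import MongoClient

client = MongoClient()
collection = client['mydb']['people']      # hypothetical database/collection

pipeline = [
    {'$match': {'line_of_business': 'retail'}},                          # filter in the DB
    {'$group': {'_id': '$state', 'avg_balance': {'$avg': '$balance'}}},  # aggregate in the DB
]
result = pd.DataFrame(list(collection.aggregate(pipeline)))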


I'd like to point out the Vaex package.

Vaex is a python library for lazy Out-of-Core DataFrames (similar to Pandas), to visualize and explore big tabular datasets. It can calculate statistics such as mean, sum, count, standard deviation etc, on an N-dimensional grid up to a billion (10⁹) objects/rows per second. Visualization is done using histograms, density plots and 3d volume rendering, allowing interactive exploration of big data. Vaex uses memory mapping, zero memory copy policy and lazy computations for best performance (no memory wasted).

Have a look at the documentation: https://vaex.readthedocs.io/en/latest/. The API is very close to that of pandas.
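
A minimal sketch, assuming an existing HDF5 file and column names that are made up; Vaex memory-maps the file rather than loading it:

import vaex

df = vaex.open('big_data.hdf5')     # memory-mapped, not loaded into RAM
df['ratio'] = df.x / df.y           # lazy virtual column, nothing is computed yet
print(df.mean(df.ratio))            # statistics are computed out of core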


Why Pandas? Have you tried standard Python?

Consider using the standard Python library. Pandas is subject to frequent updates, even with the recent release of the stable version.

Using the standard Python library, your code will always run.

One way of doing it is to have an idea of how you want your data to be stored and which questions you want to answer about the data. Then draw a schema of how you can organise your data (think tables) that will help you query the data - not necessarily normalisation.

You can make good use of (a minimal sketch follows the list):

  • lists of dictionaries to store the data in memory (think Amazon EC2) or on disk, one dict being one row,
  • generators to process the data row by row so you don't overflow your RAM,
  • list comprehensions to query your data,
  • Counter, defaultdict, ...
  • storing your data on your hard drive using whatever storage solution you have chosen; JSON could be one of them.
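
A minimal sketch of this standard-library approach, with made-up file and column names:

import csv
import json
from collections import Counter

def rows(path):
    # Generator: stream the file row by row as dicts, without loading it all.
    with open(path, newline='') as fh:
        yield from csv.DictReader(fh)

# "Query" the data with a generator expression and a Counter.
counts = Counter(r['line_of_business'] for r in rows('consumers.csv'))

# Store a derived result on disk, e.g. as JSON.
with open('counts.json', 'w') as fh:
    json.dump(counts, fh)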

RAM and HDDs are becoming cheaper and cheaper with time, and standard Python 3 is widely available and stable.

The fundamental question you are trying to solve is "how do I query large sets of data?". The HDFS architecture is more or less what I am describing here (data modelling, with data being stored on disk).

Let's say you have 1000 petabytes of data; there is no way you will be able to hold it in Dask or Pandas. Your best chance here is to store it on disk and process it with generators.


I spotted this a little late, but I work with a similar problem (mortgage prepayment models). My solution has been to skip the pandas HDFStore layer and use straight pytables. I save each column as an individual HDF5 array in my final file.

My basic workflow is to first get a CSV file from the database. I gzip it, so it's not as huge. Then I convert that to a row-oriented HDF5 file by iterating over it in python, converting each row to a real data type, and writing it to an HDF5 file. That takes some tens of minutes, but it doesn't use any memory, since it only operates row by row. Then I "transpose" the row-oriented HDF5 file into a column-oriented HDF5 file.

The table transpose looks like:

import logging

import tables  # this code uses the PyTables 2.x camelCase API (getNode, createGroup, createCArray)

logger = logging.getLogger(__name__)

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Reading it back in then looks like:

import numpy as np
import pandas as pd
import tables  # PyTables 2.x API

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Now, I generally run this on a machine with a ton of memory, so I may not be careful enough with my memory usage. For example, by default the load operation reads the whole data set.

This generally works for me, but it's a bit clunky, and I can't use the fancy pytables magic.

Edit: The real advantage of this approach, over the array-of-records pytables default, is that I can then load the data into R using h5r, which can't handle tables. Or, at least, I've been unable to get it to load heterogeneous tables.

