Wednesday, August 20, 2014

Quick HDF5 with Pandas

HDF5 is a format designed to store large numerical arrays of homogenous type. It cames particularly handy when you need to organize your data models in a hierarchical fashion and you also need a fast way to retrieve the data. Pandas implements a quick and intuitive interface for this format and in this post will shortly introduce how it works.

We can create a HDF5 file using the HDFStore class provided by Pandas:
import numpy as np
from pandas import HDFStore,DataFrame
# create (or open) an hdf5 file and opens in append mode
hdf = HDFStore('storage.h5')
Now we can store a dataset into the file we just created:
df = DataFrame(np.random.rand(5,3), columns=('A','B','C'))
# put the dataset in the storage
hdf.put('d1', df, format='table', data_columns=True)
The structure used to represent the hdf file in Python is a dictionary and we can access to our data using the name of the dataset as key:
print hdf['d1'].shape
(5, 3)
The data in the storage can be manipulated. For example, we can append new data to the dataset we just created:
hdf.append('d1', DataFrame(np.random.rand(5,3), 
           columns=('A','B','C')), 
           format='table', data_columns=True)
hdf.close() # closes the file
There are many ways to open a hdf5 storage, we could use again the constructor of the class HDFStorage, but the function read_hdf makes us also able to query the data:
from pandas import read_hdf
# this query selects the columns A and B
# where the values of A is greather than 0.5
hdf = read_hdf('storage.h5', 'd1',
               where=['A>.5'], columns=['A','B'])
At this point, we have a storage which contains a single dataset. The structure of the storage can be organized using groups. In the following example we add three different datasets to the hdf5 file, two in the same group and another one in a different one:
hdf = HDFStore('storage.h5')
hdf.put('tables/t1', DataFrame(np.random.rand(20,5)))
hdf.put('tables/t2', DataFrame(np.random.rand(10,3)))
hdf.put('new_tables/t1', DataFrame(np.random.rand(15,2)))
Our hdf5 storage now looks like this:
print hdf

File path: storage.h5
/d1             frame_table  (typ->appendable,nrows->10,ncols->3,indexers->[index],dc->[A,B,C])
/new_tables/t1  frame        (shape->[15,2])                                                   
/tables/t1      frame        (shape->[20,5])                                                   
/tables/t2      frame        (shape->[10,3])  
On the left we can see the hierarchy of the groups added to the storage, in the middle we have the type of dataset and on the right there is the list of attributes attached to the dataset. Attributes are pieces of metadata you can stick on objects in the file and the attributes we see here are automatically created by Pandas in order to describe the information required to recover the data from the hdf5 storage system.

Friday, May 23, 2014

Code parallelization with joblib

Recently I've been working on the parallelization of some Python code and I discovered Joblib. It is a library that supports pipelining and offers a good support for parallelization. In this post we will implement a (very naive) paraller matrix by matrix multiplication algorithm to show the parallelization capabilities of this library.
from joblib import Parallel, delayed

def parallel_dot(A,B,n_jobs=2):
    """
     Computes A x B using more CPUs.
     This works only when the number 
     of rows of A and the n_jobs are even.
    """
    parallelizer = Parallel(n_jobs=n_jobs)
    # this iterator returns the functions to execute for each task
    tasks_iterator = ( delayed(np.dot)(A_block,B) 
                      for A_block in np.split(A,n_jobs) )
    result = parallelizer( tasks_iterator )
    # merging the output of the jobs
    return np.vstack(result)
This function spreads the computation across more precesses. The strategy applied to distribute the data is very simple. Each process has the full matrix B and a contiguous block of rows of A, so it can compute a block of rows A*B. In the end, the result of each process is stacked to build final matrix.

Let's compare the parallel version of the algorithm with the sequential one:
A = np.random.randint(0,high=10,size=(1000,1000))
B = np.random.randint(0,high=10,size=(1000,1000))
%time _ = np.dot(A,B)
CPU times: user 13.2 s, sys: 36 ms, total: 13.2 s
Wall time: 13.4 s
%time _ = parallel_dot(A,B,n_jobs=2)
CPU times: user 92 ms, sys: 76 ms, total: 168 ms
Wall time: 8.49 s
Wow, we had a speedup of 1.6X, not bad for a so naive algorithm. It's important to notice that the arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process. Which means that the last time that parallel_dot have been called, the matrix B have been entirely replicated two times in memory. To avoid this problem, we can dump the matrices on the filesystem and pass a reference to the worker to open them as memory map.
import tempfile
import os
from joblib import load, dump

# saving A and B to a local file for memmapping
temp_folder = tempfile.mkdtemp()
filenameA = os.path.join(temp_folder, 'A.mmap')
dump(A, filenameA)
filenameB = os.path.join(temp_folder, 'B.mmap')
dump(A, filenameB)
Now, when parallel_dot(A_memmap,B_memmap,n_jobs=2) is called, both the processes created will use only a reference to the matrix B..

Tuesday, April 22, 2014

Parameters selection with Cross-Validation

Most of the pattern recognition techniques have one or more free parameters and choose them for a given classification problem is often not a trivial task. In real applications we only have access to a finite set of examples, usually smaller than we wanted, and we need to test our model on samples not seen during the training process. A model that would just classify the samples that it has seen would have a very good score, but would definitely fail to predict unseen data. This situation is called overfitting and to avoid it we need to apply an appropriate validation procedure to select the parameters. A tool that can help us solve this problem is the Cross-Validation (CV). The idea behind CV is simple: the data are split into train and test sets several consecutive times and the averaged value of the prediction scores obtained with the different sets is the evaluation of the classifier.
Let's see a simple example where a smoothing parameter for a Bayesian classifier is select using the capabilities of the Sklearn library.
To begin we load one of the test datasets provided by sklearn (the same used here) and we hold 33% of the samples for the final evaluation:
from sklearn.datasets import load_digits
data = load_digits()
from sklearn.cross_validation import train_test_split
X,X_test,y,y_test = train_test_split(data.data,data.target,
                                     test_size=.33,
                                     random_state=1899)
Now, we import the classifier we want to use (a Bernoullian Naive Bayes in this case), specify a set of values for the parameter we want to choose and run a grid search:
from sklearn.naive_bayes import BernoulliNB
# test the model for alpha = 0.1, 0.2, ..., 1.0
parameters = [{'alpha':np.linspace(0.1,1,10)}]

from sklearn.grid_search import GridSearchCV
clf = GridSearchCV(BernoulliNB(), parameters, cv=10, scoring='f1')
clf.fit(X,y) # running the grid search
The grid search has evaluated the classifier for each value specified for the parameter alpha using the CV. We can visualize the results as follows:
res = zip(*[(f1m, f1s.std(), p['alpha']) 
            for p, f1m, f1s in clf.grid_scores_])
subplot(2,1,1)
plot(res[2],res[0],'-o')
subplot(2,1,2)
plot(res[2],res[1],'-o')
show()

The plots above show the average score (top) and the standard deviation of the score (bottom) for each values of alpha used. Looking at the graphs it seems plausible that a small alpha could be a good choice.
We can also see thet using the alpha value that gave us the best results on the the test set we selected at the beginning gives us results that are similar to the ones obtained during the CV stage:
from sklearn.metrics import f1_score
print 'Best alpha in CV = %0.01f' % clf.best_params_['alpha']
final = f1_score(y_test,clf.best_estimator_.predict(X_test))
print 'F1-score on the final testset: %0.5f' % final
Best alpha in CV = 0.1
F1-score on the final testset: 0.85861