Tuesday, 15 January 2013

Many bees make quick work. Parallel Python scripting with the pprocess framework



     ^^      .-=-=-=-.  ^^
 ^^        (`-=-=-=-=-`)         ^^
         (`-=-=-=-=-=-=-`)  ^^         ^^
   ^^   (`-=-=-=-=-=-=-=-`)   ^^                            ^^
       ( `-=-=-=-(@)-=-=-` )      ^^
       (`-=-=-=-=-=-=-=-=-`)  ^^
       (`-=-=-=-=-=-=-=-=-`)              ^^
       (`-=-=-=-=-=-=-=-=-`)                      ^^
       (`-=-=-=-=-=-=-=-=-`)  ^^
        (`-=-=-=-=-=-=-=-`)          ^^
         (`-=-=-=-=-=-=-`)  ^^                 ^^
     jgs   (`-=-=-=-=-`)
            `-=-=-=-=-`


Many bees make light work, which is why I found myself looking for more workers instead of bigger workers.

The problem is that, as part of my thesis, I need to crunch 1 TB of data in three separate runs, each taking in excess of 12 hours. This is far too long.

The real bottleneck is that tshark is single-threaded, which makes additional processing units useless. Since my ESX server is not too big on the GHz but is bloated in the cores department, I needed a way to improve performance without breaking the bank on a faster CPU.

I have also been learning Python, so I figured this was a prime time to flex my newly developed Python muscles!

Quick disclaimer though: I am NOT a Python genius, nor a coding guru. This code is functional but far from perfect, and all I hope is that someone else will be able to find some use in this snippet.

Here is the script:

import subprocess
import os
import pprocess
import time

start = time.time()

def wshark(pcap):
    # Hand one capture file to the wrapper script; append its output to log.txt
    subprocess.call("/usr/bin/script.sh {0}".format(pcap),
                    stdout=open('log.txt', 'a'), shell=True)

# Build a list of every file under the warehouse directory
directory_loc = "/warehouse"
file_list = []
for path, subdirs, files in os.walk(directory_loc):
    for name in files:
        file_list.append(os.path.join(path, name))

# Parallel computation:
nproc = 3   # maximum number of simultaneous processes desired
            # (available cores - 1 is preferable, after testing)
results = pprocess.Map(limit=nproc, reuse=1)
parallel_function = results.manage(pprocess.MakeReusable(wshark))
[parallel_function(args) for args in file_list]     # start computing things
parallel_results = results[0:len(file_list)]        # wait for every job to finish
print 'It took', time.time() - start, 'seconds'

The quick and dirty explanation is as follows:
Imports are pretty standard, apart from pprocess, which allows for easy multi-process work. I then define a function wshark, which accepts one argument: a filename from a pre-populated list. The function uses subprocess.call from the standard library to fire off a system process.
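As an aside: if your script.sh is just a thin wrapper around tshark, the worker could also call tshark directly and skip the shell. A sketch of that variant is below; only the -r read option is used here, so substitute whatever tshark arguments your own analysis needs.

import subprocess

def wshark_direct(pcap):
    # Hypothetical variant of wshark: run tshark straight on one capture file
    # (assumes tshark is on the PATH; add your own tshark options as needed)
    log = open('log.txt', 'a')
    subprocess.call(["tshark", "-r", pcap], stdout=log)
    log.close()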

stdout=open('log.txt','a') redirects all the standard output to a file named log.txt, and does so in append mode. file_list is the list that gets populated with every file under the /warehouse directory.
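One small tweak, sketched below under the assumption that the captures all end in .pcap: filter the walk so that stray non-capture files under /warehouse never get handed to the worker.

import os

file_list = []
for path, subdirs, files in os.walk("/warehouse"):
    for name in files:
        if name.endswith(".pcap"):    # assumption: captures are named *.pcap
            file_list.append(os.path.join(path, name))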

Now the pprocess magic starts. My knowledge here is functional only, so I will relay that functional approach as best I can.

nproc = 3 defines the number of cores that you want to use in parallel (from experience, max cores - 1 is preferable).
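If you would rather not hard-code nproc, the standard library can work it out for you. A small sketch (this uses multiprocessing only for its cpu_count helper, not for the parallelism itself):

import multiprocessing

# Derive nproc from the machine, keeping one core free for the OS
nproc = max(1, multiprocessing.cpu_count() - 1)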

results = pprocess.Map(...) hands control of the parallel management over to pprocess. limit=nproc sets the maximum number of active processes, while reuse=1 lets pprocess recycle the worker processes as they finish instead of spawning a fresh one for every file.
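To make the moving parts easier to see in isolation, here is the same pattern stripped down to a toy function. This is a sketch based on my functional understanding of pprocess, so treat it as illustrative rather than gospel:

import pprocess

def square(x):
    # Stand-in for a heavier job, such as the wshark call above
    return x * x

results = pprocess.Map(limit=3, reuse=1)    # at most 3 workers, recycled between jobs
parallel_square = results.manage(pprocess.MakeReusable(square))

for x in range(10):
    parallel_square(x)    # queue the work; pprocess runs it in child processes

print results[0:10]    # slicing waits for all 10 jobs and returns their results in order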

[parallel_function(args) for args in file_list] runs through the list of files and hands each one to the wshark function.

The last bit had me stumped for a while, but here is what you need to know to get this working. If you are blowing through a list like I am, then parallel_results = results[0:len(file_list)] will do it for you. As far as I can tell, slicing (or iterating over) the results object forces the parent process to wait until every child has sent its result back, which is why the script does not exit before the work is actually done.

Finally, the last line prints how long the whole run took, so you can see how much time this method saves you!
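For completeness, here is roughly the same thing done with the standard library's multiprocessing module instead of pprocess, in case you would rather avoid the extra dependency. This is a sketch of the equivalent pattern, not what I actually ran:

import multiprocessing
import os
import subprocess

def wshark(pcap):
    # Same worker as before: hand one capture file to the wrapper script
    subprocess.call("/usr/bin/script.sh {0}".format(pcap),
                    stdout=open('log.txt', 'a'), shell=True)

if __name__ == '__main__':
    file_list = []
    for path, subdirs, files in os.walk("/warehouse"):
        for name in files:
            file_list.append(os.path.join(path, name))

    pool = multiprocessing.Pool(processes=3)   # same idea: available cores - 1
    pool.map(wshark, file_list)                # blocks until every file has been processed
    pool.close()
    pool.join()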
