Source code for eWRT.util.async

#!/usr/bin/env python

''' @package eWRT.util.async
    asynchronous procedure calls

    @warning
    this library is still a draft and might change considerable

'''


# (C)opyrights 2008-2010 by Albert Weichselbraun <albert@weichselbraun.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

__author__    = "Albert Weichselbraun"
__copyright__ = "GPL"

from eWRT.util.cache import DiskCache
from shutil import rmtree
from os.path import join, exists
from cPickle import load, UnpicklingError
import time
from subprocess import Popen
from glob import glob
import os
import gzip

try:
    import hashlib
    HASH = hashlib.sha1
except ImportError:
    import sha
    HASH = sha.sha


[docs]class Async(DiskCache): ''' Asynchronous Call Handling ''' def __init__(self, cache_dir, cache_nesting_level=0, cache_file_suffix="", max_processes=8, debug_dir=None): ''' initializes the Cache object @param[in] cache_dir the cache base directory @param[in] cache_nesting_level optional number of nesting level (0) @param[in] cache_file_suffix optional suffix for cache files @param[in] max_processes maximum number of parallel processes @param[in] optional debug directory where stdout and stderr of the processes gets saved ''' self.cache_dir = cache_dir self.cache_file_suffix = cache_file_suffix self.cache_nesting_level = cache_nesting_level self.max_processes = max_processes self.cur_processes = [] self.debug_dir = debug_dir
[docs] def getPostHashfile(self, cmd ): ''' returns an identifier representing the object which is compatible to the identifiers returned by the eWRT.util.cache.* classes. ''' args = ( tuple(cmd[1:]), ()) # required to produce the same hash as DiskCache's fetch method return self._get_fname( DiskCache.getObjectId( args ) )
[docs] def post(self, cmd): ''' checks whether the given command is already cached and calls the command otherwise. @param[in] cmdline command to call @returns the hash required to fetch this object ''' cache_file = self.getPostHashfile( cmd ) # print "I will return %s for %s." % (cache_file, " ".join(cmd) ) # try to fetch the object from the cache if exists(cache_file): try: load(open(cache_file)) return cache_file except (EOFError, UnpicklingError): pass self._execute(cmd) return cache_file
[docs] def has_processes_limit_reached(self): ''' closes finished processes and verifies whether we have already reached the maximum number of processes ''' # verify whether all registered processes are still running for pObj in self.cur_processes: if pObj.poll()!=None: self.cur_processes.remove( pObj ) pids = [ str(pObj.pid) for pObj in self.cur_processes ] return len(self.cur_processes) >= self.max_processes
def _execute(self, cmd): while self.has_processes_limit_reached(): time.sleep(5) if self.debug_dir: fname_base = join(self.debug_dir, str(time.time()) ) stdout = open(fname_base+".out", "w") stderr = open(fname_base+".err", "w") pObj = Popen( cmd, stdout=stdout, stderr=stderr ) os.rename( fname_base+".out", join(self.debug_dir, "debug_%d.out" % pObj.pid )) os.rename( fname_base+".err", join(self.debug_dir, "debug_%d.err" % pObj.pid )) else: pObj = Popen( cmd ) # set stdout and stderr to non blocking because otherwise # the process will block after a limit of 64k has been reached # (see http://bytes.com/topic/python/answers/741393-spawning-process-subprocess) # fcntl(stdout, F_SETFL, fcntl(stdout, F_GETFL) | os.O_NONBLOCK) # fcntl(stdin, F_SETFL, fcntl(stdin, F_GETFL) | os.O_NONBLOCK) self.cur_processes.append( pObj )
[docs] def fetch(self, cache_file): self.has_processes_limit_reached() while True: if exists(cache_file): try: return load( gzip.open(cache_file)) except (EOFError, UnpicklingError) as e: print "Error opening %s" % cache_file, e pass time.sleep(10)
[docs]class TestAsync(object): ''' unittests covering the class async ''' TEST_CACHE_DIR = "./.test-async"
[docs] def setUp(self): self._delCacheDir()
[docs] def tearDown(self): self._delCacheDir()
@staticmethod def _delCacheDir(): if exists( TestAsync.TEST_CACHE_DIR ): rmtree( TestAsync.TEST_CACHE_DIR )
[docs] def testMaxProcessLimit(self): ''' tests the max process limit ''' async = Async(self.TEST_CACHE_DIR, max_processes=1) for x in xrange(2): async.post( [ "/bin/sleep", str(x+1) ] ) assert async.has_processes_limit_reached() == True time.sleep(2) flag = async.has_processes_limit_reached() print flag, [ p.pid for p in async.cur_processes ] assert flag == False
[docs] def testDebugMode(self): ''' tests the debug mode ''' async = Async(self.TEST_CACHE_DIR, max_processes=1, debug_dir=self.TEST_CACHE_DIR) for x in xrange(2): async.post( ["/bin/echo", "hallo"] ) print glob( join(self.TEST_CACHE_DIR, "debug*") ) assert len( glob( join(self.TEST_CACHE_DIR, "debug*.out") ) ) == 2 assert len( glob( join(self.TEST_CACHE_DIR, "debug*.err") ) ) == 2
# $Id$