python - How to avoid IOError while looping through a long list that opens many files in threads?
I'm downloading access logs from Amazon S3. These are a lot of little files. To cut down the download time, I've decided to read each file in a thread.
This is my main method: it first connects to S3, then iterates over each document and reads each document's content within a separate thread.
    def download_logs(self):
        """
        Downloads logs from S3 using boto.
        """
        if self.aws_keys:
            conn = S3Connection(*self.aws_keys)
        else:
            conn = S3Connection()

        files = []
        mybucket = conn.get_bucket(self.input_bucket)
        with tempdir.TempDir() as directory:
            for item in mybucket.list(prefix=self.input_prefix):
                local_file = os.path.join(directory, item.key.split("/")[-1])
                logger.debug("Downloading %s to %s" % (item.key, local_file))
                thread = threading.Thread(target=item.get_contents_to_filename,
                                          args=(local_file,))
                thread.start()
                files.append((thread, local_file))

            # Poll the threads in random order and yield each file
            # once its download thread has finished.
            elms = range(len(files))
            elemslen = len(elms)
            while elemslen:
                curr = random.choice(elms)
                thread, file = files[curr]
                if not thread.is_alive():
                    yield file
                    elms.remove(curr)
                    elemslen -= 1

As you can see, this is a generator that yields. The generator is consumed by reading each downloaded file's content and concatenating the files:
    logs = self.download_logs()
    for downloaded in logs:
        self.concat_files(templog, downloaded)

The above code fails with the following warning raised in the threads:
    [2014-10-20 15:15:21,427: WARNING/Worker-2] Exception in thread Thread-710:
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
        self.run()
      File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/Users/viktornagy/.virtualenvs/vidzor/lib/python2.7/site-packages/boto/s3/key.py", line 1561, in get_contents_to_filename
        fp = open(filename, 'wb')
    IOError: [Errno 24] Too many open files: u'/var/folders/7h/9tt8cknn1qx40bs_s467hc3r0000gn/T/tmpzs9fdn/access_log-2014-10-20-11-36-20-9d6f43b122c83bd6'

Of course, I could raise the allowed number of open files, but I would rather limit the number of threads to something meaningful.
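(For reference, the per-process limit behind Errno 24 can be inspected, and the soft limit raised, with the standard-library resource module on Unix; a minimal sketch:)

    import resource

    # Current per-process open-file limits (soft, hard).
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print("soft=%d hard=%d" % (soft, hard))

    # The soft limit can be raised up to the hard limit without root,
    # but this only masks the real problem of unbounded threads.
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))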
Now my question is: how do I accomplish that? I have a loop that generates the list of threads. Once that loop is finished, I digest the list and check for finished threads that can yield.

If I limit the number of threads in the first loop, I'll never have the list ready to start digesting.
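(As an aside, one way around that impasse is to let every thread start, so the list is always complete, but gate the actual download with a semaphore so only a bounded number of files are open at once. A minimal sketch, assuming the loop above; max_open and fetch_one are hypothetical names, not part of the original code:)

    import threading

    max_open = threading.BoundedSemaphore(20)  # at most 20 concurrent downloads

    def fetch_one(item, local_file):
        # Block until a slot is free, so only 20 files are open at a time.
        with max_open:
            item.get_contents_to_filename(local_file)

    # In the loop above, target the wrapper instead of the boto method:
    #     thread = threading.Thread(target=fetch_one, args=(item, local_file))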
You can use multiprocessing.dummy to create a pool of threading.Thread objects, and distribute the work to the threads in the pool:
    from multiprocessing.dummy import Pool

    def download_logs(self):
        """
        Downloads logs from S3 using boto.
        """
        if self.aws_keys:
            conn = S3Connection(*self.aws_keys)
        else:
            conn = S3Connection()

        mybucket = conn.get_bucket(self.input_bucket)
        pool = Pool(20)  # 20 threads in the pool. Tweak as you see fit.
        with tempdir.TempDir() as directory:
            def download(item):
                # Download one key and return the local path it was saved to.
                local_file = os.path.join(directory, item.key.split("/")[-1])
                item.get_contents_to_filename(local_file)
                return local_file

            results = pool.imap_unordered(download,
                                          mybucket.list(prefix=self.input_prefix))
            for result in results:
                yield result

I'm using imap_unordered so that it can start yielding results as soon as they arrive, rather than needing to wait for all the tasks to complete.
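For comparison, the same bounded-pool, yield-as-completed pattern can be written with concurrent.futures (standard library in Python 3, available on 2.7 via the futures backport); a sketch under the same assumptions about self.aws_keys and tempdir as the question:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def download_logs(self):
        conn = S3Connection(*self.aws_keys) if self.aws_keys else S3Connection()
        mybucket = conn.get_bucket(self.input_bucket)
        with tempdir.TempDir() as directory:
            with ThreadPoolExecutor(max_workers=20) as pool:
                # Submit one task per key; map each future back to its path.
                futures = {}
                for item in mybucket.list(prefix=self.input_prefix):
                    local_file = os.path.join(directory, item.key.split("/")[-1])
                    future = pool.submit(item.get_contents_to_filename, local_file)
                    futures[future] = local_file
                for future in as_completed(futures):
                    future.result()  # re-raise any download error
                    yield futures[future]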
 python multithreading python-2.7 boto 
 
  