python - how to avoid IOError while looping through a long list that opens many files in threads?
I'm downloading access logs from Amazon S3. These are a lot of little files, so to cut down the total download time, I've decided to read each file in a thread.
This is the main method: it first connects to S3, then iterates over each document and reads each document's content within a separate thread.
def download_logs(self):
    """
    Downloads logs from S3 using boto.
    """
    if self.aws_keys:
        conn = S3Connection(*self.aws_keys)
    else:
        conn = S3Connection()

    files = []

    mybucket = conn.get_bucket(self.input_bucket)
    with tempdir.TempDir() as directory:
        for item in mybucket.list(prefix=self.input_prefix):
            local_file = os.path.join(directory, item.key.split("/")[-1])
            logger.debug("Downloading %s to %s" % (item.key, local_file))
            thread = threading.Thread(target=item.get_contents_to_filename,
                                      args=(local_file,))
            thread.start()
            files.append((thread, local_file))

        elms = range(len(files))
        elemslen = len(elms)
        while elemslen:
            curr = random.choice(elms)
            thread, file = files[curr]
            if not thread.is_alive():
                yield file
                elms.remove(curr)
                elemslen -= 1
As you can see, this is a generator that yields. The generator is consumed by reading each file's content and concatenating them:
logs = self.download_logs()
for downloaded in logs:
    self.concat_files(templog, downloaded)
The above code fails with the following warning raised in the threads:
[2014-10-20 15:15:21,427: WARNING/Worker-2] Exception in thread Thread-710:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/viktornagy/.virtualenvs/vidzor/lib/python2.7/site-packages/boto/s3/key.py", line 1561, in get_contents_to_filename
    fp = open(filename, 'wb')
IOError: [Errno 24] Too many open files: u'/var/folders/7h/9tt8cknn1qx40bs_s467hc3r0000gn/T/tmpzs9fdn/access_log-2014-10-20-11-36-20-9d6f43b122c83bd6'
Of course, I could raise the number of open files allowed, but I would rather limit the number of threads to something meaningful.
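(As a side note not in the original post: the per-process limit that Errno 24 is hitting can be inspected from Python with the standard resource module.)

import resource

# RLIMIT_NOFILE is the open-file-descriptor limit behind "Errno 24".
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft, hard)  # the soft limit is often as low as 256 on OS X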
Now my question is how to accomplish that? I have a loop that generates the list of threads. Once that loop is finished, I digest the list, checking for finished threads whose files can be yielded.
If I limit the number of threads in the first loop, I'll never have the list ready to start digesting.
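(Aside, not from the original post: one way out of that dilemma is to bound the number of downloads in flight rather than the total, for example with a threading.Semaphore around the same spawning loop. A minimal sketch, reusing the question's names:)

import os
import threading

MAX_THREADS = 20
slots = threading.Semaphore(MAX_THREADS)  # caps downloads in flight

def worker(item, local_file):
    try:
        item.get_contents_to_filename(local_file)
    finally:
        slots.release()  # free the slot even if the download fails

# inside download_logs(), replacing the original spawning loop:
for item in mybucket.list(prefix=self.input_prefix):
    slots.acquire()  # blocks while MAX_THREADS downloads are running
    local_file = os.path.join(directory, item.key.split("/")[-1])
    thread = threading.Thread(target=worker, args=(item, local_file))
    thread.start()
    files.append((thread, local_file))

The spawning loop then stalls whenever 20 downloads are running, but it still finishes building the full files list, so the digestion loop works unchanged.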
You can use multiprocessing.dummy to create a pool of threading.Thread objects, and distribute the work to the threads in the pool:
from multiprocessing.dummy import Pool

def download_logs(self):
    """
    Downloads logs from S3 using boto.
    """
    if self.aws_keys:
        conn = S3Connection(*self.aws_keys)
    else:
        conn = S3Connection()

    mybucket = conn.get_bucket(self.input_bucket)
    pool = Pool(20)  # 20 threads in the pool. Tweak as you see fit.

    with tempdir.TempDir() as directory:
        def download(item):
            # Fetch one key into the temp directory, return the local path.
            local_file = os.path.join(directory, item.key.split("/")[-1])
            item.get_contents_to_filename(local_file)
            return local_file

        for local_file in pool.imap_unordered(
                download, mybucket.list(prefix=self.input_prefix)):
            yield local_file
I'm using imap_unordered so it can start yielding results as they arrive, rather than needing to wait for all the tasks to complete.
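(For comparison, not from the original answer: the same pattern written with concurrent.futures, which is standard library in Python 3 and available on Python 2.7 via the futures backport; as_completed plays the same role as imap_unordered here. A sketch reusing the question's names:)

from concurrent.futures import ThreadPoolExecutor, as_completed

def download_logs(self):
    """Downloads logs from S3 using boto, 20 at a time."""
    conn = S3Connection(*self.aws_keys) if self.aws_keys else S3Connection()
    mybucket = conn.get_bucket(self.input_bucket)

    with tempdir.TempDir() as directory:
        with ThreadPoolExecutor(max_workers=20) as pool:
            # Map each future to the path it writes, since
            # get_contents_to_filename itself returns None.
            future_to_path = {}
            for item in mybucket.list(prefix=self.input_prefix):
                local_file = os.path.join(directory, item.key.split("/")[-1])
                future = pool.submit(item.get_contents_to_filename, local_file)
                future_to_path[future] = local_file
            for future in as_completed(future_to_path):
                future.result()  # re-raises any download error
                yield future_to_path[future]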
python multithreading python-2.7 boto