python - how to avoid IOError while looping through a long list that opens many files in threads?

I'm downloading access logs from Amazon S3. These are a lot of little files, so to cut down the download time, I've decided to read each file in its own thread.

This main method first connects to S3, iterates over each document, and reads each document's content within a separate thread.

def download_logs(self):
    """
    Downloads logs from S3 using boto.
    """
    if self.aws_keys:
        conn = S3Connection(*self.aws_keys)
    else:
        conn = S3Connection()

    files = []
    mybucket = conn.get_bucket(self.input_bucket)
    with tempdir.TempDir() as directory:
        for item in mybucket.list(prefix=self.input_prefix):
            local_file = os.path.join(directory, item.key.split("/")[-1])
            logger.debug("Downloading %s to %s" % (item.key, local_file))
            thread = threading.Thread(target=item.get_contents_to_filename,
                                      args=(local_file,))
            thread.start()
            files.append((thread, local_file))

        elms = range(len(files))
        elemslen = len(elms)
        while elemslen:
            curr = random.choice(elms)
            thread, file = files[curr]
            if not thread.is_alive():
                yield file
                elms.remove(curr)
                elemslen -= 1

As you can see, the method is a generator that yields the downloaded file names. The generator is then consumed by reading each file's content and concatenating the files:

logs = self.download_logs()
for downloaded in logs:
    self.concat_files(templog, downloaded)

The above code fails with the following warning raised in the threads:

[2014-10-20 15:15:21,427: WARNING/Worker-2] Exception in thread Thread-710:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/viktornagy/.virtualenvs/vidzor/lib/python2.7/site-packages/boto/s3/key.py", line 1561, in get_contents_to_filename
    fp = open(filename, 'wb')
IOError: [Errno 24] Too many open files: u'/var/folders/7h/9tt8cknn1qx40bs_s467hc3r0000gn/T/tmpzs9fdn/access_log-2014-10-20-11-36-20-9d6f43b122c83bd6'

Of course, I could raise the number of open files allowed, but I'd rather limit the number of threads to something meaningful.
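(For reference, the current limits can be inspected, and the soft limit raised, from Python's standard resource module; a minimal sketch:)

import resource

# Check the per-process cap on open file descriptors (what ulimit -n shows).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print "soft limit: %d, hard limit: %d" % (soft, hard)

# The soft limit can be raised as far as the hard limit without root,
# but capping the number of threads is the cleaner fix.
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))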

Now my question is how to limit the number of threads. I have a loop that generates the list of threads, and only once that loop is finished do I digest the list and check for finished threads whose files can be yielded.

If I limit the number of threads in the first loop, I'll never have the list ready to start digesting it.
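For what it's worth, the closest I've come up with is pairing a BoundedSemaphore with a Queue, so that finished files can be yielded while later downloads are still being started. A rough, untested sketch (bounded_download is just an illustrative name, and error handling is elided):

import os
import threading
import Queue  # 'queue' on Python 3

def bounded_download(items, directory, max_threads=20):
    # Cap concurrent downloads (and thus open files) at max_threads,
    # yielding each local file name as soon as its download finishes.
    sem = threading.BoundedSemaphore(max_threads)
    done = Queue.Queue()

    def worker(item):
        local_file = os.path.join(directory, item.key.split("/")[-1])
        try:
            item.get_contents_to_filename(local_file)
        finally:
            # Report the file (possibly partial on error) and free a slot.
            done.put(local_file)
            sem.release()

    pending = 0
    for item in items:
        sem.acquire()  # blocks while max_threads downloads are running
        threading.Thread(target=worker, args=(item,)).start()
        pending += 1
        while not done.empty():  # yield already-finished files as we go
            yield done.get()
            pending -= 1
    for _ in xrange(pending):  # drain the remaining downloads
        yield done.get()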

You can use multiprocessing.dummy to create a pool of threading.Thread objects and distribute the work to the threads in the pool:

from multiprocessing.dummy import Pool

def download_logs(self):
    """
    Downloads logs from S3 using boto.
    """
    if self.aws_keys:
        conn = S3Connection(*self.aws_keys)
    else:
        conn = S3Connection()

    mybucket = conn.get_bucket(self.input_bucket)
    pool = Pool(20)  # 20 threads in the pool. Tweak as you see fit.
    with tempdir.TempDir() as directory:
        def download(item):
            # Download one key to a local file and return the file's path.
            local_file = os.path.join(directory, item.key.split("/")[-1])
            item.get_contents_to_filename(local_file)
            return local_file

        # Yield each local file name as soon as its download finishes,
        # regardless of the order the keys were submitted in.
        for local_file in pool.imap_unordered(
                download, mybucket.list(prefix=self.input_prefix)):
            yield local_file

I'm using imap_unordered so that results can be yielded as they arrive, rather than needing to wait for all the tasks to complete.
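A toy illustration of the difference, separate from the S3 code: with imap the slowest task holds back the results, while imap_unordered hands each result back as its task finishes:

from multiprocessing.dummy import Pool
import time

def work(n):
    time.sleep(n)  # stand-in for downloads of different sizes
    return n

pool = Pool(3)
# All three tasks start at once; imap would yield 3, 2, 1 (input order,
# so nothing appears for 3 seconds), imap_unordered yields 1, 2, 3.
for result in pool.imap_unordered(work, [3, 2, 1]):
    print result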

python multithreading python-2.7 boto
