Python multiprocessing - watchdog process?

I have a set of long-running processes in a typical "pub/sub" setup, with queues for communication.

I need two things, and I can't figure out how to accomplish both simultaneously:

1. Addition/removal of workers. For example, I want to be able to add extra consumers if I see that the pending queue size has grown too large.
2. Watchdog for the processes - I want to be notified if any of the producers or consumers crashes.

I can do (2) in isolation:

    # workers and consumers are lists of multiprocessing.Process objects
    from time import sleep

    try:
        while True:
            for process in workers + consumers:
                if not process.is_alive():
                    logger.critical("%-8s%s died!", process.pid, process.name)
            sleep(3)
    except KeyboardInterrupt:
        # Python propagates Ctrl+C to the workers, so no need to terminate them
        logger.warn('received ctrl+c, shutting down')

The above blocks, which prevents me from doing (1).

So I decided to move the monitoring code into its own process.

This doesn't work, because process.is_alive() only works for a parent checking the status of its children. In this case, the processes I want to check would be siblings instead of children.

I'm a bit stumped on how to proceed. How can my main process support changes to the subprocesses while also monitoring them?

multiprocessing.Pool has a watchdog built in already. It runs a thread that checks every 0.1 seconds to see if a worker has died. If one has, it starts a new one to take its place:

    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        debug('worker handler exiting')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

This is used to implement the maxtasksperchild keyword argument, and it is actually problematic in some cases. If a process dies while a map or apply command is running, and that process is in the middle of handling a task associated with that call, the call will never finish. See this question for more info about that behavior.
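For context, here is a minimal sketch of maxtasksperchild in action; the work function and the pool/task sizes are just illustrative:

    import os
    from multiprocessing import Pool

    def work(x):
        # Return the worker's pid so we can observe replacements.
        return os.getpid(), x * x

    if __name__ == '__main__':
        # Each worker is retired and replaced after handling 2 tasks.
        with Pool(processes=2, maxtasksperchild=2) as pool:
            # chunksize=1 so every item counts as one task toward the limit
            results = pool.map(work, range(10), chunksize=1)
        # Seeing more than 2 distinct pids shows workers were replaced mid-run.
        print(sorted({pid for pid, _ in results}))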

That said, if you just want to know when a process has died, you can create a thread (not a process) that monitors the pids of all the processes in the pool; if the pids in the list ever change, you know a process has crashed:

    import time

    def monitor_pids(pool):
        pids = [p.pid for p in pool._pool]
        while True:
            new_pids = [p.pid for p in pool._pool]
            if new_pids != pids:
                print("a worker died")
                pids = new_pids
            time.sleep(3)
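Hooking that up with the monitor_pids above could look something like this (a sketch; square is just a stand-in task):

    import threading
    from multiprocessing import Pool

    def square(x):
        return x * x

    if __name__ == '__main__':
        pool = Pool(processes=4)
        # daemon=True so the monitor thread doesn't keep the program alive
        watcher = threading.Thread(target=monitor_pids, args=(pool,), daemon=True)
        watcher.start()
        print(pool.map(square, range(100)))
        pool.close()
        pool.join()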

Edit:

If you're rolling your own pool implementation, you can take a cue from multiprocessing.Pool and run your monitoring code in a background thread in the parent process. The checks to see if the processes are still running are quick, so the time lost to the background thread taking the GIL should be negligible. Consider that the multiprocessing.Pool watchdog runs every 0.1 seconds! Running yours every 3 seconds shouldn't cause any problems.
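A rough sketch of that pattern for your setup might look like the following. The monitor runs in a thread of the parent, so is_alive() is legal (every worker is a child, not a sibling), and the main thread stays free to add workers. worker_loop, WorkerManager, and the scaling threshold are all hypothetical names for illustration:

    import threading
    import time
    from multiprocessing import Process, Queue

    def worker_loop(queue):
        # Stand-in for your real consumer logic; stops on a None sentinel.
        for item in iter(queue.get, None):
            pass

    class WorkerManager:
        def __init__(self):
            self.queue = Queue()
            self.workers = []
            self.lock = threading.Lock()

        def add_worker(self):
            process = Process(target=worker_loop, args=(self.queue,))
            process.start()
            with self.lock:
                self.workers.append(process)

        def monitor(self):
            # Runs in a background thread of the parent process, so
            # is_alive() works: every worker is a child of this process.
            while True:
                with self.lock:
                    for process in list(self.workers):
                        if not process.is_alive():
                            print("%s (pid %s) died!" % (process.name, process.pid))
                            self.workers.remove(process)
                time.sleep(3)

    if __name__ == '__main__':
        manager = WorkerManager()
        for _ in range(2):
            manager.add_worker()
        threading.Thread(target=manager.monitor, daemon=True).start()
        # The main thread stays free to scale the pool, e.g. when the
        # pending queue grows too large (note qsize() is unreliable on macOS).
        if manager.queue.qsize() > 100:
            manager.add_worker()
        # Shut down: one sentinel per worker, then wait for them.
        for _ in manager.workers:
            manager.queue.put(None)
        for process in manager.workers:
            process.join()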

Tags: python, multiprocessing
