if reply is None:
iflen(wr) != 0:
env.logger.error(f"WORKER terminates with pending tasks. sos might not be termianting properly.")
env.logger.trace(f"WORKER {self.name} ({os.getpid()}) quits after receiving None.")breakif not reply: // if an empty job is returned
time.sleep(0.1)
continue//// if a real job is returned, run it. _process_job will either return True// or a runner in case it is interrupted.
env.logger.trace(
f"WORKER {self.name} ({os.getpid()}, {self.num_pending()} pending) receives {self._type_of_work(reply)} request {self._name_of_work(reply)} with master port {self._master_ports[new_idx]}")
if"task" in reply:
self.run_substep(reply)
env.logger.trace(
f"WORKER {self.name} ({os.getpid()}) completes substep {self._name_of_work(reply)}")
self._runners[new_idx] = True
continue
master_port = reply["config"]["sockets"]["master_port"]
After Change
if reply is None:
iflen(wr) != 0:
env.logger.error(f"WORKER terminates with pending tasks. sos might not be termianting properly.")
env.log_to_file("WORKER", f"WORKER {self.name} ({os.getpid()}) quits after receiving None.")breakif not reply: // if an empty job is returned
time.sleep(0.1)
continue//// if a real job is returned, run it. _process_job will either return True// or a runner in case it is interrupted.
env.log_to_file("WORKER",
f"WORKER {self.name} ({os.getpid()}, {self.num_pending()} pending) receives {self._type_of_work(reply)} request {self._name_of_work(reply)} with master port {self._master_ports[new_idx]}")
if"task" in reply:
self.run_substep(reply)
env.log_to_file("WORKER",
f"WORKER {self.name} ({os.getpid()}) completes substep {self._name_of_work(reply)}")
self._runners[new_idx] = True
continue
master_port = reply["config"]["sockets"]["master_port"]