# Create a Queue to feed documents to parsers.
# A Manager-backed queue proxy is picklable, so it can be passed as an
# argument to the Pool tasks below; a plain multiprocessing.Queue cannot be
# shared with pool workers that way.
manager = Manager()
in_queue = manager.Queue()
# Use an output queue to track multiprocess progress.
out_queue = JoinableQueue()
total_count = len(xs)

# Create one UDF worker per unit of parallelism; every worker is given the
# same input and output queues and a distinct worker_id.
for i in range(parallelism):
    udf = self.udf_class(
        in_queue=in_queue,
        out_queue=out_queue,
        worker_id=i,
        **self.udf_init_kwargs
    )
    # Per-call keyword arguments are attached to the worker object rather
    # than passed to its constructor.
    udf.apply_kwargs = kwargs
    self.udfs.append(udf)

# Start the UDF worker processes (udf_class presumably subclasses
# multiprocessing.Process -- confirm against its definition).
for udf in self.udfs:
    udf.start()

# Fill the input queue with documents asynchronously, so the caller can track
# progress on out_queue while the queue is still being populated.
pool = Pool(parallelism)
in_tuples = ((in_queue, x) for x in xs)
# NOTE(review): the AsyncResult is discarded, so an exception raised inside
# async_fill_input_queue is not surfaced here -- confirm that the progress
# loop that follows cannot wait forever in that case.
pool.map_async(func=async_fill_input_queue, iterable=in_tuples)
# No further tasks are submitted to this pool. close() still lets the pending
# fill tasks complete, then lets the pool's worker processes exit instead of
# leaking them.
pool.close()
count_parsed = 0while count_parsed < total_count: