f31ee84bfdc6dc2325c8890412a965e509074d0a,streaming/python/runtime/task.py,InputStreamTask,run,#InputStreamTask#,132
Before Change
def run(self):
while self.running:
item = self.reader.read(self.read_timeout_millis)
if item is not None:
msg_data = item.body()
type_id = msg_data[:1]
if (type_id == serialization._PYTHON_TYPE_ID):
msg = self.python_serializer.deserialize(msg_data[1:])
else:
msg = self.cross_lang_serializer.deserialize(msg_data[1:])
self.processor.process(msg)
self.stopped = True
def cancel_task(self):
self.running = False
After Change
if item is None:
continue
if isinstance(item, DataMessage):
msg_data = item.body
type_id = msg_data[0]
if type_id == serialization.PYTHON_TYPE_ID:
msg = self.python_serializer.deserialize(msg_data[1:])
else:
msg = self.cross_lang_serializer.deserialize(
msg_data[1:])
self.processor.process(msg)
elif isinstance(item, CheckpointBarrier):
logger.info("Got barrier:{}".format(item))
logger.info("Start to do checkpoint {}.".format(
item.checkpoint_id))
input_points = item.get_input_checkpoints()
self.do_checkpoint(item.checkpoint_id, input_points)
logger.info("Do checkpoint {} success.".format(
item.checkpoint_id))
else:
raise RuntimeError(
"Unknown item type! item={}".format(item))
except ChannelInterruptException:
logger.info("queue has stopped.")
except BaseException as e:
logger.exception(
In pattern: SUPERPATTERN
Frequency: 4
Non-data size: 6
Instances
Project Name: ray-project/ray
Commit Name: f31ee84bfdc6dc2325c8890412a965e509074d0a
Time: 2020-09-05
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run
Project Name: ray-project/ray
Commit Name: 1b1466748f1db72835a594d73d502e9787e080a9
Time: 2020-09-04
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run
Project Name: microsoft/nni
Commit Name: 41e587038a32db002178329a64224ea6a5b2a5f9
Time: 2019-09-15
Author: 40699903+liuzhe-lz@users.noreply.github.com
File Name: src/sdk/pynni/nni/msg_dispatcher.py
Class Name: MsgDispatcher
Method Name: _handle_final_metric_data
Project Name: dmlc/gluon-cv
Commit Name: 5ed0092d12be4db76f57a158981352aa875f8d00
Time: 2019-08-06
Author: yizhu59@gmail.com
File Name: scripts/action-recognition/test_recognizer.py
Class Name:
Method Name: main