f31ee84bfdc6dc2325c8890412a965e509074d0a,streaming/python/runtime/task.py,InputStreamTask,run,#InputStreamTask#,132

Before Change


    def run(self):
        """Read items in a loop and forward each decoded message.

        The loop keeps going for as long as ``self.running`` is truthy.
        Every iteration asks ``self.reader`` for one item, passing
        ``self.read_timeout_millis``; a ``None`` result is skipped. For any
        other item, the leading one-element slice of its body picks the
        serializer (Python vs. cross-language), the remainder of the body is
        deserialized, and the result is given to ``self.processor``.

        After the loop ends, ``self.stopped`` is set to ``True``.
        """
        while self.running:
            item = self.reader.read(self.read_timeout_millis)
            if item is None:
                # Nothing was returned by this read; poll again.
                continue

            payload = item.body()
            # The first element of the body is a type tag, the rest is data.
            if payload[:1] == serialization._PYTHON_TYPE_ID:
                serializer = self.python_serializer
            else:
                serializer = self.cross_lang_serializer

            msg = serializer.deserialize(payload[1:])
            self.processor.process(msg)

        self.stopped = True

    def cancel_task(self):
        """Clear the ``running`` flag on this task.

        This only sets ``self.running`` to ``False``; nothing else is
        touched here.
        """
        self.running = False

After Change


                if item is None:
                    continue

                if isinstance(item, DataMessage):
                    msg_data = item.body
                    type_id = msg_data[0]
                    if type_id == serialization.PYTHON_TYPE_ID:
                        msg = self.python_serializer.deserialize(msg_data[1:])
                    else:
                        msg = self.cross_lang_serializer.deserialize(
                            msg_data[1:])
                    self.processor.process(msg)
                elif isinstance(item, CheckpointBarrier):
                    logger.info("Got barrier:{}".format(item))
                    logger.info("Start to do checkpoint {}.".format(
                        item.checkpoint_id))

                    input_points = item.get_input_checkpoints()

                    self.do_checkpoint(item.checkpoint_id, input_points)
                    logger.info("Do checkpoint {} success.".format(
                        item.checkpoint_id))
                else:
                    raise RuntimeError(
                        "Unknown item type! item={}".format(item))

        except ChannelInterruptException:
            logger.info("queue has stopped.")
        except BaseException as e:
            logger.exception(
Italian Trulli
In pattern: SUPERPATTERN

Frequency: 4

Non-data size: 6

Instances


Project Name: ray-project/ray
Commit Name: f31ee84bfdc6dc2325c8890412a965e509074d0a
Time: 2020-09-05
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run


Project Name: ray-project/ray
Commit Name: 1b1466748f1db72835a594d73d502e9787e080a9
Time: 2020-09-04
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run


Project Name: microsoft/nni
Commit Name: 41e587038a32db002178329a64224ea6a5b2a5f9
Time: 2019-09-15
Author: 40699903+liuzhe-lz@users.noreply.github.com
File Name: src/sdk/pynni/nni/msg_dispatcher.py
Class Name: MsgDispatcher
Method Name: _handle_final_metric_data


Project Name: dmlc/gluon-cv
Commit Name: 5ed0092d12be4db76f57a158981352aa875f8d00
Time: 2019-08-06
Author: yizhu59@gmail.com
File Name: scripts/action-recognition/test_recognizer.py
Class Name:
Method Name: main