383ea0326ae103b5d5e0b62ed9c3cb18510c5b9e,distributed/comm/tcp.py,TCP,write,#TCP#Any#Any#Any#,229

Before Change


        try:
            nframes = len(frames)
            lengths = [nbytes(frame) for frame in frames]
            length_bytes = struct.pack(f"Q{nframes}Q", nframes, *lengths)

            frames = [length_bytes, *frames]
            lengths = [len(length_bytes), *lengths]

            if sum(lengths) < 2 ** 17:  # 128kiB
                # small enough, send in one go
                stream.write(b"".join(frames))
            else:
                # avoid large memcpy, send in many
                for frame, frame_bytes in zip(frames, lengths):
                    # Can't wait for the write() Future as it may be lost
                    # ("If write is called again before that Future has resolved,
                    #   the previous future will be orphaned and will never resolve")
                    if frame_bytes:
                        future = stream.write(frame)
                        bytes_since_last_yield += frame_bytes
                        if bytes_since_last_yield > 32e6:
                            await future
                            bytes_since_last_yield = 0
        except StreamClosedError as e:
            self.stream = None
            self._closed = True
            if not shutting_down():
                convert_stream_closed_error(self, e)
        except Exception:
            # Some OSError or another "low-level" exception. We do not really know what
            # was already written to the underlying socket, so it is not even safe to retry
            # here using the same stream. The only safe thing to do is to abort.
            # (See also the related GitHub issue; issue number garbled in extraction.)
            if stream._write_buffer is None:
                logger.info("tried to write message %s on closed stream", msg)
            self.abort()
            raise

        return sum(lengths)

    @gen.coroutine
    def close(self):
        # We use gen.coroutine here rather than async def to avoid errors like

After Change


                **self.handshake_options,
            },
        )
        frames_nbytes = sum(map(nbytes, frames))

        header = pack_frames_prelude(frames)
        header = struct.pack("Q", nbytes(header) + frames_nbytes) + header

        frames = [header, *frames]
        frames_nbytes += nbytes(header)

        if frames_nbytes < 2 ** 17:  # 128kiB
            # small enough, send in one go
            frames = [b"".join(frames)]

        try:
            # trick to enqueue all frames for writing beforehand
            for each_frame in frames:
                each_frame_nbytes = nbytes(each_frame)
                if each_frame_nbytes:
                    stream._write_buffer.append(each_frame)
                    stream._total_write_index += each_frame_nbytes

            # start writing frames
            stream.write(b"")
        except StreamClosedError as e:
            self.stream = None
            self._closed = True
            if not shutting_down():
                convert_stream_closed_error(self, e)
        except Exception:
            # Some OSError or another "low-level" exception. We do not really know what
            # was already written to the underlying socket, so it is not even safe to retry
            # here using the same stream. The only safe thing to do is to abort.
            # (See also the related GitHub issue; issue number garbled in extraction.)
            if stream._write_buffer is None:
                logger.info("tried to write message %s on closed stream", msg)
            self.abort()
            raise

        return frames_nbytes

    @gen.coroutine
    def close(self):
        # We use gen.coroutine here rather than async def to avoid errors like
In pattern: SUPERPATTERN

Frequency: 3

Non-data size: 6

Instances


Project Name: dask/distributed
Commit Name: 383ea0326ae103b5d5e0b62ed9c3cb18510c5b9e
Time: 2021-02-17
Author: jakirkham@gmail.com
File Name: distributed/comm/tcp.py
Class Name: TCP
Method Name: write


Project Name: tensorflow/transform
Commit Name: ec9dcf7491c38f3531bfa718b496861ab19bb048
Time: 2019-08-28
Author: zoy@google.com
File Name: tensorflow_transform/analyzer_nodes.py
Class Name: _VocabularyAccumulatorCoder
Method Name: encode_cache


Project Name: dask/distributed
Commit Name: 85c35064ec1786055cbc741a126fa581493acbe1
Time: 2020-07-20
Author: jakirkham@gmail.com
File Name: distributed/protocol/utils.py
Class Name:
Method Name: pack_frames_prelude