try:
nframes = len(frames)
lengths = [nbytes(frame) for frame in frames]
length_bytes = struct.pack(f"Q{nframes}Q", nframes, *lengths)
frames = [length_bytes, *frames]
lengths = [len(length_bytes), *lengths]
if sum(lengths) < 2 ** 17:  # 128 KiB
# Small enough: send everything in one go.
stream.write(b"".join(frames))
else:
# Avoid a large memcpy: send the frames in many separate writes.
for frame, frame_bytes in zip(frames, lengths):
# Can't wait for the write() Future as it may be lost
# ("If write is called again before that Future has resolved,
# the previous future will be orphaned and will never resolve")
if frame_bytes:
future = stream.write(frame)
bytes_since_last_yield += frame_bytes
if bytes_since_last_yield > 32e6:
await future
bytes_since_last_yield = 0
except StreamClosedError as e:
self.stream = None
self._closed = True
if not shutting_down():
convert_stream_closed_error(self, e)
except Exception:
# Some OSError or another "low-level" exception. We do not really know what
# was already written to the underlying socket, so it is not even safe to retry
# here using the same stream. The only safe thing to do is to abort.
# (See also the related GitHub issue; its reference is missing from this comment.)
if stream._write_buffer is None:
logger.info("tried to write message %s on closed stream", msg)
self.abort()
raise
return sum(lengths)
@gen.coroutine
def close(self):
# We use gen.coroutine here rather than async def to avoid errors like
After Change
**self.handshake_options,
},
)
frames_nbytes = sum(map(nbytes, frames))
header = pack_frames_prelude(frames)
header = struct.pack("Q", nbytes(header) + frames_nbytes) + header
frames = [header, *frames]
frames_nbytes += nbytes(header)
if frames_nbytes < 2 ** 17:  # 128 KiB
# Small enough: send everything in one go.
frames = [b"".join(frames)]
try:
# Trick to enqueue all frames for writing beforehand.
for each_frame in frames:
each_frame_nbytes = nbytes(each_frame)
if each_frame_nbytes:
stream._write_buffer.append(each_frame)
stream._total_write_index += each_frame_nbytes
# Start writing the enqueued frames.
stream.write(b"")
except StreamClosedError as e:
self.stream = None
self._closed = True
if not shutting_down():
convert_stream_closed_error(self, e)
except Exception:
# Some OSError or another "low-level" exception. We do not really know what
# was already written to the underlying socket, so it is not even safe to retry
# here using the same stream. The only safe thing to do is to abort.
# (See also the related GitHub issue; its reference is missing from this comment.)
if stream._write_buffer is None:
logger.info("tried to write message %s on closed stream", msg)
self.abort()
raise
return frames_nbytes
@gen.coroutine
def close(self):
# We use gen.coroutine here rather than async def to avoid errors like