1 /** @file
2  *
 *  Implementation of the QUIC unidirectional streams: QUICSendStream (send-only) and QUICReceiveStream (receive-only).
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #include "QUICUnidirectionalStream.h"
25 
26 //
27 // QUICSendStream
28 //
QUICSendStream(QUICConnectionInfoProvider * cinfo,QUICStreamId sid,uint64_t send_max_stream_data)29 QUICSendStream::QUICSendStream(QUICConnectionInfoProvider *cinfo, QUICStreamId sid, uint64_t send_max_stream_data)
30   : QUICStreamVConnection(cinfo, sid), _remote_flow_controller(send_max_stream_data, _id), _state(nullptr, &this->_progress_vio)
31 {
32   SET_HANDLER(&QUICSendStream::state_stream_open);
33 
34   QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
35                     this->_remote_flow_controller.current_limit());
36 }
37 
38 int
state_stream_open(int event,void * data)39 QUICSendStream::state_stream_open(int event, void *data)
40 {
41   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
42   QUICErrorUPtr error = nullptr;
43 
44   switch (event) {
45   case VC_EVENT_READ_READY:
46   case VC_EVENT_READ_COMPLETE: {
47     // should not schedule read event.
48     ink_assert(0);
49     break;
50   }
51   case VC_EVENT_WRITE_READY:
52   case VC_EVENT_WRITE_COMPLETE: {
53     int64_t len = this->_process_write_vio();
54     if (len > 0) {
55       this->_signal_write_event();
56     }
57 
58     break;
59   }
60   case VC_EVENT_EOS:
61   case VC_EVENT_ERROR:
62   case VC_EVENT_INACTIVITY_TIMEOUT:
63   case VC_EVENT_ACTIVE_TIMEOUT: {
64     // TODO
65     ink_assert(false);
66     break;
67   }
68   default:
69     QUICStreamDebug("unknown event");
70     ink_assert(false);
71   }
72 
73   // FIXME error is always nullptr
74   if (error != nullptr) {
75     if (error->cls == QUICErrorClass::TRANSPORT) {
76       QUICStreamDebug("QUICError: %s (%u), %s (0x%x)", QUICDebugNames::error_class(error->cls),
77                       static_cast<unsigned int>(error->cls), QUICDebugNames::error_code(error->code),
78                       static_cast<unsigned int>(error->code));
79     } else {
80       QUICStreamDebug("QUICError: %s (%u), APPLICATION ERROR (0x%x)", QUICDebugNames::error_class(error->cls),
81                       static_cast<unsigned int>(error->cls), static_cast<unsigned int>(error->code));
82     }
83     if (dynamic_cast<QUICStreamError *>(error.get()) != nullptr) {
84       // Stream Error
85       QUICStreamErrorUPtr serror = QUICStreamErrorUPtr(static_cast<QUICStreamError *>(error.get()));
86       this->reset(std::move(serror));
87     } else {
88       // Connection Error
89       // TODO Close connection (Does this really happen?)
90     }
91   }
92 
93   return EVENT_DONE;
94 }
95 
96 int
state_stream_closed(int event,void * data)97 QUICSendStream::state_stream_closed(int event, void *data)
98 {
99   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
100 
101   switch (event) {
102   case VC_EVENT_READ_READY:
103   case VC_EVENT_READ_COMPLETE: {
104     // ignore
105     break;
106   }
107   case VC_EVENT_WRITE_READY:
108   case VC_EVENT_WRITE_COMPLETE: {
109     // ignore
110     break;
111   }
112   case VC_EVENT_EOS:
113   case VC_EVENT_ERROR:
114   case VC_EVENT_INACTIVITY_TIMEOUT:
115   case VC_EVENT_ACTIVE_TIMEOUT: {
116     // TODO
117     ink_assert(false);
118     break;
119   }
120   default:
121     ink_assert(false);
122   }
123 
124   return EVENT_DONE;
125 }
126 
127 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)128 QUICSendStream::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting, uint32_t seq_num)
129 {
130   return !this->is_retransmited_frame_queue_empty() || this->_write_vio.get_reader()->is_read_avail_more_than(0);
131 }
132 
133 QUICFrame *
generate_frame(uint8_t * buf,QUICEncryptionLevel level,uint64_t connection_credit,uint16_t maximum_frame_size,size_t current_packet_size,uint32_t seq_num)134 QUICSendStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
135                                size_t current_packet_size, uint32_t seq_num)
136 {
137   SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
138 
139   QUICFrame *frame = this->create_retransmitted_frame(buf, level, maximum_frame_size, this->_issue_frame_id(), this);
140   if (frame != nullptr) {
141     ink_assert(frame->type() == QUICFrameType::STREAM);
142     this->_records_stream_frame(level, *static_cast<QUICStreamFrame *>(frame));
143     return frame;
144   }
145 
146   // RESET_STREAM
147   if (this->_reset_reason && !this->_is_reset_sent) {
148     frame = QUICFrameFactory::create_rst_stream_frame(buf, *this->_reset_reason, this->_issue_frame_id(), this);
149     this->_records_rst_stream_frame(level, *static_cast<QUICRstStreamFrame *>(frame));
150     this->_state.update_with_sending_frame(*frame);
151     this->_is_reset_sent = true;
152     return frame;
153   }
154 
155   if (!this->_state.is_allowed_to_send(QUICFrameType::STREAM)) {
156     return frame;
157   }
158 
159   uint64_t maximum_data_size = 0;
160   if (maximum_frame_size <= MAX_STREAM_FRAME_OVERHEAD) {
161     return frame;
162   }
163   maximum_data_size = maximum_frame_size - MAX_STREAM_FRAME_OVERHEAD;
164 
165   bool pure_fin = false;
166   bool fin      = false;
167   if ((this->_write_vio.nbytes != 0 || this->_write_vio.nbytes != INT64_MAX) &&
168       this->_write_vio.nbytes == static_cast<int64_t>(this->_send_offset)) {
169     // Pure FIN stream should be sent regardless status of remote flow controller, because the length is zero.
170     pure_fin = true;
171     fin      = true;
172   }
173 
174   uint64_t len           = 0;
175   IOBufferReader *reader = this->_write_vio.get_reader();
176   if (!pure_fin) {
177     uint64_t data_len = reader->block_read_avail();
178     if (data_len == 0) {
179       return frame;
180     }
181 
182     // Check Connection/Stream level credit only if the generating STREAM frame is not pure fin
183     uint64_t stream_credit = this->_remote_flow_controller.credit();
184     if (stream_credit == 0) {
185       // STREAM_DATA_BLOCKED
186       frame =
187         this->_remote_flow_controller.generate_frame(buf, level, UINT16_MAX, maximum_frame_size, current_packet_size, seq_num);
188       return frame;
189     }
190 
191     if (connection_credit == 0) {
192       // BLOCKED - BLOCKED frame will be sent by connection level remote flow controller
193       return frame;
194     }
195 
196     len = std::min(data_len, std::min(maximum_data_size, std::min(stream_credit, connection_credit)));
197 
198     // data_len, maximum_data_size, stream_credit and connection_credit are already checked they're larger than 0
199     ink_assert(len != 0);
200 
201     if (this->_write_vio.nbytes == static_cast<int64_t>(this->_send_offset + len)) {
202       fin = true;
203     }
204   }
205 
206   Ptr<IOBufferBlock> block = make_ptr<IOBufferBlock>(reader->get_current_block()->clone());
207   block->consume(reader->start_offset);
208   block->_end = std::min(block->start() + len, block->_buf_end);
209   ink_assert(static_cast<uint64_t>(block->read_avail()) == len);
210 
211   // STREAM - Pure FIN or data length is lager than 0
212   // FIXME has_length_flag and has_offset_flag should be configurable
213   frame = QUICFrameFactory::create_stream_frame(buf, block, this->_id, this->_send_offset, fin, true, true, this->_issue_frame_id(),
214                                                 this);
215   if (!this->_state.is_allowed_to_send(*frame)) {
216     QUICStreamDebug("Canceled sending %s frame due to the stream state", QUICDebugNames::frame_type(frame->type()));
217     return frame;
218   }
219 
220   if (!pure_fin) {
221     int ret = this->_remote_flow_controller.update(this->_send_offset + len);
222     // We cannot cancel sending the frame after updating the flow controller
223 
224     // Calling update always success, because len is always less than stream_credit
225     ink_assert(ret == 0);
226 
227     QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
228                       this->_remote_flow_controller.current_limit());
229     if (this->_remote_flow_controller.current_offset() == this->_remote_flow_controller.current_limit()) {
230       QUICStreamDebug("Flow Controller will block sending a STREAM frame");
231     }
232 
233     reader->consume(len);
234     this->_send_offset += len;
235     this->_write_vio.ndone += len;
236   }
237   this->_records_stream_frame(level, *static_cast<QUICStreamFrame *>(frame));
238 
239   this->_signal_write_event();
240   this->_state.update_with_sending_frame(*frame);
241 
242   return frame;
243 }
244 
245 QUICConnectionErrorUPtr
recv(const QUICStopSendingFrame & frame)246 QUICSendStream::recv(const QUICStopSendingFrame &frame)
247 {
248   this->_state.update_with_receiving_frame(frame);
249   this->reset(QUICStreamErrorUPtr(new QUICStreamError(this, QUIC_APP_ERROR_CODE_STOPPING)));
250   // We received and processed STOP_SENDING frame, so return NO_ERROR here
251   return nullptr;
252 }
253 
254 QUICConnectionErrorUPtr
recv(const QUICMaxStreamDataFrame & frame)255 QUICSendStream::recv(const QUICMaxStreamDataFrame &frame)
256 {
257   this->_remote_flow_controller.forward_limit(frame.maximum_stream_data());
258   QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
259                     this->_remote_flow_controller.current_limit());
260 
261   int64_t len = this->_process_write_vio();
262   if (len > 0) {
263     this->_signal_write_event();
264   }
265 
266   return nullptr;
267 }
268 
269 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)270 QUICSendStream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
271 {
272   QUICStreamDebug("Warning wants to read from send only stream ignore");
273   // FIXME: should not assert here
274   ink_assert(!"read from send only stream");
275   return nullptr;
276 }
277 
278 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)279 QUICSendStream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
280 {
281   if (buf) {
282     this->_write_vio.buffer.reader_for(buf);
283   } else {
284     this->_write_vio.buffer.clear();
285   }
286 
287   this->_write_vio.mutex     = c ? c->mutex : this->mutex;
288   this->_write_vio.cont      = c;
289   this->_write_vio.nbytes    = nbytes;
290   this->_write_vio.ndone     = 0;
291   this->_write_vio.vc_server = this;
292   this->_write_vio.op        = VIO::WRITE;
293 
294   this->_process_write_vio();
295   this->_send_tracked_event(this->_write_event, VC_EVENT_WRITE_READY, &this->_write_vio);
296 
297   return &this->_write_vio;
298 }
299 
300 void
do_io_close(int lerrno)301 QUICSendStream::do_io_close(int lerrno)
302 {
303   SET_HANDLER(&QUICSendStream::state_stream_closed);
304 
305   ink_assert(this->_read_vio.nbytes == 0);
306   ink_assert(this->_read_vio.op == VIO::NONE);
307   ink_assert(this->_read_vio.cont == nullptr);
308   this->_read_vio.buffer.clear();
309 
310   this->_write_vio.buffer.clear();
311   this->_write_vio.nbytes = 0;
312   this->_write_vio.op     = VIO::NONE;
313   this->_write_vio.cont   = nullptr;
314 }
315 
316 void
do_io_shutdown(ShutdownHowTo_t howto)317 QUICSendStream::do_io_shutdown(ShutdownHowTo_t howto)
318 {
319   switch (howto) {
320   case IO_SHUTDOWN_READ:
321     // ignore
322     break;
323   case IO_SHUTDOWN_WRITE:
324   case IO_SHUTDOWN_READWRITE:
325     this->do_io_close();
326     break;
327   default:
328     ink_assert(0);
329     break;
330   }
331 }
332 
333 void
reenable(VIO * vio)334 QUICSendStream::reenable(VIO *vio)
335 {
336   ink_assert(vio == &this->_write_vio);
337   ink_assert(vio->op == VIO::WRITE);
338 
339   int64_t len = this->_process_write_vio();
340   if (len > 0) {
341     this->_signal_write_event();
342   }
343 }
344 
345 void
reset(QUICStreamErrorUPtr error)346 QUICSendStream::reset(QUICStreamErrorUPtr error)
347 {
348   this->_reset_reason = std::move(error);
349 }
350 
351 void
_on_frame_acked(QUICFrameInformationUPtr & info)352 QUICSendStream::_on_frame_acked(QUICFrameInformationUPtr &info)
353 {
354   StreamFrameInfo *frame_info = nullptr;
355   switch (info->type) {
356   case QUICFrameType::RESET_STREAM:
357     this->_is_reset_complete = true;
358     break;
359   case QUICFrameType::STREAM:
360     frame_info        = reinterpret_cast<StreamFrameInfo *>(info->data);
361     frame_info->block = nullptr;
362     if (false) {
363       this->_is_transfer_complete = true;
364     }
365     break;
366   default:
367     ink_assert(!"unexpected frame type");
368     break;
369   }
370 }
371 
372 void
_on_frame_lost(QUICFrameInformationUPtr & info)373 QUICSendStream::_on_frame_lost(QUICFrameInformationUPtr &info)
374 {
375   switch (info->type) {
376   case QUICFrameType::RESET_STREAM:
377     // [draft-16] 13.2.  Retransmission of Information
378     // Cancellation of stream transmission, as carried in a RESET_STREAM
379     // frame, is sent until acknowledged or until all stream data is
380     // acknowledged by the peer (that is, either the "Reset Recvd" or
381     // "Data Recvd" state is reached on the send stream).  The content of
382     // a RESET_STREAM frame MUST NOT change when it is sent again.
383     this->_is_reset_sent = false;
384     break;
385   case QUICFrameType::STREAM:
386     this->save_frame_info(std::move(info));
387     break;
388   default:
389     ink_assert(!"unexpected frame type");
390     break;
391   }
392 }
393 
394 QUICOffset
largest_offset_sent() const395 QUICSendStream::largest_offset_sent() const
396 {
397   return this->_remote_flow_controller.current_offset();
398 }
399 
400 //
401 // QUICReceiveStream
402 //
QUICReceiveStream(QUICRTTProvider * rtt_provider,QUICConnectionInfoProvider * cinfo,QUICStreamId sid,uint64_t recv_max_stream_data)403 QUICReceiveStream::QUICReceiveStream(QUICRTTProvider *rtt_provider, QUICConnectionInfoProvider *cinfo, QUICStreamId sid,
404                                      uint64_t recv_max_stream_data)
405   : QUICStreamVConnection(cinfo, sid),
406     _local_flow_controller(rtt_provider, recv_max_stream_data, _id),
407     _flow_control_buffer_size(recv_max_stream_data),
408     _state(this, nullptr)
409 {
410   SET_HANDLER(&QUICReceiveStream::state_stream_open);
411 
412   QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
413                     this->_local_flow_controller.current_limit());
414 }
415 
416 int
state_stream_open(int event,void * data)417 QUICReceiveStream::state_stream_open(int event, void *data)
418 {
419   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
420   QUICErrorUPtr error = nullptr;
421 
422   switch (event) {
423   case VC_EVENT_READ_READY:
424   case VC_EVENT_READ_COMPLETE: {
425     int64_t len = this->_process_read_vio();
426     if (len > 0) {
427       this->_signal_read_event();
428     }
429 
430     break;
431   }
432   case VC_EVENT_WRITE_READY:
433   case VC_EVENT_WRITE_COMPLETE: {
434     // should not schedule write event
435     ink_assert(!"should not schedule write even");
436     break;
437   }
438   case VC_EVENT_EOS:
439   case VC_EVENT_ERROR:
440   case VC_EVENT_INACTIVITY_TIMEOUT:
441   case VC_EVENT_ACTIVE_TIMEOUT: {
442     // TODO
443     ink_assert(false);
444     break;
445   }
446   default:
447     QUICStreamDebug("unknown event");
448     ink_assert(false);
449   }
450 
451   // FIXME error is always nullptr
452   if (error != nullptr) {
453     if (error->cls == QUICErrorClass::TRANSPORT) {
454       QUICStreamDebug("QUICError: %s (%u), %s (0x%x)", QUICDebugNames::error_class(error->cls),
455                       static_cast<unsigned int>(error->cls), QUICDebugNames::error_code(error->code),
456                       static_cast<unsigned int>(error->code));
457     } else {
458       QUICStreamDebug("QUICError: %s (%u), APPLICATION ERROR (0x%x)", QUICDebugNames::error_class(error->cls),
459                       static_cast<unsigned int>(error->cls), static_cast<unsigned int>(error->code));
460     }
461     if (dynamic_cast<QUICStreamError *>(error.get()) != nullptr) {
462       // Stream Error
463       QUICStreamErrorUPtr serror = QUICStreamErrorUPtr(static_cast<QUICStreamError *>(error.get()));
464       this->reset(std::move(serror));
465     } else {
466       // Connection Error
467       // TODO Close connection (Does this really happen?)
468     }
469   }
470 
471   return EVENT_DONE;
472 }
473 
474 int
state_stream_closed(int event,void * data)475 QUICReceiveStream::state_stream_closed(int event, void *data)
476 {
477   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
478 
479   switch (event) {
480   case VC_EVENT_READ_READY:
481   case VC_EVENT_READ_COMPLETE: {
482     // ignore
483     break;
484   }
485   case VC_EVENT_WRITE_READY:
486   case VC_EVENT_WRITE_COMPLETE: {
487     // ignore
488     break;
489   }
490   case VC_EVENT_EOS:
491   case VC_EVENT_ERROR:
492   case VC_EVENT_INACTIVITY_TIMEOUT:
493   case VC_EVENT_ACTIVE_TIMEOUT: {
494     // TODO
495     ink_assert(false);
496     break;
497   }
498   default:
499     ink_assert(false);
500   }
501 
502   return EVENT_DONE;
503 }
504 
505 bool
is_transfer_goal_set() const506 QUICReceiveStream::is_transfer_goal_set() const
507 {
508   return this->_received_stream_frame_buffer.is_transfer_goal_set();
509 }
510 
511 uint64_t
transfer_progress() const512 QUICReceiveStream::transfer_progress() const
513 {
514   return this->_received_stream_frame_buffer.transfer_progress();
515 }
516 
517 uint64_t
transfer_goal() const518 QUICReceiveStream::transfer_goal() const
519 {
520   return this->_received_stream_frame_buffer.transfer_goal();
521 }
522 
523 bool
is_cancelled() const524 QUICReceiveStream::is_cancelled() const
525 {
526   return this->_is_stop_sending_complete;
527 }
528 
529 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)530 QUICReceiveStream::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting, uint32_t seq_num)
531 {
532   return this->_local_flow_controller.will_generate_frame(level, current_packet_size, ack_eliciting, seq_num) ||
533          (this->_stop_sending_reason != nullptr && this->_is_stop_sending_sent == false);
534 }
535 
536 QUICFrame *
generate_frame(uint8_t * buf,QUICEncryptionLevel level,uint64_t connection_credit,uint16_t maximum_frame_size,size_t current_packet_size,uint32_t seq_num)537 QUICReceiveStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
538                                   size_t current_packet_size, uint32_t seq_num)
539 {
540   QUICFrame *frame = nullptr;
541   // STOP_SENDING
542   if (this->_stop_sending_reason && !this->_is_stop_sending_sent) {
543     frame =
544       QUICFrameFactory::create_stop_sending_frame(buf, this->id(), this->_stop_sending_reason->code, this->_issue_frame_id(), this);
545     this->_records_stop_sending_frame(level, *static_cast<QUICStopSendingFrame *>(frame));
546     this->_state.update_with_sending_frame(*frame);
547     this->_is_stop_sending_sent = true;
548     return frame;
549   }
550 
551   // MAX_STREAM_DATA
552   frame = this->_local_flow_controller.generate_frame(buf, level, UINT16_MAX, maximum_frame_size, current_packet_size, seq_num);
553   return frame;
554 }
555 
556 QUICConnectionErrorUPtr
recv(const QUICRstStreamFrame & frame)557 QUICReceiveStream::recv(const QUICRstStreamFrame &frame)
558 {
559   this->_state.update_with_receiving_frame(frame);
560   this->_signal_read_eos_event();
561   return nullptr;
562 }
563 
564 QUICConnectionErrorUPtr
recv(const QUICStreamDataBlockedFrame & frame)565 QUICReceiveStream::recv(const QUICStreamDataBlockedFrame &frame)
566 {
567   // STREAM_DATA_BLOCKED frames are for debugging. Nothing to do here.
568   QUICStreamFCDebug("[REMOTE] blocked %" PRIu64, frame.offset());
569   return nullptr;
570 }
571 
572 /**
573  * @brief Receive STREAM frame
574  * @detail When receive STREAM frame, reorder frames and write to buffer of read_vio.
575  * If the reordering or writting operation is heavy, split out them to read function,
576  * which is called by application via do_io_read() or reenable().
577  */
578 QUICConnectionErrorUPtr
recv(const QUICStreamFrame & frame)579 QUICReceiveStream::recv(const QUICStreamFrame &frame)
580 {
581   ink_assert(_id == frame.stream_id());
582   ink_assert(this->_read_vio.op == VIO::READ);
583 
584   // Check stream state - Do this first before accept the frame
585   if (!this->_state.is_allowed_to_receive(frame)) {
586     QUICStreamDebug("Canceled receiving %s frame due to the stream state", QUICDebugNames::frame_type(frame.type()));
587     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_STATE_ERROR);
588   }
589 
590   // Flow Control - Even if it's allowed to receive on the state, it may exceed the limit
591   int ret = this->_local_flow_controller.update(frame.offset() + frame.data_length());
592   QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
593                     this->_local_flow_controller.current_limit());
594   if (ret != 0) {
595     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::FLOW_CONTROL_ERROR);
596   }
597 
598   // Make a copy and insert it into the receive buffer because the frame passed is temporal
599   QUICFrame *cloned             = new QUICStreamFrame(frame);
600   QUICConnectionErrorUPtr error = this->_received_stream_frame_buffer.insert(cloned);
601   if (error != nullptr) {
602     this->_received_stream_frame_buffer.clear();
603     return error;
604   }
605 
606   auto new_frame                      = this->_received_stream_frame_buffer.pop();
607   const QUICStreamFrame *stream_frame = nullptr;
608   uint64_t last_offset                = 0;
609   uint64_t last_length                = 0;
610 
611   while (new_frame != nullptr) {
612     stream_frame = static_cast<const QUICStreamFrame *>(new_frame);
613     last_offset  = stream_frame->offset();
614     last_length  = stream_frame->data_length();
615 
616     this->_write_to_read_vio(stream_frame->offset(), reinterpret_cast<uint8_t *>(stream_frame->data()->start()),
617                              stream_frame->data_length(), stream_frame->has_fin_flag());
618     this->_state.update_with_receiving_frame(*new_frame);
619 
620     delete new_frame;
621     new_frame = this->_received_stream_frame_buffer.pop();
622   }
623 
624   // Forward limit of local flow controller with the largest reordered stream frame
625   if (stream_frame) {
626     this->_reordered_bytes = last_offset + last_length;
627     this->_local_flow_controller.forward_limit(this->_reordered_bytes + this->_flow_control_buffer_size);
628     QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
629                       this->_local_flow_controller.current_limit());
630   }
631 
632   this->_signal_read_event();
633 
634   return nullptr;
635 }
636 
637 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)638 QUICReceiveStream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
639 {
640   if (buf) {
641     this->_read_vio.buffer.writer_for(buf);
642   } else {
643     this->_read_vio.buffer.clear();
644   }
645 
646   this->_read_vio.mutex     = c ? c->mutex : this->mutex;
647   this->_read_vio.cont      = c;
648   this->_read_vio.nbytes    = nbytes;
649   this->_read_vio.ndone     = 0;
650   this->_read_vio.vc_server = this;
651   this->_read_vio.op        = VIO::READ;
652 
653   this->_process_read_vio();
654   this->_send_tracked_event(this->_read_event, VC_EVENT_READ_READY, &this->_read_vio);
655 
656   return &this->_read_vio;
657 }
658 
659 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)660 QUICReceiveStream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
661 {
662   QUICStreamDebug("Warning wants to write to send only stream ignore");
663   // FIXME: should not assert here
664   ink_assert(!"write to send only stream");
665   return nullptr;
666 }
667 
668 void
do_io_close(int lerrno)669 QUICReceiveStream::do_io_close(int lerrno)
670 {
671   SET_HANDLER(&QUICReceiveStream::state_stream_closed);
672 
673   ink_assert(this->_write_vio.nbytes == 0);
674   ink_assert(this->_write_vio.op == VIO::NONE);
675   ink_assert(this->_write_vio.cont == nullptr);
676   this->_write_vio.buffer.clear();
677 
678   this->_read_vio.buffer.clear();
679   this->_read_vio.nbytes = 0;
680   this->_read_vio.op     = VIO::NONE;
681   this->_read_vio.cont   = nullptr;
682 }
683 
684 void
do_io_shutdown(ShutdownHowTo_t howto)685 QUICReceiveStream::do_io_shutdown(ShutdownHowTo_t howto)
686 {
687   switch (howto) {
688   case IO_SHUTDOWN_WRITE:
689     // ignore
690     break;
691   case IO_SHUTDOWN_READ:
692   case IO_SHUTDOWN_READWRITE:
693     this->do_io_close();
694     break;
695   default:
696     ink_assert(0);
697     break;
698   }
699 }
700 
701 void
reenable(VIO * vio)702 QUICReceiveStream::reenable(VIO *vio)
703 {
704   ink_assert(vio == &this->_read_vio);
705   ink_assert(vio->op == VIO::READ);
706 
707   int64_t len = this->_process_read_vio();
708   if (len > 0) {
709     this->_signal_read_event();
710   }
711 }
712 
713 void
on_read()714 QUICReceiveStream::on_read()
715 {
716   this->_state.update_on_read();
717 }
718 
719 void
on_eos()720 QUICReceiveStream::on_eos()
721 {
722   this->_state.update_on_eos();
723 }
724 
725 QUICOffset
largest_offset_received() const726 QUICReceiveStream::largest_offset_received() const
727 {
728   return this->_local_flow_controller.current_offset();
729 }
730 
731 void
_on_frame_lost(QUICFrameInformationUPtr & info)732 QUICReceiveStream::_on_frame_lost(QUICFrameInformationUPtr &info)
733 {
734   switch (info->type) {
735   case QUICFrameType::STOP_SENDING:
736     this->_is_stop_sending_sent = false;
737     break;
738   default:
739     ink_assert(!"unknown frame type");
740     break;
741   }
742 }
743 
744 void
_on_frame_acked(QUICFrameInformationUPtr & info)745 QUICReceiveStream::_on_frame_acked(QUICFrameInformationUPtr &info)
746 {
747   switch (info->type) {
748   case QUICFrameType::STOP_SENDING:
749     this->_is_stop_sending_complete = true;
750     break;
751   default:
752     ink_assert(!"unknown frame type");
753     break;
754   }
755 }
756 
757 void
stop_sending(QUICStreamErrorUPtr error)758 QUICReceiveStream::stop_sending(QUICStreamErrorUPtr error)
759 {
760   this->_stop_sending_reason = std::move(error);
761 }
762