1 /** @file
2  *
 *  Implementation of QUICBidirectionalStream, a QUIC stream that both sends and receives data.
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #include "QUICBidirectionalStream.h"
25 
26 //
27 // QUICBidirectionalStream
28 //
QUICBidirectionalStream(QUICRTTProvider * rtt_provider,QUICConnectionInfoProvider * cinfo,QUICStreamId sid,uint64_t recv_max_stream_data,uint64_t send_max_stream_data)29 QUICBidirectionalStream::QUICBidirectionalStream(QUICRTTProvider *rtt_provider, QUICConnectionInfoProvider *cinfo, QUICStreamId sid,
30                                                  uint64_t recv_max_stream_data, uint64_t send_max_stream_data)
31   : QUICStreamVConnection(cinfo, sid),
32     _remote_flow_controller(send_max_stream_data, _id),
33     _local_flow_controller(rtt_provider, recv_max_stream_data, _id),
34     _flow_control_buffer_size(recv_max_stream_data),
35     _state(nullptr, &this->_progress_vio, this, nullptr)
36 {
37   SET_HANDLER(&QUICBidirectionalStream::state_stream_open);
38 
39   QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
40                     this->_local_flow_controller.current_limit());
41   QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
42                     this->_remote_flow_controller.current_limit());
43 }
44 
45 int
state_stream_open(int event,void * data)46 QUICBidirectionalStream::state_stream_open(int event, void *data)
47 {
48   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
49   QUICErrorUPtr error = nullptr;
50 
51   switch (event) {
52   case VC_EVENT_READ_READY:
53   case VC_EVENT_READ_COMPLETE: {
54     int64_t len = this->_process_read_vio();
55     if (len > 0) {
56       this->_signal_read_event();
57     }
58 
59     break;
60   }
61   case VC_EVENT_WRITE_READY:
62   case VC_EVENT_WRITE_COMPLETE: {
63     int64_t len = this->_process_write_vio();
64     if (len > 0) {
65       this->_signal_write_event();
66     }
67 
68     break;
69   }
70   case VC_EVENT_EOS:
71   case VC_EVENT_ERROR:
72   case VC_EVENT_INACTIVITY_TIMEOUT:
73   case VC_EVENT_ACTIVE_TIMEOUT: {
74     // TODO
75     ink_assert(false);
76     break;
77   }
78   default:
79     QUICStreamDebug("unknown event");
80     ink_assert(false);
81   }
82 
83   // FIXME error is always nullptr
84   if (error != nullptr) {
85     if (error->cls == QUICErrorClass::TRANSPORT) {
86       QUICStreamDebug("QUICError: %s (%u), %s (0x%x)", QUICDebugNames::error_class(error->cls),
87                       static_cast<unsigned int>(error->cls), QUICDebugNames::error_code(error->code),
88                       static_cast<unsigned int>(error->code));
89     } else {
90       QUICStreamDebug("QUICError: %s (%u), APPLICATION ERROR (0x%x)", QUICDebugNames::error_class(error->cls),
91                       static_cast<unsigned int>(error->cls), static_cast<unsigned int>(error->code));
92     }
93     if (dynamic_cast<QUICStreamError *>(error.get()) != nullptr) {
94       // Stream Error
95       QUICStreamErrorUPtr serror = QUICStreamErrorUPtr(static_cast<QUICStreamError *>(error.get()));
96       this->reset(std::move(serror));
97     } else {
98       // Connection Error
99       // TODO Close connection (Does this really happen?)
100     }
101   }
102 
103   return EVENT_DONE;
104 }
105 
106 int
state_stream_closed(int event,void * data)107 QUICBidirectionalStream::state_stream_closed(int event, void *data)
108 {
109   QUICVStreamDebug("%s (%d)", get_vc_event_name(event), event);
110 
111   switch (event) {
112   case VC_EVENT_READ_READY:
113   case VC_EVENT_READ_COMPLETE: {
114     // ignore
115     break;
116   }
117   case VC_EVENT_WRITE_READY:
118   case VC_EVENT_WRITE_COMPLETE: {
119     // ignore
120     break;
121   }
122   case VC_EVENT_EOS:
123   case VC_EVENT_ERROR:
124   case VC_EVENT_INACTIVITY_TIMEOUT:
125   case VC_EVENT_ACTIVE_TIMEOUT: {
126     // TODO
127     ink_assert(false);
128     break;
129   }
130   default:
131     ink_assert(false);
132   }
133 
134   return EVENT_DONE;
135 }
136 
137 bool
is_transfer_goal_set() const138 QUICBidirectionalStream::is_transfer_goal_set() const
139 {
140   return this->_received_stream_frame_buffer.is_transfer_goal_set();
141 }
142 
143 uint64_t
transfer_progress() const144 QUICBidirectionalStream::transfer_progress() const
145 {
146   return this->_received_stream_frame_buffer.transfer_progress();
147 }
148 
149 uint64_t
transfer_goal() const150 QUICBidirectionalStream::transfer_goal() const
151 {
152   return this->_received_stream_frame_buffer.transfer_goal();
153 }
154 
155 bool
is_cancelled() const156 QUICBidirectionalStream::is_cancelled() const
157 {
158   return this->_is_reset_complete;
159 }
160 
161 /**
162  * @brief Receive STREAM frame
163  * @detail When receive STREAM frame, reorder frames and write to buffer of read_vio.
164  * If the reordering or writting operation is heavy, split out them to read function,
165  * which is called by application via do_io_read() or reenable().
166  */
167 QUICConnectionErrorUPtr
recv(const QUICStreamFrame & frame)168 QUICBidirectionalStream::recv(const QUICStreamFrame &frame)
169 {
170   ink_assert(_id == frame.stream_id());
171   ink_assert(this->_read_vio.op == VIO::READ);
172 
173   // Check stream state - Do this first before accept the frame
174   if (!this->_state.is_allowed_to_receive(frame)) {
175     QUICStreamDebug("Canceled receiving %s frame due to the stream state", QUICDebugNames::frame_type(frame.type()));
176     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_STATE_ERROR);
177   }
178 
179   // Flow Control - Even if it's allowed to receive on the state, it may exceed the limit
180   int ret = this->_local_flow_controller.update(frame.offset() + frame.data_length());
181   QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
182                     this->_local_flow_controller.current_limit());
183   if (ret != 0) {
184     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::FLOW_CONTROL_ERROR);
185   }
186 
187   // Make a copy and insert it into the receive buffer because the frame passed is temporal
188   QUICFrame *cloned             = new QUICStreamFrame(frame);
189   QUICConnectionErrorUPtr error = this->_received_stream_frame_buffer.insert(cloned);
190   if (error != nullptr) {
191     this->_received_stream_frame_buffer.clear();
192     return error;
193   }
194 
195   auto new_frame                      = this->_received_stream_frame_buffer.pop();
196   const QUICStreamFrame *stream_frame = nullptr;
197   uint64_t last_offset                = 0;
198   uint64_t last_length                = 0;
199 
200   while (new_frame != nullptr) {
201     stream_frame = static_cast<const QUICStreamFrame *>(new_frame);
202     last_offset  = stream_frame->offset();
203     last_length  = stream_frame->data_length();
204 
205     this->_write_to_read_vio(stream_frame->offset(), reinterpret_cast<uint8_t *>(stream_frame->data()->start()),
206                              stream_frame->data_length(), stream_frame->has_fin_flag());
207     this->_state.update_with_receiving_frame(*new_frame);
208 
209     delete new_frame;
210     new_frame = this->_received_stream_frame_buffer.pop();
211   }
212 
213   // Forward limit of local flow controller with the largest reordered stream frame
214   if (stream_frame) {
215     this->_reordered_bytes = last_offset + last_length;
216     this->_local_flow_controller.forward_limit(this->_reordered_bytes + this->_flow_control_buffer_size);
217     QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
218                       this->_local_flow_controller.current_limit());
219   }
220 
221   this->_signal_read_event();
222 
223   return nullptr;
224 }
225 
226 QUICConnectionErrorUPtr
recv(const QUICMaxStreamDataFrame & frame)227 QUICBidirectionalStream::recv(const QUICMaxStreamDataFrame &frame)
228 {
229   this->_remote_flow_controller.forward_limit(frame.maximum_stream_data());
230   QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
231                     this->_remote_flow_controller.current_limit());
232 
233   int64_t len = this->_process_write_vio();
234   if (len > 0) {
235     this->_signal_write_event();
236   }
237 
238   return nullptr;
239 }
240 
241 QUICConnectionErrorUPtr
recv(const QUICStreamDataBlockedFrame & frame)242 QUICBidirectionalStream::recv(const QUICStreamDataBlockedFrame &frame)
243 {
244   // STREAM_DATA_BLOCKED frames are for debugging. Nothing to do here.
245   QUICStreamFCDebug("[REMOTE] blocked %" PRIu64, frame.offset());
246   return nullptr;
247 }
248 
249 QUICConnectionErrorUPtr
recv(const QUICStopSendingFrame & frame)250 QUICBidirectionalStream::recv(const QUICStopSendingFrame &frame)
251 {
252   this->_state.update_with_receiving_frame(frame);
253   this->_reset_reason = QUICStreamErrorUPtr(new QUICStreamError(this, QUIC_APP_ERROR_CODE_STOPPING));
254   // We received and processed STOP_SENDING frame, so return NO_ERROR here
255   return nullptr;
256 }
257 
258 QUICConnectionErrorUPtr
recv(const QUICRstStreamFrame & frame)259 QUICBidirectionalStream::recv(const QUICRstStreamFrame &frame)
260 {
261   this->_state.update_with_receiving_frame(frame);
262   this->_signal_read_eos_event();
263   return nullptr;
264 }
265 
266 // this->_read_vio.nbytes should be INT64_MAX until receive FIN flag
267 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)268 QUICBidirectionalStream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
269 {
270   if (buf) {
271     this->_read_vio.buffer.writer_for(buf);
272   } else {
273     this->_read_vio.buffer.clear();
274   }
275 
276   this->_read_vio.mutex     = c ? c->mutex : this->mutex;
277   this->_read_vio.cont      = c;
278   this->_read_vio.nbytes    = nbytes;
279   this->_read_vio.ndone     = 0;
280   this->_read_vio.vc_server = this;
281   this->_read_vio.op        = VIO::READ;
282 
283   this->_process_read_vio();
284   this->_send_tracked_event(this->_read_event, VC_EVENT_READ_READY, &this->_read_vio);
285 
286   return &this->_read_vio;
287 }
288 
289 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)290 QUICBidirectionalStream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
291 {
292   if (buf) {
293     this->_write_vio.buffer.reader_for(buf);
294   } else {
295     this->_write_vio.buffer.clear();
296   }
297 
298   this->_write_vio.mutex     = c ? c->mutex : this->mutex;
299   this->_write_vio.cont      = c;
300   this->_write_vio.nbytes    = nbytes;
301   this->_write_vio.ndone     = 0;
302   this->_write_vio.vc_server = this;
303   this->_write_vio.op        = VIO::WRITE;
304 
305   this->_process_write_vio();
306   this->_send_tracked_event(this->_write_event, VC_EVENT_WRITE_READY, &this->_write_vio);
307 
308   return &this->_write_vio;
309 }
310 
311 void
do_io_close(int lerrno)312 QUICBidirectionalStream::do_io_close(int lerrno)
313 {
314   SET_HANDLER(&QUICBidirectionalStream::state_stream_closed);
315 
316   this->_read_vio.buffer.clear();
317   this->_read_vio.nbytes = 0;
318   this->_read_vio.op     = VIO::NONE;
319   this->_read_vio.cont   = nullptr;
320 
321   this->_write_vio.buffer.clear();
322   this->_write_vio.nbytes = 0;
323   this->_write_vio.op     = VIO::NONE;
324   this->_write_vio.cont   = nullptr;
325 }
326 
327 void
do_io_shutdown(ShutdownHowTo_t howto)328 QUICBidirectionalStream::do_io_shutdown(ShutdownHowTo_t howto)
329 {
330   ink_assert(false); // unimplemented yet
331   return;
332 }
333 
334 void
reenable(VIO * vio)335 QUICBidirectionalStream::reenable(VIO *vio)
336 {
337   if (vio->op == VIO::READ) {
338     QUICVStreamDebug("read_vio reenabled");
339 
340     int64_t len = this->_process_read_vio();
341     if (len > 0) {
342       this->_signal_read_event();
343     }
344   } else if (vio->op == VIO::WRITE) {
345     QUICVStreamDebug("write_vio reenabled");
346 
347     int64_t len = this->_process_write_vio();
348     if (len > 0) {
349       this->_signal_write_event();
350     }
351   }
352 }
353 
354 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)355 QUICBidirectionalStream::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting,
356                                              uint32_t seq_num)
357 {
358   return this->_local_flow_controller.will_generate_frame(level, current_packet_size, ack_eliciting, seq_num) ||
359          !this->is_retransmited_frame_queue_empty() || this->_write_vio.get_reader()->is_read_avail_more_than(0);
360 }
361 
/**
 * Produce the next frame this stream wants to send, or nullptr when there is none.
 *
 * Candidates are tried in this order and the first one available is returned:
 *   1. a STREAM frame waiting for retransmission
 *   2. RESET_STREAM, when a reset reason is set and the frame has not been sent yet
 *   3. STOP_SENDING, when a stop-sending reason is set and the frame has not been sent yet
 *   4. a frame from the local flow controller (MAX_STREAM_DATA)
 *   5. a new STREAM frame built from the write VIO, or a frame from the remote flow
 *      controller (STREAM_DATA_BLOCKED) when there is no stream credit left
 *
 * @param buf                 storage handed to the frame factories
 * @param level               encryption level, forwarded to the flow controllers and the
 *                            frame bookkeeping
 * @param connection_credit   connection level credit; no new STREAM data is sent when it is 0
 * @param maximum_frame_size  upper bound for the frame; new STREAM data is only sent when it
 *                            is larger than MAX_STREAM_FRAME_OVERHEAD
 * @param current_packet_size forwarded to the flow controllers
 * @param seq_num             forwarded to the flow controllers
 * @return the generated frame, or nullptr
 */
QUICFrame *
QUICBidirectionalStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit,
                                        uint16_t maximum_frame_size, size_t current_packet_size, uint32_t seq_num)
{
  // The write VIO's mutex is taken for the whole call
  SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());

  // Retransmissions come before anything new
  QUICFrame *frame = this->create_retransmitted_frame(buf, level, maximum_frame_size, this->_issue_frame_id(), this);
  if (frame != nullptr) {
    ink_assert(frame->type() == QUICFrameType::STREAM);
    this->_records_stream_frame(level, *static_cast<QUICStreamFrame *>(frame));
    return frame;
  }

  // RESET_STREAM
  if (this->_reset_reason && !this->_is_reset_sent) {
    frame = QUICFrameFactory::create_rst_stream_frame(buf, *this->_reset_reason, this->_issue_frame_id(), this);
    this->_records_rst_stream_frame(level, *static_cast<QUICRstStreamFrame *>(frame));
    this->_state.update_with_sending_frame(*frame);
    this->_is_reset_sent = true;
    return frame;
  }

  // STOP_SENDING
  if (this->_stop_sending_reason && !this->_is_stop_sending_sent) {
    frame =
      QUICFrameFactory::create_stop_sending_frame(buf, this->id(), this->_stop_sending_reason->code, this->_issue_frame_id(), this);
    this->_records_stop_sending_frame(level, *static_cast<QUICStopSendingFrame *>(frame));
    this->_state.update_with_sending_frame(*frame);
    this->_is_stop_sending_sent = true;
    return frame;
  }

  // MAX_STREAM_DATA
  frame = this->_local_flow_controller.generate_frame(buf, level, UINT16_MAX, maximum_frame_size, current_packet_size, seq_num);
  if (frame) {
    return frame;
  }

  // From here on `frame` is null until a STREAM (or STREAM_DATA_BLOCKED) frame is created,
  // so the `return frame` statements below report "nothing to send".
  if (!this->_state.is_allowed_to_send(QUICFrameType::STREAM)) {
    return frame;
  }

  // A STREAM frame needs room for at least one byte beyond its maximum overhead
  uint64_t maximum_data_size = 0;
  if (maximum_frame_size <= MAX_STREAM_FRAME_OVERHEAD) {
    return frame;
  }
  maximum_data_size = maximum_frame_size - MAX_STREAM_FRAME_OVERHEAD;

  bool pure_fin = false;
  bool fin      = false;
  // NOTE(review): `nbytes != 0 || nbytes != INT64_MAX` is true for every value of nbytes, so
  // only the comparison against _send_offset takes effect. `&&` may have been intended, which
  // would stop a pure FIN from being generated while nbytes is still 0 - confirm before changing.
  if ((this->_write_vio.nbytes != 0 || this->_write_vio.nbytes != INT64_MAX) &&
      this->_write_vio.nbytes == static_cast<int64_t>(this->_send_offset)) {
    // Pure FIN stream should be sent regardless status of remote flow controller, because the length is zero.
    pure_fin = true;
    fin      = true;
  }

  uint64_t len           = 0;
  IOBufferReader *reader = this->_write_vio.get_reader();
  if (!pure_fin) {
    // Only the first readable block is considered, so one frame never spans blocks
    uint64_t data_len = reader->block_read_avail();
    if (data_len == 0) {
      return frame;
    }

    // Check Connection/Stream level credit only if the generating STREAM frame is not pure fin
    uint64_t stream_credit = this->_remote_flow_controller.credit();
    if (stream_credit == 0) {
      // STREAM_DATA_BLOCKED
      frame =
        this->_remote_flow_controller.generate_frame(buf, level, UINT16_MAX, maximum_frame_size, current_packet_size, seq_num);
      return frame;
    }

    if (connection_credit == 0) {
      // BLOCKED - BLOCKED frame will be sent by connection level remote flow controller
      return frame;
    }

    // The payload is bounded by the available data, the frame size and both credits
    len = std::min(data_len, std::min(maximum_data_size, std::min(stream_credit, connection_credit)));

    // data_len, maximum_data_size, stream_credit and connection_credit are already checked they're larger than 0
    ink_assert(len != 0);

    // This frame carries the last byte of the write VIO, so it also carries FIN
    if (this->_write_vio.nbytes == static_cast<int64_t>(this->_send_offset + len)) {
      fin = true;
    }
  }

  // Clone the reader's current block and trim the clone to exactly `len` readable bytes
  // (zero bytes for a pure FIN)
  Ptr<IOBufferBlock> block = make_ptr<IOBufferBlock>(reader->get_current_block()->clone());
  block->consume(reader->start_offset);
  block->_end = std::min(block->start() + len, block->_buf_end);
  ink_assert(static_cast<uint64_t>(block->read_avail()) == len);

  // STREAM - Pure FIN or data length is larger than 0
  // FIXME has_length_flag and has_offset_flag should be configurable
  frame = QUICFrameFactory::create_stream_frame(buf, block, this->_id, this->_send_offset, fin, true, true, this->_issue_frame_id(),
                                                this);
  if (!this->_state.is_allowed_to_send(*frame)) {
    QUICStreamDebug("Canceled sending %s frame due to the stream state", QUICDebugNames::frame_type(frame->type()));
    // NOTE(review): the frame that was just created is returned here although the log says
    // sending was canceled, and no offsets or flow control state were updated for it.
    // Returning nullptr looks like the intent - confirm against the caller.
    return frame;
  }

  if (!pure_fin) {
    int ret = this->_remote_flow_controller.update(this->_send_offset + len);
    // We cannot cancel sending the frame after updating the flow controller

    // Calling update always success, because len is always less than stream_credit
    ink_assert(ret == 0);

    QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller.current_offset(),
                      this->_remote_flow_controller.current_limit());
    if (this->_remote_flow_controller.current_offset() == this->_remote_flow_controller.current_limit()) {
      QUICStreamDebug("Flow Controller will block sending a STREAM frame");
    }

    // Commit the bytes: drop them from the reader and advance the send offset and VIO progress
    reader->consume(len);
    this->_send_offset += len;
    this->_write_vio.ndone += len;
  }
  this->_records_stream_frame(level, *static_cast<QUICStreamFrame *>(frame));

  this->_signal_write_event();
  this->_state.update_with_sending_frame(*frame);

  return frame;
}
489 
490 void
_on_frame_acked(QUICFrameInformationUPtr & info)491 QUICBidirectionalStream::_on_frame_acked(QUICFrameInformationUPtr &info)
492 {
493   StreamFrameInfo *frame_info = nullptr;
494   switch (info->type) {
495   case QUICFrameType::RESET_STREAM:
496     this->_is_reset_complete = true;
497     break;
498   case QUICFrameType::STREAM:
499     frame_info        = reinterpret_cast<StreamFrameInfo *>(info->data);
500     frame_info->block = nullptr;
501     if (false) {
502       this->_is_transfer_complete = true;
503     }
504     break;
505   case QUICFrameType::STOP_SENDING:
506   default:
507     break;
508   }
509 
510   this->_state.update_on_ack();
511 }
512 
513 void
_on_frame_lost(QUICFrameInformationUPtr & info)514 QUICBidirectionalStream::_on_frame_lost(QUICFrameInformationUPtr &info)
515 {
516   switch (info->type) {
517   case QUICFrameType::RESET_STREAM:
518     // [draft-16] 13.2.  Retransmission of Information
519     // Cancellation of stream transmission, as carried in a RESET_STREAM
520     // frame, is sent until acknowledged or until all stream data is
521     // acknowledged by the peer (that is, either the "Reset Recvd" or
522     // "Data Recvd" state is reached on the send stream).  The content of
523     // a RESET_STREAM frame MUST NOT change when it is sent again.
524     this->_is_reset_sent = false;
525     break;
526   case QUICFrameType::STREAM:
527     this->save_frame_info(std::move(info));
528     break;
529   case QUICFrameType::STOP_SENDING:
530     this->_is_stop_sending_sent = false;
531     break;
532   default:
533     break;
534   }
535 }
536 
537 void
stop_sending(QUICStreamErrorUPtr error)538 QUICBidirectionalStream::stop_sending(QUICStreamErrorUPtr error)
539 {
540   this->_stop_sending_reason = std::move(error);
541 }
542 
543 void
reset(QUICStreamErrorUPtr error)544 QUICBidirectionalStream::reset(QUICStreamErrorUPtr error)
545 {
546   this->_reset_reason = std::move(error);
547 }
548 
549 void
on_read()550 QUICBidirectionalStream::on_read()
551 {
552   this->_state.update_on_read();
553 }
554 
555 void
on_eos()556 QUICBidirectionalStream::on_eos()
557 {
558   this->_state.update_on_eos();
559 }
560 
561 QUICOffset
largest_offset_received() const562 QUICBidirectionalStream::largest_offset_received() const
563 {
564   return this->_local_flow_controller.current_offset();
565 }
566 
567 QUICOffset
largest_offset_sent() const568 QUICBidirectionalStream::largest_offset_sent() const
569 {
570   return this->_remote_flow_controller.current_offset();
571 }
572