1 /** @file
2  *
3  *  A brief file description
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #include "QUICStreamManager.h"
25 
26 #include "QUICApplication.h"
27 #include "QUICTransportParameters.h"
28 
// Diagnostics tag passed to the Debug() calls in this file.
29 static constexpr char tag[]                     = "quic_stream_manager";
// Increment applied to the next locally created stream id after each
// successful create_uni_stream() / create_bidi_stream() call.
30 static constexpr QUICStreamId QUIC_STREAM_TYPES = 4;
31 
QUICStreamManager(QUICConnectionInfoProvider * info,QUICRTTProvider * rtt_provider,QUICApplicationMap * app_map)32 QUICStreamManager::QUICStreamManager(QUICConnectionInfoProvider *info, QUICRTTProvider *rtt_provider, QUICApplicationMap *app_map)
33   : _stream_factory(rtt_provider, info), _info(info), _app_map(app_map)
34 {
35   if (this->_info->direction() == NET_VCONNECTION_OUT) {
36     this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::CLIENT_BIDI);
37     this->_next_stream_id_uni  = static_cast<uint32_t>(QUICStreamType::CLIENT_UNI);
38   } else {
39     this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::SERVER_BIDI);
40     this->_next_stream_id_uni  = static_cast<uint32_t>(QUICStreamType::SERVER_UNI);
41   }
42 }
43 
44 std::vector<QUICFrameType>
interests()45 QUICStreamManager::interests()
46 {
47   return {
48     QUICFrameType::STREAM,          QUICFrameType::RESET_STREAM, QUICFrameType::STOP_SENDING,
49     QUICFrameType::MAX_STREAM_DATA, QUICFrameType::MAX_STREAMS,
50   };
51 }
52 
53 void
init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> & local_tp,const std::shared_ptr<const QUICTransportParameters> & remote_tp)54 QUICStreamManager::init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp,
55                                             const std::shared_ptr<const QUICTransportParameters> &remote_tp)
56 {
57   this->_local_tp  = local_tp;
58   this->_remote_tp = remote_tp;
59 
60   if (this->_local_tp) {
61     this->_local_max_streams_bidi = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
62     this->_local_max_streams_uni  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
63   }
64   if (this->_remote_tp) {
65     this->_remote_max_streams_bidi = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
66     this->_remote_max_streams_uni  = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
67   }
68 }
69 
70 void
set_max_streams_bidi(uint64_t max_streams)71 QUICStreamManager::set_max_streams_bidi(uint64_t max_streams)
72 {
73   if (this->_local_max_streams_bidi <= max_streams) {
74     this->_local_max_streams_bidi = max_streams;
75   }
76 }
77 
78 void
set_max_streams_uni(uint64_t max_streams)79 QUICStreamManager::set_max_streams_uni(uint64_t max_streams)
80 {
81   if (this->_local_max_streams_uni <= max_streams) {
82     this->_local_max_streams_uni = max_streams;
83   }
84 }
85 
86 QUICConnectionErrorUPtr
create_stream(QUICStreamId stream_id)87 QUICStreamManager::create_stream(QUICStreamId stream_id)
88 {
89   // TODO: check stream_id
90   QUICConnectionErrorUPtr error    = nullptr;
91   QUICStreamVConnection *stream_vc = this->_find_or_create_stream_vc(stream_id);
92   if (!stream_vc) {
93     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
94   }
95 
96   QUICApplication *application = this->_app_map->get(stream_id);
97 
98   if (!application->is_stream_set(stream_vc)) {
99     application->set_stream(stream_vc);
100   }
101 
102   return error;
103 }
104 
105 QUICConnectionErrorUPtr
create_uni_stream(QUICStreamId & new_stream_id)106 QUICStreamManager::create_uni_stream(QUICStreamId &new_stream_id)
107 {
108   QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_uni);
109   if (error == nullptr) {
110     new_stream_id = this->_next_stream_id_uni;
111     this->_next_stream_id_uni += QUIC_STREAM_TYPES;
112   }
113 
114   return error;
115 }
116 
117 QUICConnectionErrorUPtr
create_bidi_stream(QUICStreamId & new_stream_id)118 QUICStreamManager::create_bidi_stream(QUICStreamId &new_stream_id)
119 {
120   QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_bidi);
121   if (error == nullptr) {
122     new_stream_id = this->_next_stream_id_bidi;
123     this->_next_stream_id_bidi += QUIC_STREAM_TYPES;
124   }
125 
126   return error;
127 }
128 
129 void
reset_stream(QUICStreamId stream_id,QUICStreamErrorUPtr error)130 QUICStreamManager::reset_stream(QUICStreamId stream_id, QUICStreamErrorUPtr error)
131 {
132   auto stream = this->_find_stream_vc(stream_id);
133   stream->reset(std::move(error));
134 }
135 
136 QUICConnectionErrorUPtr
handle_frame(QUICEncryptionLevel level,const QUICFrame & frame)137 QUICStreamManager::handle_frame(QUICEncryptionLevel level, const QUICFrame &frame)
138 {
139   QUICConnectionErrorUPtr error = nullptr;
140 
141   switch (frame.type()) {
142   case QUICFrameType::MAX_STREAM_DATA:
143     error = this->_handle_frame(static_cast<const QUICMaxStreamDataFrame &>(frame));
144     break;
145   case QUICFrameType::STREAM_DATA_BLOCKED:
146     // STREAM_DATA_BLOCKED frame is for debugging. Just propagate to streams
147     error = this->_handle_frame(static_cast<const QUICStreamDataBlockedFrame &>(frame));
148     break;
149   case QUICFrameType::STREAM:
150     error = this->_handle_frame(static_cast<const QUICStreamFrame &>(frame));
151     break;
152   case QUICFrameType::STOP_SENDING:
153     error = this->_handle_frame(static_cast<const QUICStopSendingFrame &>(frame));
154     break;
155   case QUICFrameType::RESET_STREAM:
156     error = this->_handle_frame(static_cast<const QUICRstStreamFrame &>(frame));
157     break;
158   case QUICFrameType::MAX_STREAMS:
159     error = this->_handle_frame(static_cast<const QUICMaxStreamsFrame &>(frame));
160     break;
161   default:
162     Debug(tag, "Unexpected frame type: %02x", static_cast<unsigned int>(frame.type()));
163     ink_assert(false);
164     break;
165   }
166 
167   return error;
168 }
169 
170 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamDataFrame & frame)171 QUICStreamManager::_handle_frame(const QUICMaxStreamDataFrame &frame)
172 {
173   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
174   if (stream) {
175     return stream->recv(frame);
176   } else {
177     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
178   }
179 }
180 
181 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamDataBlockedFrame & frame)182 QUICStreamManager::_handle_frame(const QUICStreamDataBlockedFrame &frame)
183 {
184   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
185   if (stream) {
186     return stream->recv(frame);
187   } else {
188     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
189   }
190 }
191 
192 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamFrame & frame)193 QUICStreamManager::_handle_frame(const QUICStreamFrame &frame)
194 {
195   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
196   if (!stream) {
197     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
198   }
199 
200   QUICApplication *application = this->_app_map->get(frame.stream_id());
201 
202   if (application && !application->is_stream_set(stream)) {
203     application->set_stream(stream);
204   }
205 
206   return stream->recv(frame);
207 }
208 
209 QUICConnectionErrorUPtr
_handle_frame(const QUICRstStreamFrame & frame)210 QUICStreamManager::_handle_frame(const QUICRstStreamFrame &frame)
211 {
212   QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
213   if (stream) {
214     return stream->recv(frame);
215   } else {
216     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
217   }
218 }
219 
220 QUICConnectionErrorUPtr
_handle_frame(const QUICStopSendingFrame & frame)221 QUICStreamManager::_handle_frame(const QUICStopSendingFrame &frame)
222 {
223   QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
224   if (stream) {
225     return stream->recv(frame);
226   } else {
227     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
228   }
229 }
230 
231 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamsFrame & frame)232 QUICStreamManager::_handle_frame(const QUICMaxStreamsFrame &frame)
233 {
234   QUICStreamType type = QUICTypeUtil::detect_stream_type(frame.maximum_streams());
235   if (type == QUICStreamType::SERVER_BIDI || type == QUICStreamType::CLIENT_BIDI) {
236     this->_remote_max_streams_bidi = frame.maximum_streams();
237   } else {
238     this->_remote_max_streams_uni = frame.maximum_streams();
239   }
240   return nullptr;
241 }
242 
243 QUICStreamVConnection *
_find_stream_vc(QUICStreamId id)244 QUICStreamManager::_find_stream_vc(QUICStreamId id)
245 {
246   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
247     if (s->id() == id) {
248       return s;
249     }
250   }
251   return nullptr;
252 }
253 
254 QUICStreamVConnection *
_find_or_create_stream_vc(QUICStreamId stream_id)255 QUICStreamManager::_find_or_create_stream_vc(QUICStreamId stream_id)
256 {
257   QUICStreamVConnection *stream = this->_find_stream_vc(stream_id);
258   if (!stream) {
259     if (!this->_local_tp) {
260       return nullptr;
261     }
262 
263     ink_assert(this->_local_tp);
264     ink_assert(this->_remote_tp);
265 
266     uint64_t local_max_stream_data  = 0;
267     uint64_t remote_max_stream_data = 0;
268 
269     switch (QUICTypeUtil::detect_stream_type(stream_id)) {
270     case QUICStreamType::CLIENT_BIDI:
271       if (this->_info->direction() == NET_VCONNECTION_OUT) {
272         // client
273         if (this->_remote_max_streams_bidi == 0 || stream_id > this->_remote_max_streams_bidi) {
274           return nullptr;
275         }
276 
277         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
278         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
279       } else {
280         // server
281         if (this->_local_max_streams_bidi == 0 || stream_id > this->_local_max_streams_bidi) {
282           return nullptr;
283         }
284 
285         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
286         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
287       }
288 
289       break;
290     case QUICStreamType::CLIENT_UNI:
291       if (this->_info->direction() == NET_VCONNECTION_OUT) {
292         // client
293         if (this->_remote_max_streams_uni == 0 || stream_id > this->_remote_max_streams_uni) {
294           return nullptr;
295         }
296       } else {
297         // server
298         if (this->_local_max_streams_uni == 0 || stream_id > this->_local_max_streams_uni) {
299           return nullptr;
300         }
301       }
302 
303       local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
304       remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
305 
306       break;
307     case QUICStreamType::SERVER_BIDI:
308       if (this->_info->direction() == NET_VCONNECTION_OUT) {
309         // client
310         if (this->_local_max_streams_bidi == 0 || stream_id > this->_local_max_streams_bidi) {
311           return nullptr;
312         }
313 
314         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
315         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
316       } else {
317         // server
318         if (this->_remote_max_streams_bidi == 0 || stream_id > this->_remote_max_streams_bidi) {
319           return nullptr;
320         }
321 
322         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
323         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
324       }
325       break;
326     case QUICStreamType::SERVER_UNI:
327       if (this->_info->direction() == NET_VCONNECTION_OUT) {
328         if (this->_local_max_streams_uni == 0 || stream_id > this->_local_max_streams_uni) {
329           return nullptr;
330         }
331       } else {
332         if (this->_remote_max_streams_uni == 0 || stream_id > this->_remote_max_streams_uni) {
333           return nullptr;
334         }
335       }
336 
337       local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
338       remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
339 
340       break;
341     default:
342       ink_release_assert(false);
343       break;
344     }
345 
346     stream = this->_stream_factory.create(stream_id, local_max_stream_data, remote_max_stream_data);
347     ink_assert(stream != nullptr);
348     this->stream_list.push(stream);
349   }
350 
351   return stream;
352 }
353 
354 uint64_t
total_reordered_bytes() const355 QUICStreamManager::total_reordered_bytes() const
356 {
357   uint64_t total_bytes = 0;
358 
359   // FIXME Iterating all (open + closed) streams is expensive
360   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
361     total_bytes += s->reordered_bytes();
362   }
363   return total_bytes;
364 }
365 
366 uint64_t
total_offset_received() const367 QUICStreamManager::total_offset_received() const
368 {
369   uint64_t total_offset_received = 0;
370 
371   // FIXME Iterating all (open + closed) streams is expensive
372   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
373     total_offset_received += s->largest_offset_received();
374   }
375   return total_offset_received;
376 }
377 
378 uint64_t
total_offset_sent() const379 QUICStreamManager::total_offset_sent() const
380 {
381   return this->_total_offset_sent;
382 }
383 
384 void
_add_total_offset_sent(uint32_t sent_byte)385 QUICStreamManager::_add_total_offset_sent(uint32_t sent_byte)
386 {
387   // FIXME: use atomic increment
388   this->_total_offset_sent += sent_byte;
389 }
390 
391 uint32_t
stream_count() const392 QUICStreamManager::stream_count() const
393 {
394   uint32_t count = 0;
395   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
396     ++count;
397   }
398   return count;
399 }
400 
401 void
set_default_application(QUICApplication * app)402 QUICStreamManager::set_default_application(QUICApplication *app)
403 {
404   this->_app_map->set_default(app);
405 }
406 
407 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)408 QUICStreamManager::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting, uint32_t seq_num)
409 {
410   if (!this->_is_level_matched(level)) {
411     return false;
412   }
413 
414   // workaround fix until support 0-RTT on client
415   if (level == QUICEncryptionLevel::ZERO_RTT) {
416     return false;
417   }
418 
419   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
420     if (s->will_generate_frame(level, current_packet_size, ack_eliciting, seq_num)) {
421       return true;
422     }
423   }
424 
425   return false;
426 }
427 
428 QUICFrame *
generate_frame(uint8_t * buf,QUICEncryptionLevel level,uint64_t connection_credit,uint16_t maximum_frame_size,size_t current_packet_size,uint32_t seq_num)429 QUICStreamManager::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
430                                   size_t current_packet_size, uint32_t seq_num)
431 {
432   QUICFrame *frame = nullptr;
433 
434   if (!this->_is_level_matched(level)) {
435     return frame;
436   }
437 
438   // workaround fix until support 0-RTT on client
439   if (level == QUICEncryptionLevel::ZERO_RTT) {
440     return frame;
441   }
442 
443   // FIXME We should pick a stream based on priority
444   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
445     frame = s->generate_frame(buf, level, connection_credit, maximum_frame_size, current_packet_size, seq_num);
446     if (frame) {
447       break;
448     }
449   }
450 
451   if (frame != nullptr && frame->type() == QUICFrameType::STREAM) {
452     this->_add_total_offset_sent(static_cast<QUICStreamFrame *>(frame)->data_length());
453   }
454 
455   return frame;
456 }
457 
458 bool
_is_level_matched(QUICEncryptionLevel level)459 QUICStreamManager::_is_level_matched(QUICEncryptionLevel level)
460 {
461   for (auto l : this->_encryption_level_filter) {
462     if (l == level) {
463       return true;
464     }
465   }
466 
467   return false;
468 }
469