1 /** @file
2  *
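 *  Implementation of QUICStreamIO (buffered read/write access to a QUIC stream) and QUICApplication (stream-to-QUICStreamIO bookkeeping)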
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #include "QUICApplication.h"
25 #include "QUICStream.h"
26 
// Debug tags: one for per-stream I/O tracing, one for application-level messages.
static constexpr char tag_stream_io[] = "quic_stream_io";
static constexpr char tag_app[]       = "quic_app";

// Emit a debug line under tag_stream_io, prefixed with "[<connection ids>] [<stream id>] ".
// The expansion dereferences this->_stream_vc, so it can only be used inside member functions
// of a class that has a _stream_vc member.
#define QUICStreamIODebug(fmt, ...)                           \
  Debug(tag_stream_io, "[%s] [%" PRIu64 "] " fmt,             \
        this->_stream_vc->connection_info()->cids().data(),   \
        this->_stream_vc->id(), ##__VA_ARGS__)
33 
34 //
35 // QUICStreamIO
36 //
QUICStreamIO(QUICApplication * app,QUICStreamVConnection * stream_vc)37 QUICStreamIO::QUICStreamIO(QUICApplication *app, QUICStreamVConnection *stream_vc) : _stream_vc(stream_vc)
38 {
39   this->_read_buffer  = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
40   this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
41 
42   this->_read_buffer_reader  = this->_read_buffer->alloc_reader();
43   this->_write_buffer_reader = this->_write_buffer->alloc_reader();
44 
45   switch (stream_vc->direction()) {
46   case QUICStreamDirection::BIDIRECTIONAL:
47     this->_read_vio  = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
48     this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
49     break;
50   case QUICStreamDirection::SEND:
51     this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
52     break;
53   case QUICStreamDirection::RECEIVE:
54     this->_read_vio = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
55     break;
56   default:
57     ink_assert(false);
58     break;
59   }
60 }
61 
~QUICStreamIO()62 QUICStreamIO::~QUICStreamIO()
63 {
64   // All readers will be deallocated
65   free_MIOBuffer(this->_read_buffer);
66   free_MIOBuffer(this->_write_buffer);
67 };
68 
69 uint32_t
stream_id() const70 QUICStreamIO::stream_id() const
71 {
72   return this->_stream_vc->id();
73 }
74 
75 bool
is_bidirectional() const76 QUICStreamIO::is_bidirectional() const
77 {
78   return this->_stream_vc->is_bidirectional();
79 }
80 
81 int64_t
read(uint8_t * buf,int64_t len)82 QUICStreamIO::read(uint8_t *buf, int64_t len)
83 {
84   if (is_debug_tag_set(tag_stream_io)) {
85     if (this->_read_vio->nbytes == INT64_MAX) {
86       QUICStreamIODebug("nbytes=- ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->ndone,
87                         this->_read_buffer_reader->read_avail(), len);
88     } else {
89       QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->nbytes,
90                         this->_read_vio->ndone, this->_read_buffer_reader->read_avail(), len);
91     }
92   }
93 
94   int64_t nread = this->_read_buffer_reader->read(buf, len);
95   if (nread > 0) {
96     this->_read_vio->ndone += nread;
97   }
98 
99   this->_stream_vc->on_read();
100 
101   return nread;
102 }
103 
104 int64_t
peek(uint8_t * buf,int64_t len)105 QUICStreamIO::peek(uint8_t *buf, int64_t len)
106 {
107   return this->_read_buffer_reader->memcpy(buf, len) - reinterpret_cast<char *>(buf);
108 }
109 
110 void
consume(int64_t len)111 QUICStreamIO::consume(int64_t len)
112 {
113   this->_read_buffer_reader->consume(len);
114   this->_stream_vc->on_read();
115 }
116 
117 bool
is_read_done()118 QUICStreamIO::is_read_done()
119 {
120   return this->_read_vio->ntodo() == 0;
121 }
122 
123 int64_t
write(const uint8_t * buf,int64_t len)124 QUICStreamIO::write(const uint8_t *buf, int64_t len)
125 {
126   SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());
127 
128   int64_t nwritten = this->_write_buffer->write(buf, len);
129   if (nwritten > 0) {
130     this->_nwritten += nwritten;
131   }
132 
133   return len;
134 }
135 
136 int64_t
write(IOBufferReader * r,int64_t len)137 QUICStreamIO::write(IOBufferReader *r, int64_t len)
138 {
139   SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());
140 
141   int64_t bytes_avail = this->_write_buffer->write_avail();
142 
143   if (bytes_avail > 0) {
144     if (is_debug_tag_set(tag_stream_io)) {
145       if (this->_write_vio->nbytes == INT64_MAX) {
146         QUICStreamIODebug("nbytes=- ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->ndone,
147                           bytes_avail, len);
148       } else {
149         QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64,
150                           this->_write_vio->nbytes, this->_write_vio->ndone, bytes_avail, len);
151       }
152     }
153 
154     int64_t bytes_len = std::min(bytes_avail, len);
155     int64_t nwritten  = this->_write_buffer->write(r, bytes_len);
156 
157     if (nwritten > 0) {
158       this->_nwritten += nwritten;
159     }
160 
161     return nwritten;
162   } else {
163     return 0;
164   }
165 }
166 
167 // TODO: Similar to other "write" apis, but do not copy.
168 int64_t
write(IOBufferBlock * b)169 QUICStreamIO::write(IOBufferBlock *b)
170 {
171   ink_assert(!"not implemented yet");
172   return 0;
173 }
174 
175 void
write_done()176 QUICStreamIO::write_done()
177 {
178   this->_write_vio->nbytes = this->_nwritten;
179 }
180 
181 void
read_reenable()182 QUICStreamIO::read_reenable()
183 {
184   return this->_read_vio->reenable();
185 }
186 
187 void
write_reenable()188 QUICStreamIO::write_reenable()
189 {
190   return this->_write_vio->reenable();
191 }
192 
193 //
194 // QUICApplication
195 //
QUICApplication(QUICConnection * qc)196 QUICApplication::QUICApplication(QUICConnection *qc) : Continuation(new_ProxyMutex())
197 {
198   this->_qc = qc;
199 }
200 
~QUICApplication()201 QUICApplication::~QUICApplication()
202 {
203   for (auto const &kv : this->_stream_map) {
204     delete kv.second;
205   }
206 }
207 
208 // @brief Bind stream and application
209 void
set_stream(QUICStreamVConnection * stream_vc,QUICStreamIO * stream_io)210 QUICApplication::set_stream(QUICStreamVConnection *stream_vc, QUICStreamIO *stream_io)
211 {
212   if (stream_io == nullptr) {
213     stream_io = new QUICStreamIO(this, stream_vc);
214   }
215   this->_stream_map.insert(std::make_pair(stream_vc->id(), stream_io));
216 }
217 
218 // @brief Bind stream and application
219 void
set_stream(QUICStreamIO * stream_io)220 QUICApplication::set_stream(QUICStreamIO *stream_io)
221 {
222   this->_stream_map.insert(std::make_pair(stream_io->stream_id(), stream_io));
223 }
224 
225 bool
is_stream_set(QUICStreamVConnection * stream)226 QUICApplication::is_stream_set(QUICStreamVConnection *stream)
227 {
228   auto result = this->_stream_map.find(stream->id());
229 
230   return result != this->_stream_map.end();
231 }
232 
233 void
reenable(QUICStreamVConnection * stream)234 QUICApplication::reenable(QUICStreamVConnection *stream)
235 {
236   QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
237   if (stream_io) {
238     stream_io->read_reenable();
239     stream_io->write_reenable();
240   } else {
241     Debug(tag_app, "[%s] Unknown Stream id=%" PRIx64, this->_qc->cids().data(), stream->id());
242   }
243 
244   return;
245 }
246 
247 void
unset_stream(QUICStreamVConnection * stream)248 QUICApplication::unset_stream(QUICStreamVConnection *stream)
249 {
250   QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
251   if (stream_io) {
252     this->_stream_map.erase(stream->id());
253   }
254 }
255 
256 QUICStreamIO *
_find_stream_io(QUICStreamId id)257 QUICApplication::_find_stream_io(QUICStreamId id)
258 {
259   auto result = this->_stream_map.find(id);
260 
261   if (result == this->_stream_map.end()) {
262     return nullptr;
263   } else {
264     return result->second;
265   }
266 }
267 
268 QUICStreamIO *
_find_stream_io(VIO * vio)269 QUICApplication::_find_stream_io(VIO *vio)
270 {
271   if (vio == nullptr) {
272     return nullptr;
273   }
274 
275   QUICStream *stream = dynamic_cast<QUICStream *>(vio->vc_server);
276   if (stream == nullptr) {
277     return nullptr;
278   }
279 
280   return this->_find_stream_io(stream->id());
281 }
282