xref: /trafficserver/proxy/http2/Http2Stream.cc (revision 9ab53fe9)
1 /** @file
2 
3   Http2Stream.cc
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "Http2Stream.h"
25 
26 #include "HTTP2.h"
27 #include "Http2ClientSession.h"
28 #include "../http/HttpSM.h"
29 
30 #include <numeric>
31 
32 #define REMEMBER(e, r)                                    \
33   {                                                       \
34     this->_history.push_back(MakeSourceLocation(), e, r); \
35   }
36 
37 #define Http2StreamDebug(fmt, ...) \
38   SsnDebug(_proxy_ssn, "http2_stream", "[%" PRId64 "] [%u] " fmt, _proxy_ssn->connection_id(), this->get_id(), ##__VA_ARGS__);
39 
40 ClassAllocator<Http2Stream> http2StreamAllocator("http2StreamAllocator");
41 
Http2Stream(Http2StreamId sid,ssize_t initial_rwnd)42 Http2Stream::Http2Stream(Http2StreamId sid, ssize_t initial_rwnd) : _id(sid), _client_rwnd(initial_rwnd)
43 {
44   SET_HANDLER(&Http2Stream::main_event_handler);
45 }
46 
47 void
init(Http2StreamId sid,ssize_t initial_rwnd)48 Http2Stream::init(Http2StreamId sid, ssize_t initial_rwnd)
49 {
50   this->mark_milestone(Http2StreamMilestone::OPEN);
51 
52   this->_id          = sid;
53   this->_thread      = this_ethread();
54   this->_client_rwnd = initial_rwnd;
55   this->_server_rwnd = Http2::initial_window_size;
56 
57   this->_reader = this->_request_buffer.alloc_reader();
58 
59   _req_header.create(HTTP_TYPE_REQUEST);
60   response_header.create(HTTP_TYPE_RESPONSE);
61   // TODO: init _req_header instead of response_header if this Http2Stream is outgoing
62   http2_init_pseudo_headers(response_header);
63 
64   http_parser_init(&http_parser);
65 }
66 
67 int
main_event_handler(int event,void * edata)68 Http2Stream::main_event_handler(int event, void *edata)
69 {
70   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
71   REMEMBER(event, this->reentrancy_count);
72 
73   if (!this->_switch_thread_if_not_on_right_thread(event, edata)) {
74     // Not on the right thread
75     return 0;
76   }
77   ink_release_assert(this->_thread == this_ethread());
78 
79   Event *e = static_cast<Event *>(edata);
80   reentrancy_count++;
81   if (e == _read_vio_event) {
82     _read_vio_event = nullptr;
83     this->signal_read_event(e->callback_event);
84     return 0;
85   } else if (e == _write_vio_event) {
86     _write_vio_event = nullptr;
87     this->signal_write_event(e->callback_event);
88     return 0;
89   } else if (e == cross_thread_event) {
90     cross_thread_event = nullptr;
91   } else if (e == read_event) {
92     read_event = nullptr;
93   } else if (e == write_event) {
94     write_event = nullptr;
95   } else if (e == buffer_full_write_event) {
96     buffer_full_write_event = nullptr;
97   }
98 
99   switch (event) {
100   case VC_EVENT_ACTIVE_TIMEOUT:
101   case VC_EVENT_INACTIVITY_TIMEOUT:
102     if (_sm && read_vio.ntodo() > 0) {
103       this->signal_read_event(event);
104     } else if (_sm && write_vio.ntodo() > 0) {
105       this->signal_write_event(event);
106     }
107     break;
108   case VC_EVENT_WRITE_READY:
109   case VC_EVENT_WRITE_COMPLETE:
110     _timeout.update_inactivity();
111     if (e->cookie == &write_vio) {
112       if (write_vio.mutex && write_vio.cont && this->_sm) {
113         this->signal_write_event(event);
114       }
115     } else {
116       update_write_request(true);
117     }
118     break;
119   case VC_EVENT_READ_COMPLETE:
120   case VC_EVENT_READ_READY:
121     _timeout.update_inactivity();
122     if (e->cookie == &read_vio) {
123       if (read_vio.mutex && read_vio.cont && this->_sm) {
124         signal_read_event(event);
125       }
126     } else {
127       this->update_read_request(true);
128     }
129     break;
130   case VC_EVENT_EOS:
131     if (e->cookie == &read_vio) {
132       SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
133       read_vio.cont->handleEvent(VC_EVENT_EOS, &read_vio);
134     } else if (e->cookie == &write_vio) {
135       SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
136       write_vio.cont->handleEvent(VC_EVENT_EOS, &write_vio);
137     }
138     break;
139   }
140   reentrancy_count--;
141   // Clean stream up if the terminate flag is set and we are at the bottom of the handler stack
142   terminate_if_possible();
143 
144   return 0;
145 }
146 
147 Http2ErrorCode
decode_header_blocks(HpackHandle & hpack_handle,uint32_t maximum_table_size)148 Http2Stream::decode_header_blocks(HpackHandle &hpack_handle, uint32_t maximum_table_size)
149 {
150   return http2_decode_header_blocks(&_req_header, (const uint8_t *)header_blocks, header_blocks_length, nullptr, hpack_handle,
151                                     trailing_header, maximum_table_size);
152 }
153 
154 void
send_request(Http2ConnectionState & cstate)155 Http2Stream::send_request(Http2ConnectionState &cstate)
156 {
157   ink_release_assert(this->_sm != nullptr);
158   this->_http_sm_id = this->_sm->sm_id;
159 
160   // Convert header to HTTP/1.1 format
161   http2_convert_header_from_2_to_1_1(&_req_header);
162 
163   // Write header to a buffer.  Borrowing logic from HttpSM::write_header_into_buffer.
164   // Seems like a function like this ought to be in HTTPHdr directly
165   int bufindex;
166   int dumpoffset = 0;
167   int done, tmp;
168   do {
169     bufindex             = 0;
170     tmp                  = dumpoffset;
171     IOBufferBlock *block = this->_request_buffer.get_current_block();
172     if (!block) {
173       this->_request_buffer.add_block();
174       block = this->_request_buffer.get_current_block();
175     }
176     done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
177     dumpoffset += bufindex;
178     this->_request_buffer.fill(bufindex);
179     if (!done) {
180       this->_request_buffer.add_block();
181     }
182   } while (!done);
183 
184   if (bufindex == 0) {
185     // No data to signal read event
186     return;
187   }
188 
189   if (this->recv_end_stream) {
190     this->read_vio.nbytes = bufindex;
191     this->signal_read_event(VC_EVENT_READ_COMPLETE);
192   } else {
193     this->signal_read_event(VC_EVENT_READ_READY);
194   }
195 }
196 
197 bool
change_state(uint8_t type,uint8_t flags)198 Http2Stream::change_state(uint8_t type, uint8_t flags)
199 {
200   switch (_state) {
201   case Http2StreamState::HTTP2_STREAM_STATE_IDLE:
202     if (type == HTTP2_FRAME_TYPE_HEADERS) {
203       if (recv_end_stream) {
204         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
205       } else if (send_end_stream) {
206         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
207       } else {
208         _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
209       }
210     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
211       if (recv_end_stream) {
212         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
213       } else if (send_end_stream) {
214         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
215       } else {
216         _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
217       }
218     } else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) {
219       _state = Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL;
220     } else {
221       return false;
222     }
223     break;
224 
225   case Http2StreamState::HTTP2_STREAM_STATE_OPEN:
226     if (type == HTTP2_FRAME_TYPE_RST_STREAM) {
227       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
228     } else if (type == HTTP2_FRAME_TYPE_DATA) {
229       if (recv_end_stream) {
230         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
231       } else if (send_end_stream) {
232         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
233       } else {
234         // Do not change state
235       }
236     } else {
237       // A stream in the "open" state may be used by both peers to send frames of any type.
238       return true;
239     }
240     break;
241 
242   case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL:
243     if (type == HTTP2_FRAME_TYPE_HEADERS) {
244       if (flags & HTTP2_FLAGS_HEADERS_END_HEADERS) {
245         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
246       }
247     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
248       if (flags & HTTP2_FLAGS_CONTINUATION_END_HEADERS) {
249         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
250       }
251     } else {
252       return false;
253     }
254     break;
255 
256   case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_REMOTE:
257     // Currently ATS supports only HTTP/2 server features
258     return false;
259 
260   case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL:
261     if (type == HTTP2_FRAME_TYPE_RST_STREAM || recv_end_stream) {
262       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
263     } else {
264       // Error, set state closed
265       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
266       return false;
267     }
268     break;
269 
270   case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE:
271     if (type == HTTP2_FRAME_TYPE_RST_STREAM || send_end_stream) {
272       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
273     } else if (type == HTTP2_FRAME_TYPE_HEADERS) { // w/o END_STREAM flag
274       // No state change here. Expect a following DATA frame with END_STREAM flag.
275       return true;
276     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) { // w/o END_STREAM flag
277       // No state change here. Expect a following DATA frame with END_STREAM flag.
278       return true;
279     } else {
280       // Error, set state closed
281       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
282       return false;
283     }
284     break;
285 
286   case Http2StreamState::HTTP2_STREAM_STATE_CLOSED:
287     // No state changing
288     return true;
289 
290   default:
291     return false;
292   }
293 
294   Http2StreamDebug("%s", Http2DebugNames::get_state_name(_state));
295 
296   return true;
297 }
298 
299 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)300 Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
301 {
302   if (buf) {
303     read_vio.buffer.writer_for(buf);
304   } else {
305     read_vio.buffer.clear();
306   }
307 
308   read_vio.mutex     = c ? c->mutex : this->mutex;
309   read_vio.cont      = c;
310   read_vio.nbytes    = nbytes;
311   read_vio.ndone     = 0;
312   read_vio.vc_server = this;
313   read_vio.op        = VIO::READ;
314 
315   // TODO: re-enable read_vio
316 
317   return &read_vio;
318 }
319 
320 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * abuffer,bool owner)321 Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
322 {
323   if (abuffer) {
324     write_vio.buffer.reader_for(abuffer);
325   } else {
326     write_vio.buffer.clear();
327   }
328   write_vio.mutex     = c ? c->mutex : this->mutex;
329   write_vio.cont      = c;
330   write_vio.nbytes    = nbytes;
331   write_vio.ndone     = 0;
332   write_vio.vc_server = this;
333   write_vio.op        = VIO::WRITE;
334 
335   if (c != nullptr && nbytes > 0 && this->is_client_state_writeable()) {
336     update_write_request(false);
337   } else if (!this->is_client_state_writeable()) {
338     // Cannot start a write on a closed stream
339     return nullptr;
340   }
341   return &write_vio;
342 }
343 
344 // Initiated from SM
345 void
do_io_close(int)346 Http2Stream::do_io_close(int /* flags */)
347 {
348   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
349   super::release(nullptr);
350 
351   if (!closed) {
352     REMEMBER(NO_EVENT, this->reentrancy_count);
353     Http2StreamDebug("do_io_close");
354 
355     // When we get here, the SM has initiated the shutdown.  Either it received a WRITE_COMPLETE, or it is shutting down.  Any
356     // remaining IO operations back to client should be abandoned.  The SM-side buffers backing these operations will be deleted
357     // by the time this is called from transaction_done.
358     closed = true;
359 
360     if (_proxy_ssn && this->is_client_state_writeable()) {
361       // Make sure any trailing end of stream frames are sent
362       // Wee will be removed at send_data_frames or closing connection phase
363       Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
364       SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
365       h2_proxy_ssn->connection_state.send_data_frames(this);
366     }
367 
368     clear_io_events();
369 
370     // Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed
371   }
372 }
373 
374 /*
375  *  HttpSM has called TXN_close hooks.
376  */
377 void
transaction_done()378 Http2Stream::transaction_done()
379 {
380   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
381   if (cross_thread_event) {
382     cross_thread_event->cancel();
383     cross_thread_event = nullptr;
384   }
385 
386   if (!closed) {
387     do_io_close(); // Make sure we've been closed.  If we didn't close the _proxy_ssn session better still be open
388   }
389   ink_release_assert(closed || !static_cast<Http2ClientSession *>(_proxy_ssn)->connection_state.is_state_closed());
390   _sm = nullptr;
391 
392   if (closed) {
393     // Safe to initiate SSN_CLOSE if this is the last stream
394     ink_assert(cross_thread_event == nullptr);
395     // Schedule the destroy to occur after we unwind here.  IF we call directly, may delete with reference on the stack.
396     terminate_stream = true;
397     terminate_if_possible();
398   }
399 }
400 
401 void
terminate_if_possible()402 Http2Stream::terminate_if_possible()
403 {
404   if (terminate_stream && reentrancy_count == 0) {
405     REMEMBER(NO_EVENT, this->reentrancy_count);
406 
407     Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
408     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
409     destroy();
410   }
411 }
412 
413 // Initiated from the Http2 side
414 void
initiating_close()415 Http2Stream::initiating_close()
416 {
417   if (!closed) {
418     SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
419     REMEMBER(NO_EVENT, this->reentrancy_count);
420     Http2StreamDebug("initiating_close");
421 
422     // Set the state of the connection to closed
423     // TODO - these states should be combined
424     closed = true;
425     _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
426 
427     // leaving the reference to the SM, so we can detach from the SM when we actually destroy
428     // _sm = NULL;
429     // Leaving reference to client session as well, so we can signal once the
430     // TXN_CLOSE has been sent
431     // _proxy_ssn = NULL;
432 
433     clear_io_events();
434 
435     // This should result in do_io_close or release being called.  That will schedule the final
436     // kill yourself signal
437     // We are sending signals rather than calling the handlers directly to avoid the case where
438     // the HttpTunnel handler causes the HttpSM to be deleted on the stack.
439     bool sent_write_complete = false;
440     if (_sm) {
441       // Push out any last IO events
442       if (write_vio.cont) {
443         SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
444         // Are we done?
445         if (write_vio.nbytes == write_vio.ndone) {
446           Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_WRITE_COMPLETE);
447           write_event = send_tracked_event(write_event, VC_EVENT_WRITE_COMPLETE, &write_vio);
448         } else {
449           write_event = send_tracked_event(write_event, VC_EVENT_EOS, &write_vio);
450           Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_EOS);
451         }
452         sent_write_complete = true;
453       }
454     }
455     // Send EOS to let SM know that we aren't sticking around
456     if (_sm && read_vio.cont) {
457       // Only bother with the EOS if we haven't sent the write complete
458       if (!sent_write_complete) {
459         SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
460         Http2StreamDebug("send EOS to read cont");
461         read_event = send_tracked_event(read_event, VC_EVENT_EOS, &read_vio);
462       }
463     } else if (!sent_write_complete) {
464       // Transaction is already gone or not started. Kill yourself
465       do_io_close();
466       destroy();
467     }
468   }
469 }
470 
471 /* Replace existing event only if the new event is different than the inprogress event */
472 Event *
send_tracked_event(Event * event,int send_event,VIO * vio)473 Http2Stream::send_tracked_event(Event *event, int send_event, VIO *vio)
474 {
475   if (event != nullptr) {
476     if (event->callback_event != send_event) {
477       event->cancel();
478       event = nullptr;
479     }
480   }
481 
482   if (event == nullptr) {
483     REMEMBER(send_event, this->reentrancy_count);
484     event = this_ethread()->schedule_imm(this, send_event, vio);
485   }
486 
487   return event;
488 }
489 
490 void
update_read_request(bool call_update)491 Http2Stream::update_read_request(bool call_update)
492 {
493   if (closed || _proxy_ssn == nullptr || _sm == nullptr || read_vio.mutex == nullptr) {
494     return;
495   }
496 
497   if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_READ_READY, nullptr)) {
498     // Not on the right thread
499     return;
500   }
501   ink_release_assert(this->_thread == this_ethread());
502 
503   SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
504   if (read_vio.nbytes == 0) {
505     return;
506   }
507 
508   // Try to be smart and only signal if there was additional data
509   int send_event = VC_EVENT_READ_READY;
510   if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes != INT64_MAX)) {
511     send_event = VC_EVENT_READ_COMPLETE;
512   }
513 
514   int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail();
515   if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) {
516     if (call_update) { // Safe to call vio handler directly
517       _timeout.update_inactivity();
518       if (read_vio.cont && this->_sm) {
519         read_vio.cont->handleEvent(send_event, &read_vio);
520       }
521     } else { // Called from do_io_read.  Still setting things up.  Send event
522       // to handle this after the dust settles
523       read_event = send_tracked_event(read_event, send_event, &read_vio);
524     }
525   }
526 }
527 
528 void
restart_sending()529 Http2Stream::restart_sending()
530 {
531   if (!this->response_header_done) {
532     return;
533   }
534 
535   IOBufferReader *reader = this->response_get_data_reader();
536   if (reader && !reader->is_read_avail_more_than(0)) {
537     return;
538   }
539 
540   if (this->write_vio.mutex && this->write_vio.ntodo() == 0) {
541     return;
542   }
543 
544   this->send_response_body(true);
545 }
546 
547 void
update_write_request(bool call_update)548 Http2Stream::update_write_request(bool call_update)
549 {
550   if (!this->is_client_state_writeable() || closed || _proxy_ssn == nullptr || write_vio.mutex == nullptr ||
551       write_vio.get_reader() == nullptr) {
552     return;
553   }
554 
555   if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_WRITE_READY, nullptr)) {
556     // Not on the right thread
557     return;
558   }
559   ink_release_assert(this->_thread == this_ethread());
560 
561   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
562 
563   SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
564 
565   IOBufferReader *vio_reader = write_vio.get_reader();
566   if (write_vio.ntodo() == 0 || !vio_reader->is_read_avail_more_than(0)) {
567     return;
568   }
569 
570   // Process the new data
571   if (!this->response_header_done) {
572     // Still parsing the response_header
573     int bytes_used = 0;
574     int state      = this->response_header.parse_resp(&http_parser, vio_reader, &bytes_used, false);
575     // HTTPHdr::parse_resp() consumed the vio_reader in above (consumed size is `bytes_used`)
576     write_vio.ndone += bytes_used;
577 
578     switch (state) {
579     case PARSE_RESULT_DONE: {
580       this->response_header_done = true;
581 
582       // Schedule session shutdown if response header has "Connection: close"
583       MIMEField *field = this->response_header.field_find(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION);
584       if (field) {
585         int len;
586         const char *value = field->value_get(&len);
587         if (memcmp(HTTP_VALUE_CLOSE, value, HTTP_LEN_CLOSE) == 0) {
588           SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
589           if (h2_proxy_ssn->connection_state.get_shutdown_state() == HTTP2_SHUTDOWN_NONE) {
590             h2_proxy_ssn->connection_state.set_shutdown_state(HTTP2_SHUTDOWN_NOT_INITIATED, Http2ErrorCode::HTTP2_ERROR_NO_ERROR);
591           }
592         }
593       }
594 
595       {
596         SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
597         // Send the response header back
598         h2_proxy_ssn->connection_state.send_headers_frame(this);
599       }
600 
601       // Roll back states of response header to read final response
602       if (this->response_header.expect_final_response()) {
603         this->response_header_done = false;
604         response_header.destroy();
605         response_header.create(HTTP_TYPE_RESPONSE);
606         http2_init_pseudo_headers(response_header);
607         http_parser_clear(&http_parser);
608         http_parser_init(&http_parser);
609       }
610 
611       this->signal_write_event(call_update);
612 
613       if (vio_reader->is_read_avail_more_than(0)) {
614         this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
615         this->send_response_body(call_update);
616       }
617       break;
618     }
619     case PARSE_RESULT_CONT:
620       // Let it ride for next time
621       break;
622     default:
623       break;
624     }
625   } else {
626     this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
627     this->send_response_body(call_update);
628   }
629 
630   return;
631 }
632 
633 void
signal_read_event(int event)634 Http2Stream::signal_read_event(int event)
635 {
636   if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr || this->read_vio.op == VIO::NONE) {
637     return;
638   }
639 
640   MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread());
641   if (lock.is_locked()) {
642     _timeout.update_inactivity();
643     this->read_vio.cont->handleEvent(event, &this->read_vio);
644   } else {
645     if (this->_read_vio_event) {
646       this->_read_vio_event->cancel();
647     }
648     this->_read_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &read_vio);
649   }
650 }
651 
652 void
signal_write_event(int event)653 Http2Stream::signal_write_event(int event)
654 {
655   if (this->write_vio.cont == nullptr || this->write_vio.cont->mutex == nullptr || this->write_vio.op == VIO::NONE) {
656     return;
657   }
658 
659   MUTEX_TRY_LOCK(lock, write_vio.cont->mutex, this_ethread());
660   if (lock.is_locked()) {
661     _timeout.update_inactivity();
662     this->write_vio.cont->handleEvent(event, &this->write_vio);
663   } else {
664     if (this->_write_vio_event) {
665       this->_write_vio_event->cancel();
666     }
667     this->_write_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &write_vio);
668   }
669 }
670 
671 void
signal_write_event(bool call_update)672 Http2Stream::signal_write_event(bool call_update)
673 {
674   if (this->write_vio.cont == nullptr || this->write_vio.op == VIO::NONE) {
675     return;
676   }
677 
678   if (this->write_vio.get_writer()->write_avail() == 0) {
679     return;
680   }
681 
682   int send_event = this->write_vio.ntodo() == 0 ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
683 
684   if (call_update) {
685     // Coming from reenable.  Safe to call the handler directly
686     if (write_vio.cont && this->_sm) {
687       write_vio.cont->handleEvent(send_event, &write_vio);
688     }
689   } else {
690     // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
691     write_event = send_tracked_event(write_event, send_event, &write_vio);
692   }
693 }
694 
695 bool
push_promise(URL & url,const MIMEField * accept_encoding)696 Http2Stream::push_promise(URL &url, const MIMEField *accept_encoding)
697 {
698   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
699   SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
700   return h2_proxy_ssn->connection_state.send_push_promise_frame(this, url, accept_encoding);
701 }
702 
703 void
send_response_body(bool call_update)704 Http2Stream::send_response_body(bool call_update)
705 {
706   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
707   _timeout.update_inactivity();
708 
709   if (Http2::stream_priority_enabled) {
710     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
711     h2_proxy_ssn->connection_state.schedule_stream(this);
712     // signal_write_event() will be called from `Http2ConnectionState::send_data_frames_depends_on_priority()`
713     // when write_vio is consumed
714   } else {
715     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
716     h2_proxy_ssn->connection_state.send_data_frames(this);
717     this->signal_write_event(call_update);
718     // XXX The call to signal_write_event can destroy/free the Http2Stream.
719     // Don't modify the Http2Stream after calling this method.
720   }
721 }
722 
723 void
reenable(VIO * vio)724 Http2Stream::reenable(VIO *vio)
725 {
726   if (this->_proxy_ssn) {
727     if (vio->op == VIO::WRITE) {
728       SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
729       update_write_request(true);
730     } else if (vio->op == VIO::READ) {
731       Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
732       {
733         SCOPED_MUTEX_LOCK(ssn_lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
734         h2_proxy_ssn->connection_state.restart_receiving(this);
735       }
736 
737       SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
738       update_read_request(true);
739     }
740   }
741 }
742 
743 void
destroy()744 Http2Stream::destroy()
745 {
746   REMEMBER(NO_EVENT, this->reentrancy_count);
747   Http2StreamDebug("Destroy stream, sent %" PRIu64 " bytes", this->bytes_sent);
748   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
749   // Clean up after yourself if this was an EOS
750   ink_release_assert(this->closed);
751   ink_release_assert(reentrancy_count == 0);
752 
753   uint64_t cid = 0;
754 
755   // Safe to initiate SSN_CLOSE if this is the last stream
756   if (_proxy_ssn) {
757     cid = _proxy_ssn->connection_id();
758 
759     Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(_proxy_ssn);
760     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
761     // Make sure the stream is removed from the stream list and priority tree
762     // In many cases, this has been called earlier, so this call is a no-op
763     h2_proxy_ssn->connection_state.delete_stream(this);
764 
765     h2_proxy_ssn->connection_state.decrement_stream_count();
766 
767     // Update session's stream counts, so it accurately goes into keep-alive state
768     h2_proxy_ssn->connection_state.release_stream();
769 
770     // Do not access `_proxy_ssn` in below. It might be freed by `release_stream`.
771   }
772 
773   // Clean up the write VIO in case of inactivity timeout
774   this->do_io_write(nullptr, 0, nullptr);
775 
776   this->_milestones.mark(Http2StreamMilestone::CLOSE);
777 
778   ink_hrtime total_time = this->_milestones.elapsed(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE);
779   HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, this->_thread, total_time);
780 
781   // Slow Log
782   if (Http2::stream_slow_log_threshold != 0 && ink_hrtime_from_msec(Http2::stream_slow_log_threshold) < total_time) {
783     Error("[%" PRIu64 "] [%" PRIu32 "] [%" PRId64 "] Slow H2 Stream: "
784           "open: %" PRIu64 " "
785           "dec_hdrs: %.3f "
786           "txn: %.3f "
787           "enc_hdrs: %.3f "
788           "tx_hdrs: %.3f "
789           "tx_data: %.3f "
790           "close: %.3f",
791           cid, static_cast<uint32_t>(this->_id), this->_http_sm_id,
792           ink_hrtime_to_msec(this->_milestones[Http2StreamMilestone::OPEN]),
793           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_DECODE_HEADERS),
794           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TXN),
795           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_ENCODE_HEADERS),
796           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_HEADERS_FRAMES),
797           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_DATA_FRAMES),
798           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE));
799   }
800 
801   _req_header.destroy();
802   response_header.destroy();
803 
804   // Drop references to all buffer data
805   this->_request_buffer.clear();
806 
807   // Free the mutexes in the VIO
808   read_vio.mutex.clear();
809   write_vio.mutex.clear();
810 
811   if (header_blocks) {
812     ats_free(header_blocks);
813   }
814   clear_io_events();
815   http_parser_clear(&http_parser);
816 
817   super::destroy();
818   THREAD_FREE(this, http2StreamAllocator, this_ethread());
819 }
820 
821 IOBufferReader *
response_get_data_reader() const822 Http2Stream::response_get_data_reader() const
823 {
824   return write_vio.get_reader();
825 }
826 
827 void
set_active_timeout(ink_hrtime timeout_in)828 Http2Stream::set_active_timeout(ink_hrtime timeout_in)
829 {
830   _timeout.set_active_timeout(timeout_in);
831 }
832 
833 void
set_inactivity_timeout(ink_hrtime timeout_in)834 Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
835 {
836   _timeout.set_inactive_timeout(timeout_in);
837 }
838 
839 void
cancel_active_timeout()840 Http2Stream::cancel_active_timeout()
841 {
842   _timeout.cancel_active_timeout();
843 }
844 
845 void
cancel_inactivity_timeout()846 Http2Stream::cancel_inactivity_timeout()
847 {
848   _timeout.cancel_inactive_timeout();
849 }
850 
851 bool
is_active_timeout_expired(ink_hrtime now)852 Http2Stream::is_active_timeout_expired(ink_hrtime now)
853 {
854   return _timeout.is_active_timeout_expired(now);
855 }
856 
857 bool
is_inactive_timeout_expired(ink_hrtime now)858 Http2Stream::is_inactive_timeout_expired(ink_hrtime now)
859 {
860   return _timeout.is_inactive_timeout_expired(now);
861 }
862 
863 void
clear_io_events()864 Http2Stream::clear_io_events()
865 {
866   if (read_event) {
867     read_event->cancel();
868     read_event = nullptr;
869   }
870 
871   if (write_event) {
872     write_event->cancel();
873     write_event = nullptr;
874   }
875 
876   if (buffer_full_write_event) {
877     buffer_full_write_event->cancel();
878     buffer_full_write_event = nullptr;
879   }
880 
881   if (this->_read_vio_event) {
882     this->_read_vio_event->cancel();
883     this->_read_vio_event = nullptr;
884   }
885 
886   if (this->_write_vio_event) {
887     this->_write_vio_event->cancel();
888     this->_write_vio_event = nullptr;
889   }
890 }
891 
892 void
release(IOBufferReader * r)893 Http2Stream::release(IOBufferReader *r)
894 {
895   super::release(r);
896   this->do_io_close();
897 }
898 
899 void
increment_client_transactions_stat()900 Http2Stream::increment_client_transactions_stat()
901 {
902   HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
903   HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_CLIENT_STREAM_COUNT, _thread);
904 }
905 
906 void
decrement_client_transactions_stat()907 Http2Stream::decrement_client_transactions_stat()
908 {
909   HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
910 }
911 
912 ssize_t
client_rwnd() const913 Http2Stream::client_rwnd() const
914 {
915   return this->_client_rwnd;
916 }
917 
918 Http2ErrorCode
increment_client_rwnd(size_t amount)919 Http2Stream::increment_client_rwnd(size_t amount)
920 {
921   this->_client_rwnd += amount;
922 
923   this->_recent_rwnd_increment[this->_recent_rwnd_increment_index] = amount;
924   ++this->_recent_rwnd_increment_index;
925   this->_recent_rwnd_increment_index %= this->_recent_rwnd_increment.size();
926   double sum = std::accumulate(this->_recent_rwnd_increment.begin(), this->_recent_rwnd_increment.end(), 0.0);
927   double avg = sum / this->_recent_rwnd_increment.size();
928   if (avg < Http2::min_avg_window_update) {
929     return Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM;
930   }
931   return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
932 }
933 
934 Http2ErrorCode
decrement_client_rwnd(size_t amount)935 Http2Stream::decrement_client_rwnd(size_t amount)
936 {
937   this->_client_rwnd -= amount;
938   if (this->_client_rwnd < 0) {
939     return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
940   } else {
941     return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
942   }
943 }
944 
945 ssize_t
server_rwnd() const946 Http2Stream::server_rwnd() const
947 {
948   return this->_server_rwnd;
949 }
950 
951 Http2ErrorCode
increment_server_rwnd(size_t amount)952 Http2Stream::increment_server_rwnd(size_t amount)
953 {
954   this->_server_rwnd += amount;
955   return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
956 }
957 
958 Http2ErrorCode
decrement_server_rwnd(size_t amount)959 Http2Stream::decrement_server_rwnd(size_t amount)
960 {
961   this->_server_rwnd -= amount;
962   if (this->_server_rwnd < 0) {
963     return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
964   } else {
965     return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
966   }
967 }
968 
969 bool
_switch_thread_if_not_on_right_thread(int event,void * edata)970 Http2Stream::_switch_thread_if_not_on_right_thread(int event, void *edata)
971 {
972   if (this->_thread != this_ethread()) {
973     SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
974     if (cross_thread_event == nullptr) {
975       // Send to the right thread
976       cross_thread_event = this->_thread->schedule_imm(this, event, edata);
977     }
978     return false;
979   }
980   return true;
981 }
982 
983 int
get_transaction_priority_weight() const984 Http2Stream::get_transaction_priority_weight() const
985 {
986   return priority_node ? priority_node->weight : 0;
987 }
988 
989 int
get_transaction_priority_dependence() const990 Http2Stream::get_transaction_priority_dependence() const
991 {
992   if (!priority_node) {
993     return -1;
994   } else {
995     return priority_node->parent ? priority_node->parent->id : 0;
996   }
997 }
998 
999 int64_t
read_vio_read_avail()1000 Http2Stream::read_vio_read_avail()
1001 {
1002   MIOBuffer *writer = this->read_vio.get_writer();
1003   if (writer) {
1004     return writer->max_read_avail();
1005   }
1006 
1007   return 0;
1008 }
1009