xref: /trafficserver/proxy/http/HttpTunnel.cc (revision ba868d88)
1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26    HttpTunnel.cc
27 
28    Description:
29 
30 
31 ****************************************************************************/
32 
33 #include "tscore/ink_config.h"
34 #include "HttpConfig.h"
35 #include "HttpTunnel.h"
36 #include "HttpSM.h"
37 #include "HttpDebugNames.h"
38 #include "tscore/ParseRules.h"
39 
// Threshold below which transfer_bytes() copies data instead of moving it
// by block reference (avoids building up many tiny blocks).
static const int min_block_transfer_bytes = 256;
// printf-style format for a chunk-size line: hex length followed by CRLF.
static const char *const CHUNK_HEADER_FMT = "%" PRIx64 "\r\n";
// This should be as small as possible because it will only hold the
// header and trailer per chunk - the chunk body will be a reference to
// a block in the input stream.
static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
46 
// Construct with the default maximum chunk size; the rest of the state is
// established later by init()/init_by_action().
ChunkedHandler::ChunkedHandler() : max_chunk_size(DEFAULT_MAX_CHUNK_SIZE) {}
48 
49 void
init(IOBufferReader * buffer_in,HttpTunnelProducer * p)50 ChunkedHandler::init(IOBufferReader *buffer_in, HttpTunnelProducer *p)
51 {
52   if (p->do_chunking) {
53     init_by_action(buffer_in, ACTION_DOCHUNK);
54   } else if (p->do_dechunking) {
55     init_by_action(buffer_in, ACTION_DECHUNK);
56   } else {
57     init_by_action(buffer_in, ACTION_PASSTHRU);
58   }
59   return;
60 }
61 
62 void
init_by_action(IOBufferReader * buffer_in,Action action)63 ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
64 {
65   running_sum    = 0;
66   num_digits     = 0;
67   cur_chunk_size = 0;
68   bytes_left     = 0;
69   truncation     = false;
70   this->action   = action;
71 
72   switch (action) {
73   case ACTION_DOCHUNK:
74     dechunked_reader                   = buffer_in->mbuf->clone_reader(buffer_in);
75     dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
76     chunked_buffer                     = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
77     chunked_size                       = 0;
78     break;
79   case ACTION_DECHUNK:
80     chunked_reader   = buffer_in->mbuf->clone_reader(buffer_in);
81     dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
82     dechunked_size   = 0;
83     break;
84   case ACTION_PASSTHRU:
85     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
86     break;
87   default:
88     ink_release_assert(!"Unknown action");
89   }
90 
91   return;
92 }
93 
94 void
clear()95 ChunkedHandler::clear()
96 {
97   switch (action) {
98   case ACTION_DOCHUNK:
99     free_MIOBuffer(chunked_buffer);
100     break;
101   case ACTION_DECHUNK:
102     free_MIOBuffer(dechunked_buffer);
103     break;
104   case ACTION_PASSTHRU:
105   default:
106     break;
107   }
108 
109   return;
110 }
111 
112 void
set_max_chunk_size(int64_t size)113 ChunkedHandler::set_max_chunk_size(int64_t size)
114 {
115   max_chunk_size       = size ? size : DEFAULT_MAX_CHUNK_SIZE;
116   max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
117 }
118 
119 void
read_size()120 ChunkedHandler::read_size()
121 {
122   int64_t bytes_used;
123   bool done = false;
124 
125   while (chunked_reader->read_avail() > 0 && !done) {
126     const char *tmp   = chunked_reader->start();
127     int64_t data_size = chunked_reader->block_read_avail();
128 
129     ink_assert(data_size > 0);
130     bytes_used = 0;
131 
132     while (data_size > 0) {
133       bytes_used++;
134       if (state == CHUNK_READ_SIZE) {
135         // The http spec says the chunked size is always in hex
136         if (ParseRules::is_hex(*tmp)) {
137           num_digits++;
138           running_sum *= 16;
139 
140           if (ParseRules::is_digit(*tmp)) {
141             running_sum += *tmp - '0';
142           } else {
143             running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
144           }
145         } else {
146           // We are done parsing size
147           if (num_digits == 0 || running_sum < 0) {
148             // Bogus chunk size
149             state = CHUNK_READ_ERROR;
150             done  = true;
151             break;
152           } else {
153             state = CHUNK_READ_SIZE_CRLF; // now look for CRLF
154           }
155         }
156       } else if (state == CHUNK_READ_SIZE_CRLF) { // Scan for a linefeed
157         if (ParseRules::is_lf(*tmp)) {
158           Debug("http_chunk", "read chunk size of %d bytes", running_sum);
159           bytes_left = (cur_chunk_size = running_sum);
160           state      = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
161           done       = true;
162           break;
163         }
164       } else if (state == CHUNK_READ_SIZE_START) {
165         if (ParseRules::is_lf(*tmp)) {
166           running_sum = 0;
167           num_digits  = 0;
168           state       = CHUNK_READ_SIZE;
169         }
170       }
171       tmp++;
172       data_size--;
173     }
174     chunked_reader->consume(bytes_used);
175   }
176 }
177 
178 // int ChunkedHandler::transfer_bytes()
179 //
180 //   Transfer bytes from chunked_reader to dechunked buffer
181 //   Use block reference method when there is a sufficient
182 //   size to move.  Otherwise, uses memcpy method
183 //
184 int64_t
transfer_bytes()185 ChunkedHandler::transfer_bytes()
186 {
187   int64_t block_read_avail, moved, to_move, total_moved = 0;
188 
189   // Handle the case where we are doing chunked passthrough.
190   if (!dechunked_buffer) {
191     moved = std::min(bytes_left, chunked_reader->read_avail());
192     chunked_reader->consume(moved);
193     bytes_left = bytes_left - moved;
194     return moved;
195   }
196 
197   while (bytes_left > 0) {
198     block_read_avail = chunked_reader->block_read_avail();
199 
200     to_move = std::min(bytes_left, block_read_avail);
201     if (to_move <= 0) {
202       break;
203     }
204 
205     if (to_move >= min_block_transfer_bytes) {
206       moved = dechunked_buffer->write(chunked_reader, bytes_left);
207     } else {
208       // Small amount of data available.  We want to copy the
209       // data rather than block reference to prevent the buildup
210       // of too many small blocks which leads to stack overflow
211       // on deallocation
212       moved = dechunked_buffer->write(chunked_reader->start(), to_move);
213     }
214 
215     if (moved > 0) {
216       chunked_reader->consume(moved);
217       bytes_left = bytes_left - moved;
218       dechunked_size += moved;
219       total_moved += moved;
220     } else {
221       break;
222     }
223   }
224   return total_moved;
225 }
226 
227 void
read_chunk()228 ChunkedHandler::read_chunk()
229 {
230   int64_t b = transfer_bytes();
231 
232   ink_assert(bytes_left >= 0);
233   if (bytes_left == 0) {
234     Debug("http_chunk", "completed read of chunk of %" PRId64 " bytes", cur_chunk_size);
235 
236     state = CHUNK_READ_SIZE_START;
237   } else if (bytes_left > 0) {
238     Debug("http_chunk", "read %" PRId64 " bytes of an %" PRId64 " chunk", b, cur_chunk_size);
239   }
240 }
241 
242 void
read_trailer()243 ChunkedHandler::read_trailer()
244 {
245   int64_t bytes_used;
246   bool done = false;
247 
248   while (chunked_reader->is_read_avail_more_than(0) && !done) {
249     const char *tmp   = chunked_reader->start();
250     int64_t data_size = chunked_reader->block_read_avail();
251 
252     ink_assert(data_size > 0);
253     for (bytes_used = 0; data_size > 0; data_size--) {
254       bytes_used++;
255 
256       if (ParseRules::is_cr(*tmp)) {
257         // For a CR to signal we are almost done, the preceding
258         //  part of the line must be blank and next character
259         //  must a LF
260         state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
261       } else if (ParseRules::is_lf(*tmp)) {
262         // For a LF to signal we are done reading the
263         //   trailer, the line must have either been blank
264         //   or must have have only had a CR on it
265         if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
266           state = CHUNK_READ_DONE;
267           Debug("http_chunk", "completed read of trailers");
268           done = true;
269           break;
270         } else {
271           // A LF that does not terminate the trailer
272           //  indicates a new line
273           state = CHUNK_READ_TRAILER_BLANK;
274         }
275       } else {
276         // A character that is not a CR or LF indicates
277         //  the we are parsing a line of the trailer
278         state = CHUNK_READ_TRAILER_LINE;
279       }
280       tmp++;
281     }
282     chunked_reader->consume(bytes_used);
283   }
284 }
285 
286 bool
process_chunked_content()287 ChunkedHandler::process_chunked_content()
288 {
289   while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
290     switch (state) {
291     case CHUNK_READ_SIZE:
292     case CHUNK_READ_SIZE_CRLF:
293     case CHUNK_READ_SIZE_START:
294       read_size();
295       break;
296     case CHUNK_READ_CHUNK:
297       read_chunk();
298       break;
299     case CHUNK_READ_TRAILER_BLANK:
300     case CHUNK_READ_TRAILER_CR:
301     case CHUNK_READ_TRAILER_LINE:
302       read_trailer();
303       break;
304     case CHUNK_FLOW_CONTROL:
305       return false;
306     default:
307       ink_release_assert(0);
308       break;
309     }
310   }
311   return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
312 }
313 
314 bool
generate_chunked_content()315 ChunkedHandler::generate_chunked_content()
316 {
317   char tmp[16];
318   bool server_done = false;
319   int64_t r_avail;
320 
321   ink_assert(max_chunk_header_len);
322 
323   switch (last_server_event) {
324   case VC_EVENT_EOS:
325   case VC_EVENT_READ_COMPLETE:
326   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
327     server_done = true;
328     break;
329   }
330 
331   while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
332     int64_t write_val = std::min(max_chunk_size, r_avail);
333 
334     state = CHUNK_WRITE_CHUNK;
335     Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
336 
337     // Output the chunk size.
338     if (write_val != max_chunk_size) {
339       int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
340       chunked_buffer->write(tmp, len);
341       chunked_size += len;
342     } else {
343       chunked_buffer->write(max_chunk_header, max_chunk_header_len);
344       chunked_size += max_chunk_header_len;
345     }
346 
347     // Output the chunk itself.
348     //
349     // BZ# 54395 Note - we really should only do a
350     //   block transfer if there is sizable amount of
351     //   data (like we do for the case where we are
352     //   removing chunked encoding in ChunkedHandler::transfer_bytes()
353     //   However, I want to do this fix with as small a risk
354     //   as possible so I'm leaving this issue alone for
355     //   now
356     //
357     chunked_buffer->write(dechunked_reader, write_val);
358     chunked_size += write_val;
359     dechunked_reader->consume(write_val);
360 
361     // Output the trailing CRLF.
362     chunked_buffer->write("\r\n", 2);
363     chunked_size += 2;
364   }
365 
366   if (server_done) {
367     state = CHUNK_WRITE_DONE;
368 
369     // Add the chunked transfer coding trailer.
370     chunked_buffer->write("0\r\n\r\n", 5);
371     chunked_size += 5;
372     return true;
373   }
374   return false;
375 }
376 
// Default-construct with an empty consumer list; fields are populated by
// HttpTunnel::add_producer().
HttpTunnelProducer::HttpTunnelProducer() : consumer_list() {}
378 
379 uint64_t
backlog(uint64_t limit)380 HttpTunnelProducer::backlog(uint64_t limit)
381 {
382   uint64_t zret = 0;
383   // Calculate the total backlog, the # of bytes inside ATS for this producer.
384   // We go all the way through each chain to the ending sink and take the maximum
385   // over those paths. Do need to be careful about loops which can occur.
386   for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
387     if (c->alive && c->write_vio) {
388       uint64_t n = 0;
389       if (HT_TRANSFORM == c->vc_type) {
390         n += static_cast<TransformVCChain *>(c->vc)->backlog(limit);
391       } else {
392         IOBufferReader *r = c->write_vio->get_reader();
393         if (r) {
394           n += static_cast<uint64_t>(r->read_avail());
395         }
396       }
397       if (n >= limit) {
398         return n;
399       }
400 
401       if (!c->is_sink()) {
402         HttpTunnelProducer *dsp = c->self_producer;
403         if (dsp) {
404           n += dsp->backlog();
405         }
406       }
407       if (n >= limit) {
408         return n;
409       }
410       if (n > zret) {
411         zret = n;
412       }
413     }
414   }
415 
416   if (chunked_handler.chunked_reader) {
417     zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
418   }
419 
420   return zret;
421 }
422 
423 /*  We set the producers in a flow chain specifically rather than
424     using a tunnel level variable in order to handle bi-directional
425     tunnels correctly. In such a case the flow control on producers is
426     not related so a single value for the tunnel won't work.
427 */
428 void
set_throttle_src(HttpTunnelProducer * srcp)429 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer *srcp)
430 {
431   HttpTunnelProducer *p  = this;
432   p->flow_control_source = srcp;
433   for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
434     if (!c->is_sink()) {
435       p = c->self_producer;
436       if (p) {
437         p->set_throttle_src(srcp);
438       }
439     }
440   }
441 }
442 
// Default-construct with an empty link; fields are populated by
// HttpTunnel::add_consumer().
HttpTunnelConsumer::HttpTunnelConsumer() : link() {}
444 
// Construct with no mutex; init() installs the state machine's mutex and
// the main handler before the tunnel is used.
HttpTunnel::HttpTunnel() : Continuation(nullptr) {}
446 
447 void
init(HttpSM * sm_arg,Ptr<ProxyMutex> & amutex)448 HttpTunnel::init(HttpSM *sm_arg, Ptr<ProxyMutex> &amutex)
449 {
450   HttpConfigParams *params = sm_arg->t_state.http_config_param;
451   sm                       = sm_arg;
452   active                   = false;
453   mutex                    = amutex;
454   ink_release_assert(reentrancy_count == 0);
455   SET_HANDLER(&HttpTunnel::main_handler);
456   flow_state.enabled_p = params->oride.flow_control_enabled;
457   if (params->oride.flow_low_water_mark > 0) {
458     flow_state.low_water = params->oride.flow_low_water_mark;
459   }
460   if (params->oride.flow_high_water_mark > 0) {
461     flow_state.high_water = params->oride.flow_high_water_mark;
462   }
463   // This should always be true, we handled default cases back in HttpConfig::reconfigure()
464   ink_assert(flow_state.low_water <= flow_state.high_water);
465 }
466 
467 void
reset()468 HttpTunnel::reset()
469 {
470   ink_assert(active == false);
471 #ifdef DEBUG
472   for (auto &producer : producers) {
473     ink_assert(producer.alive == false);
474   }
475   for (auto &consumer : consumers) {
476     ink_assert(consumer.alive == false);
477   }
478 #endif
479 
480   num_producers = 0;
481   num_consumers = 0;
482   memset(consumers, 0, sizeof(consumers));
483   memset(producers, 0, sizeof(producers));
484 }
485 
486 void
kill_tunnel()487 HttpTunnel::kill_tunnel()
488 {
489   for (auto &producer : producers) {
490     if (producer.vc != nullptr) {
491       chain_abort_all(&producer);
492     }
493     ink_assert(producer.alive == false);
494   }
495   active = false;
496   this->deallocate_buffers();
497   this->reset();
498 }
499 
500 HttpTunnelProducer *
alloc_producer()501 HttpTunnel::alloc_producer()
502 {
503   for (int i = 0; i < MAX_PRODUCERS; ++i) {
504     if (producers[i].vc == nullptr) {
505       num_producers++;
506       ink_assert(num_producers <= MAX_PRODUCERS);
507       return producers + i;
508     }
509   }
510   ink_release_assert(0);
511   return nullptr;
512 }
513 
514 HttpTunnelConsumer *
alloc_consumer()515 HttpTunnel::alloc_consumer()
516 {
517   for (int i = 0; i < MAX_CONSUMERS; i++) {
518     if (consumers[i].vc == nullptr) {
519       num_consumers++;
520       ink_assert(num_consumers <= MAX_CONSUMERS);
521       return consumers + i;
522     }
523   }
524   ink_release_assert(0);
525   return nullptr;
526 }
527 
528 int
deallocate_buffers()529 HttpTunnel::deallocate_buffers()
530 {
531   int num = 0;
532   ink_release_assert(active == false);
533   for (auto &producer : producers) {
534     if (producer.read_buffer != nullptr) {
535       ink_assert(producer.vc != nullptr);
536       free_MIOBuffer(producer.read_buffer);
537       producer.read_buffer  = nullptr;
538       producer.buffer_start = nullptr;
539       num++;
540     }
541 
542     if (producer.chunked_handler.dechunked_buffer != nullptr) {
543       ink_assert(producer.vc != nullptr);
544       free_MIOBuffer(producer.chunked_handler.dechunked_buffer);
545       producer.chunked_handler.dechunked_buffer = nullptr;
546       num++;
547     }
548 
549     if (producer.chunked_handler.chunked_buffer != nullptr) {
550       ink_assert(producer.vc != nullptr);
551       free_MIOBuffer(producer.chunked_handler.chunked_buffer);
552       producer.chunked_handler.chunked_buffer = nullptr;
553       num++;
554     }
555     producer.chunked_handler.max_chunk_header_len = 0;
556   }
557   return num;
558 }
559 
560 void
set_producer_chunking_action(HttpTunnelProducer * p,int64_t skip_bytes,TunnelChunkingAction_t action)561 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer *p, int64_t skip_bytes, TunnelChunkingAction_t action)
562 {
563   p->chunked_handler.skip_bytes = skip_bytes;
564   p->chunking_action            = action;
565 
566   switch (action) {
567   case TCA_CHUNK_CONTENT:
568     p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
569     break;
570   case TCA_DECHUNK_CONTENT:
571   case TCA_PASSTHRU_CHUNKED_CONTENT:
572     p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
573     break;
574   default:
575     break;
576   };
577 }
578 
void
HttpTunnel::set_producer_chunking_size(HttpTunnelProducer *p, int64_t size)
{
  // Forward to the producer's chunked handler; a size of 0 selects the
  // default maximum chunk size (see ChunkedHandler::set_max_chunk_size()).
  p->chunked_handler.set_max_chunk_size(size);
}
584 
585 // HttpTunnelProducer* HttpTunnel::add_producer
586 //
587 //   Adds a new producer to the tunnel
588 //
589 HttpTunnelProducer *
add_producer(VConnection * vc,int64_t nbytes_arg,IOBufferReader * reader_start,HttpProducerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg)590 HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *reader_start, HttpProducerHandler sm_handler,
591                          HttpTunnelType_t vc_type, const char *name_arg)
592 {
593   HttpTunnelProducer *p;
594 
595   Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
596 
597   ink_assert(reader_start->mbuf);
598   if ((p = alloc_producer()) != nullptr) {
599     p->vc              = vc;
600     p->nbytes          = nbytes_arg;
601     p->buffer_start    = reader_start;
602     p->read_buffer     = reader_start->mbuf;
603     p->vc_handler      = sm_handler;
604     p->vc_type         = vc_type;
605     p->name            = name_arg;
606     p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
607 
608     p->do_chunking         = false;
609     p->do_dechunking       = false;
610     p->do_chunked_passthru = false;
611 
612     p->init_bytes_done = reader_start->read_avail();
613     if (p->nbytes < 0) {
614       p->ntodo = p->nbytes;
615     } else { // The byte count given us includes bytes
616       //  that already may be in the buffer.
617       //  ntodo represents the number of bytes
618       //  the tunneling mechanism needs to read
619       //  for the producer
620       p->ntodo = p->nbytes - p->init_bytes_done;
621       ink_assert(p->ntodo >= 0);
622     }
623 
624     // We are static, the producer is never "alive"
625     //   It just has data in the buffer
626     if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
627       ink_assert(p->ntodo == 0);
628       p->alive        = false;
629       p->read_success = true;
630     } else {
631       p->alive = true;
632     }
633   }
634   return p;
635 }
636 
637 // void HttpTunnel::add_consumer
638 //
639 //    Adds a new consumer to the tunnel.  The producer must
640 //    be specified and already added to the tunnel.  Attaches
641 //    the new consumer to the entry for the existing producer
642 //
643 //    Returns true if the consumer successfully added.  Returns
644 //    false if the consumer was not added because the source failed
645 //
646 HttpTunnelConsumer *
add_consumer(VConnection * vc,VConnection * producer,HttpConsumerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg,int64_t skip_bytes)647 HttpTunnel::add_consumer(VConnection *vc, VConnection *producer, HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type,
648                          const char *name_arg, int64_t skip_bytes)
649 {
650   Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
651 
652   // Find the producer entry
653   HttpTunnelProducer *p = get_producer(producer);
654   ink_release_assert(p);
655 
656   // Check to see if the producer terminated
657   //  without sending all of its data
658   if (p->alive == false && p->read_success == false) {
659     Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
660     return nullptr;
661   }
662   // Initialize the consumer structure
663   HttpTunnelConsumer *c = alloc_consumer();
664   c->producer           = p;
665   c->vc                 = vc;
666   c->alive              = true;
667   c->skip_bytes         = skip_bytes;
668   c->vc_handler         = sm_handler;
669   c->vc_type            = vc_type;
670   c->name               = name_arg;
671 
672   // Register the consumer with the producer
673   p->consumer_list.push(c);
674   p->num_consumers++;
675 
676   return c;
677 }
678 
679 void
chain(HttpTunnelConsumer * c,HttpTunnelProducer * p)680 HttpTunnel::chain(HttpTunnelConsumer *c, HttpTunnelProducer *p)
681 {
682   p->self_consumer = c;
683   c->self_producer = p;
684   // If the flow is already throttled update the chained producer.
685   if (c->producer->is_throttled()) {
686     p->set_throttle_src(c->producer->flow_control_source);
687   }
688 }
689 
690 // void HttpTunnel::tunnel_run()
691 //
692 //    Makes the tunnel go
693 //
694 void
tunnel_run(HttpTunnelProducer * p_arg)695 HttpTunnel::tunnel_run(HttpTunnelProducer *p_arg)
696 {
697   Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
698   if (p_arg) {
699     producer_run(p_arg);
700   } else {
701     HttpTunnelProducer *p;
702 
703     ink_assert(active == false);
704 
705     for (int i = 0; i < MAX_PRODUCERS; ++i) {
706       p = producers + i;
707       if (p->vc != nullptr && (p->alive || (p->vc_type == HT_STATIC && p->buffer_start != nullptr))) {
708         producer_run(p);
709       }
710     }
711   }
712 
713   // It is possible that there was nothing to do
714   //   due to a all transfers being zero length
715   //   If that is the case, call the state machine
716   //   back to say we are done
717   if (!is_tunnel_alive()) {
718     active = false;
719     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
720   }
721 }
722 
723 void
producer_run(HttpTunnelProducer * p)724 HttpTunnel::producer_run(HttpTunnelProducer *p)
725 {
726   // Determine whether the producer has a cache-write consumer,
727   // since all chunked content read by the producer gets dechunked
728   // prior to being written into the cache.
729   HttpTunnelConsumer *c, *cache_write_consumer = nullptr;
730   bool transform_consumer = false;
731 
732   for (c = p->consumer_list.head; c; c = c->link.next) {
733     if (c->vc_type == HT_CACHE_WRITE) {
734       cache_write_consumer = c;
735       break;
736     }
737   }
738 
739   // bz57413
740   for (c = p->consumer_list.head; c; c = c->link.next) {
741     if (c->vc_type == HT_TRANSFORM) {
742       transform_consumer = true;
743       break;
744     }
745   }
746 
747   // Determine whether the producer is to perform chunking,
748   // dechunking, or chunked-passthough of the incoming response.
749   TunnelChunkingAction_t action = p->chunking_action;
750 
751   // [bug 2579251] static producers won't have handler set
752   if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
753     if (action == TCA_CHUNK_CONTENT) {
754       p->do_chunking = true;
755     } else if (action == TCA_DECHUNK_CONTENT) {
756       p->do_dechunking = true;
757     } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
758       p->do_chunked_passthru = true;
759 
760       // Dechunk the chunked content into the cache.
761       if (cache_write_consumer != nullptr) {
762         p->do_dechunking = true;
763       }
764     }
765   }
766 
767   int64_t consumer_n;
768   int64_t producer_n;
769 
770   ink_assert(p->vc != nullptr);
771   active = true;
772 
773   IOBufferReader *chunked_buffer_start = nullptr, *dechunked_buffer_start = nullptr;
774   if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
775     p->chunked_handler.init(p->buffer_start, p);
776 
777     // Copy the header into the chunked/dechunked buffers.
778     if (p->do_chunking) {
779       // initialize a reader to chunked buffer start before writing to keep ref count
780       chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
781       p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
782     }
783     if (p->do_dechunking) {
784       // bz57413
785       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
786             p->chunked_handler.chunked_reader->read_avail());
787 
788       // initialize a reader to dechunked buffer start before writing to keep ref count
789       dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();
790 
791       // If there is no transformation then add the header to the buffer, else the
792       // client already has got the header from us, no need for it in the buffer.
793       if (!transform_consumer) {
794         p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
795 
796         Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64 "", p->chunked_handler.skip_bytes);
797       }
798     }
799   }
800 
801   int64_t read_start_pos = 0;
802   if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
803     ink_assert(sm->t_state.num_range_fields == 1); // we current just support only one range entry
804     read_start_pos = sm->t_state.ranges[0]._start;
805     producer_n     = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start) + 1;
806     consumer_n     = (producer_n + sm->client_response_hdr_bytes);
807   } else if (p->nbytes >= 0) {
808     consumer_n = p->nbytes;
809     producer_n = p->ntodo;
810   } else {
811     consumer_n = (producer_n = INT64_MAX);
812   }
813 
814   // At least set up the consumer readers first so the data
815   // doesn't disappear out from under the tunnel
816   for (c = p->consumer_list.head; c; c = c->link.next) {
817     // Create a reader for each consumer.  The reader allows
818     // us to implement skip bytes
819     if (c->vc_type == HT_CACHE_WRITE) {
820       switch (action) {
821       case TCA_CHUNK_CONTENT:
822       case TCA_PASSTHRU_DECHUNKED_CONTENT:
823         c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
824         break;
825       case TCA_DECHUNK_CONTENT:
826       case TCA_PASSTHRU_CHUNKED_CONTENT:
827         c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
828         break;
829       default:
830         break;
831       }
832     }
833     // Non-cache consumers.
834     else if (action == TCA_CHUNK_CONTENT) {
835       c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
836     } else if (action == TCA_DECHUNK_CONTENT) {
837       c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
838     } else {
839       c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
840     }
841 
842     // Consume bytes of the reader if we skipping bytes
843     if (c->skip_bytes > 0) {
844       ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
845       c->buffer_reader->consume(c->skip_bytes);
846     }
847   }
848 
849   // YTS Team, yamsat Plugin
850   // Allocate and copy partial POST data to buffers. Check for the various parameters
851   // including the maximum configured post data size
852   if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
853       (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && p->vc_type == HT_HTTP_CLIENT)) {
854     Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64 " max size: %" PRId64 "",
855           p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
856 
857     // (note that since we are not dechunking POST, this is the chunked size if chunked)
858     if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
859       Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64 " limit=%" PRId64 "",
860               p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
861       sm->disable_redirect();
862       if (p->vc_type == HT_BUFFER_READ) {
863         producer_handler(VC_EVENT_ERROR, p);
864         return;
865       }
866     } else {
867       sm->postbuf_copy_partial_data();
868     }
869   } // end of added logic for partial POST
870 
871   if (p->do_chunking) {
872     // remove the chunked reader marker so that it doesn't act like a buffer guard
873     p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
874     p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
875 
876     // If there is data to process in the buffer, do it now
877     producer_handler(VC_EVENT_READ_READY, p);
878   } else if (p->do_dechunking || p->do_chunked_passthru) {
879     // remove the dechunked reader marker so that it doesn't act like a buffer guard
880     if (p->do_dechunking && dechunked_buffer_start) {
881       p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
882     }
883 
884     // bz57413
885     // If there is no transformation plugin, then we didn't add the header, hence no need to consume it
886     Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
887           p->chunked_handler.chunked_reader->read_avail());
888     if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
889       p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
890       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64 "",
891             p->chunked_handler.skip_bytes);
892     }
893     // if(p->chunked_handler.chunked_reader->read_avail() > 0)
894     // p->chunked_handler.chunked_reader->consume(
895     // p->chunked_handler.skip_bytes);
896 
897     producer_handler(VC_EVENT_READ_READY, p);
898     if (sm->get_postbuf_done() && p->vc_type == HT_HTTP_CLIENT) { // read_avail() == 0
899       // [bug 2579251]
900       // Ugh, this is horrible but in the redirect case they are running a the tunnel again with the
901       // now closed/empty producer to trigger PRECOMPLETE.  If the POST was chunked, producer_n is set
902       // (incorrectly) to INT64_MAX.  It needs to be set to 0 to prevent triggering another read.
903       producer_n = 0;
904     }
905   }
906   for (c = p->consumer_list.head; c; c = c->link.next) {
907     int64_t c_write = consumer_n;
908 
909     // Don't bother to set up the consumer if it is dead
910     if (!c->alive) {
911       continue;
912     }
913 
914     if (!p->alive) {
915       // Adjust the amount of chunked data to write if the only data was in the initial read
916       // The amount to write in some cases is dependent on the type of the consumer, so this
917       // value must be computed for each consumer
918       c_write = final_consumer_bytes_to_write(p, c);
919     } else {
920       // INKqa05109 - if we don't know the length leave it at
921       //  INT64_MAX or else the cache may bounce the write
922       //  because it thinks the document is too big.  INT64_MAX
923       //  is a special case for the max document size code
924       //  in the cache
925       if (c_write != INT64_MAX) {
926         c_write -= c->skip_bytes;
927       }
928       // Fix for problems with not chunked content being chunked and
929       // not sending the entire data.  The content length grows when
930       // it is being chunked.
931       if (p->do_chunking == true) {
932         c_write = INT64_MAX;
933       }
934     }
935 
936     if (c_write == 0) {
937       // Nothing to do, call back the cleanup handlers
938       c->write_vio = nullptr;
939       consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
940     } else {
941       // In the client half close case, all the data that will be sent
942       // from the client is already in the buffer.  Go ahead and set
943       // the amount to read since we know it.  We will forward the FIN
944       // to the server on VC_EVENT_WRITE_COMPLETE.
945       if (p->vc_type == HT_HTTP_CLIENT) {
946         ProxyTransaction *ua_vc = static_cast<ProxyTransaction *>(p->vc);
947         if (ua_vc->get_half_close_flag()) {
948           int tmp = c->buffer_reader->read_avail();
949           if (tmp < c_write) {
950             c_write = tmp;
951           }
952           p->alive         = false;
953           p->handler_state = HTTP_SM_POST_SUCCESS;
954         }
955       }
956       // Start the writes now that we know we will consume all the initial data
957       c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
958       ink_assert(c_write > 0);
959     }
960   }
961   if (p->alive) {
962     ink_assert(producer_n >= 0);
963 
964     if (producer_n == 0) {
965       // Everything is already in the buffer so mark the producer as done.  We need to notify
966       // state machine that everything is done.  We use a special event to say the producers is
967       // done but we didn't do anything
968       p->alive         = false;
969       p->read_success  = true;
970       p->handler_state = HTTP_SM_POST_SUCCESS;
971       Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
972       producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
973     } else {
974       if (read_start_pos > 0) {
975         p->read_vio = ((CacheVC *)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
976       } else {
977         p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
978       }
979     }
980   }
981 
982   // Now that the tunnel has started, we must remove producer's reader so
983   // that it doesn't act like a buffer guard
984   if (p->read_buffer && p->buffer_start) {
985     p->read_buffer->dealloc_reader(p->buffer_start);
986   }
987   p->buffer_start = nullptr;
988 }
989 
990 int
producer_handler_dechunked(int event,HttpTunnelProducer * p)991 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer *p)
992 {
993   ink_assert(p->do_chunking);
994 
995   Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name,
996         HttpDebugNames::get_event_name(event));
997 
998   // We only interested in translating certain events
999   switch (event) {
1000   case VC_EVENT_READ_READY:
1001   case VC_EVENT_READ_COMPLETE:
1002   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1003   case VC_EVENT_EOS:
1004     p->last_event = p->chunked_handler.last_server_event = event;
1005     if (p->chunked_handler.generate_chunked_content()) { // We are done, make sure the consumer is activated
1006       HttpTunnelConsumer *c;
1007       for (c = p->consumer_list.head; c; c = c->link.next) {
1008         if (c->alive) {
1009           c->write_vio->nbytes = p->chunked_handler.chunked_size;
1010           // consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
1011         }
1012       }
1013     }
1014     break;
1015   };
1016   // Since we will consume all the data if the server is actually finished
1017   //   we don't have to translate events like we do in the
1018   //   case producer_handler_chunked()
1019   return event;
1020 }
1021 
1022 // int HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer* p)
1023 //
1024 //   Handles events from chunked producers.  It calls the chunking handlers
1025 //    if appropriate and then translates the event we got into a suitable
1026 //    event to represent the unchunked state, and does chunked bookeeping
1027 //
1028 int
producer_handler_chunked(int event,HttpTunnelProducer * p)1029 HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer *p)
1030 {
1031   ink_assert(p->do_dechunking || p->do_chunked_passthru);
1032 
1033   Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
1034 
1035   // We only interested in translating certain events
1036   switch (event) {
1037   case VC_EVENT_READ_READY:
1038   case VC_EVENT_READ_COMPLETE:
1039   case VC_EVENT_INACTIVITY_TIMEOUT:
1040   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1041   case VC_EVENT_EOS:
1042     break;
1043   default:
1044     return event;
1045   }
1046 
1047   p->last_event = p->chunked_handler.last_server_event = event;
1048   bool done                                            = p->chunked_handler.process_chunked_content();
1049 
1050   // If we couldn't understand the encoding, return
1051   //   an error
1052   if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
1053     Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
1054     p->chunked_handler.truncation = true;
1055     // FIX ME: we return EOS here since it will cause the
1056     //  the client to be reenabled.  ERROR makes more
1057     //  sense but no reenables follow
1058     return VC_EVENT_EOS;
1059   }
1060 
1061   switch (event) {
1062   case VC_EVENT_READ_READY:
1063     if (done) {
1064       return VC_EVENT_READ_COMPLETE;
1065     }
1066     break;
1067   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1068   case VC_EVENT_EOS:
1069   case VC_EVENT_READ_COMPLETE:
1070   case VC_EVENT_INACTIVITY_TIMEOUT:
1071     if (!done) {
1072       p->chunked_handler.truncation = true;
1073     }
1074     break;
1075   }
1076 
1077   return event;
1078 }
1079 
//
// bool HttpTunnel::producer_handler(int event, HttpTunnelProducer* p)
//
//   Handles events from producers.
//
//   If the event is interesting only to the tunnel, this
//    handler takes all necessary actions and returns false
//    If the event is interesting to the state_machine,
//    it calls back the state machine and returns true
//
//
bool
HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
{
  HttpTunnelConsumer *c;
  HttpProducerHandler jump_point;
  bool sm_callback = false; // becomes true whenever we call back into the HttpSM

  Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));

  // Handle chunking/dechunking/chunked-passthrough if necessary.
  // These helpers may translate the raw VC event into one that reflects the
  // state of the (de)chunked stream before the switch below acts on it.
  if (p->do_chunking) {
    event = producer_handler_dechunked(event, p);

    // If we were in PRECOMPLETE when this function was called
    // and we are doing chunking, then we just wrote the last
    // chunk in the function call above.  We are done with the
    // tunnel.
    if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
      event = VC_EVENT_EOS;
    }
  } else if (p->do_dechunking || p->do_chunked_passthru) {
    event = producer_handler_chunked(event, p);
  } else {
    p->last_event = event;
  }

  // YTS Team, yamsat Plugin
  // Copy partial POST data to buffers. Check for the various parameters including
  // the maximum configured post data size
  if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
      (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
       (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && p->vc_type == HT_HTTP_CLIENT)) {
    Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));

    // If buffering this data would exceed the configured limit, abandon the
    // redirect-replay buffer rather than keeping a partial body around.
    if ((sm->postbuf_buffer_avail() + sm->postbuf_reader_avail()) > HttpConfig::m_master.post_copy_size) {
      Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64
              " reader_avail=%" PRId64 " limit=%" PRId64 "",
              sm->postbuf_buffer_avail(), sm->postbuf_reader_avail(), HttpConfig::m_master.post_copy_size);
      sm->disable_redirect();
      if (p->vc_type == HT_BUFFER_READ) {
        event = VC_EVENT_ERROR;
      }
    } else {
      sm->postbuf_copy_partial_data();
      // Any of these events means the client has finished sending the body.
      if (event == VC_EVENT_READ_COMPLETE || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS) {
        sm->set_postbuf_done(true);
      }
    }
  } // end of added logic for partial copy of POST

  Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d, state: %d", p->alive == true,
        sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event, p->chunked_handler.state);

  switch (event) {
  case VC_EVENT_READ_READY:
    // Data read from producer, reenable consumers
    for (c = p->consumer_list.head; c; c = c->link.next) {
      if (c->alive && c->write_vio) {
        c->write_vio->reenable();
      }
    }
    break;

  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
    // If the write completes on the stack (as it can for http2), then
    // consumer could have called back by this point.  Must treat this as
    // a regular read complete (falling through to the following cases).

  case VC_EVENT_READ_COMPLETE:
  case VC_EVENT_EOS:
    // The producer completed
    p->alive = false;
    if (p->read_vio) {
      p->bytes_read = p->read_vio->ndone;
    } else {
      // If we are chunked, we can receive the whole document
      //   along with the header without knowing it (due to
      //   the message length being a property of the encoding)
      //   In that case, we won't have done a do_io so there
      //   will not be vio
      p->bytes_read = 0;
    }

    // callback the SM to notify of completion
    //  Note: we need to callback the SM before
    //  reenabling the consumers as the reenable may
    //  make the data visible to the consumer and
    //  initiate async I/O operation.  The SM needs to
    //  set how much I/O to do before async I/O is
    //  initiated
    jump_point = p->vc_handler;
    (sm->*jump_point)(event, p);
    sm_callback = true;
    p->update_state_if_not_set(HTTP_SM_POST_SUCCESS);

    // Data read from producer, reenable consumers
    for (c = p->consumer_list.head; c; c = c->link.next) {
      if (c->alive && c->write_vio) {
        c->write_vio->reenable();
      }
    }
    break;

  case VC_EVENT_ERROR:
  case VC_EVENT_ACTIVE_TIMEOUT:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
    // Error-style termination: shut the producer down once (the p->alive
    // guard makes repeated error events harmless).
    if (p->alive) {
      p->alive = false;
      if (p->read_vio) {
        p->bytes_read = p->read_vio->ndone;
      } else {
        p->bytes_read = 0;
      }
      // Clear any outstanding reads so they don't
      // collide with future tunnel IO's
      p->vc->do_io_read(nullptr, 0, nullptr);
      // Interesting tunnel event, call SM
      jump_point = p->vc_handler;
      (sm->*jump_point)(event, p);
      sm_callback = true;
      // Failure case anyway
      p->update_state_if_not_set(HTTP_SM_POST_UA_FAIL);
    }
    break;

  case VC_EVENT_WRITE_READY:
  case VC_EVENT_WRITE_COMPLETE:
  default:
    // Producers should not get these events
    ink_release_assert(0);
    break;
  }

  return sm_callback;
}
1227 
// Reenable the producer feeding this consumer, applying tunnel flow control
// (throttle/unthrottle) when it is enabled and the producer is an external
// source.
void
HttpTunnel::consumer_reenable(HttpTunnelConsumer *c)
{
  HttpTunnelProducer *p = c->producer;

  // Only worth doing if the producer is still running and its buffer has
  // room to accept more data.
  if (p && p->alive && p->read_buffer->write_avail() > 0) {
    // Only do flow control if enabled and the producer is an external
    // source.  Otherwise disable by making the backlog zero. Because
    // the backlog short cuts quit when the value is equal (or
    // greater) to the target, we use strict comparison only for
    // checking low water, otherwise the flow control can stall out.
    uint64_t backlog         = (flow_state.enabled_p && p->is_source()) ? p->backlog(flow_state.high_water) : 0;
    HttpTunnelProducer *srcp = p->flow_control_source;

    if (backlog >= flow_state.high_water) {
      if (is_debug_tag_set("http_tunnel")) {
        Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
      }
      p->throttle(); // p becomes srcp for future calls to this method
    } else {
      if (srcp && srcp->alive && c->is_sink()) {
        // Check if backlog is below low water - note we need to check
        // against the source producer, not necessarily the producer
        // for this consumer. We don't have to recompute the backlog
        // if they are the same because we know low water <= high
        // water so the value is sufficiently accurate.
        if (srcp != p) {
          backlog = srcp->backlog(flow_state.low_water);
        }
        if (backlog < flow_state.low_water) {
          if (is_debug_tag_set("http_tunnel")) {
            Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
          }
          srcp->unthrottle();
          if (srcp->read_vio) {
            srcp->read_vio->reenable();
          }
          // Kick source producer to get flow ... well, flowing.
          this->producer_handler(VC_EVENT_READ_READY, srcp);
        } else {
          // We can stall for small thresholds on network sinks because this event happens
          // before the actual socket write. So we trap for the buffer becoming empty to
          // make sure we get an event to unthrottle after the write.
          if (HT_HTTP_CLIENT == c->vc_type) {
            NetVConnection *netvc = dynamic_cast<NetVConnection *>(c->write_vio->vc_server);
            if (netvc) { // really, this should always be true.
              netvc->trapWriteBufferEmpty();
            }
          }
        }
      }
      // Not throttled: let the producer read more data.
      if (p->read_vio) {
        p->read_vio->reenable();
      }
    }
  }
}
1285 
//
// bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
//
//   Handles events from consumers.
//
//   If the event is interesting only to the tunnel, this
//    handler takes all necessary actions and returns false
//    If the event is interesting to the state_machine,
//    it calls back the state machine and returns true
//
//
bool
HttpTunnel::consumer_handler(int event, HttpTunnelConsumer *c)
{
  bool sm_callback = false;
  HttpConsumerHandler jump_point;
  HttpTunnelProducer *p = c->producer;

  Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));

  ink_assert(c->alive == true);

  switch (event) {
  case VC_EVENT_WRITE_READY:
    // The consumer drained some data; possibly unthrottle the producer.
    this->consumer_reenable(c);
    break;

  case VC_EVENT_WRITE_COMPLETE:
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_ACTIVE_TIMEOUT:
  case VC_EVENT_INACTIVITY_TIMEOUT:
    // Terminal events: the consumer is finished (successfully on
    // WRITE_COMPLETE, otherwise on an error/timeout path).
    ink_assert(c->alive);
    ink_assert(c->buffer_reader);
    c->alive = false;

    c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;

    // Interesting tunnel event, call SM
    jump_point = c->vc_handler;
    (sm->*jump_point)(event, c);
    // Make sure the handler_state is set
    // Necessary for post tunnel end processing
    if (c->producer && c->producer->handler_state == 0) {
      if (event == VC_EVENT_WRITE_COMPLETE) {
        c->producer->handler_state = HTTP_SM_POST_SUCCESS;
      } else if (c->vc_type == HT_HTTP_SERVER) {
        // The server-side consumer failed, i.e. the client upload failed.
        c->producer->handler_state = HTTP_SM_POST_UA_FAIL;
      } else if (c->vc_type == HT_HTTP_CLIENT) {
        // The client-side consumer failed, i.e. the server response failed.
        c->producer->handler_state = HTTP_SM_POST_SERVER_FAIL;
      }
    }
    sm_callback = true;

    // Deallocate the reader after calling back the sm
    //  because buffer problems are easier to debug
    //  in the sm when the reader is still valid
    if (c->buffer_reader) {
      c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
      c->buffer_reader = nullptr;
    }

    // Since we removed a consumer, it may now be
    //   possible to put more stuff in the buffer
    // Note: we reenable only after calling back
    //    the SM since the reenabling has the side effect
    //    updating the buffer state for the VConnection
    //    that is being reenabled
    if (p->alive && p->read_vio && p->read_buffer->write_avail() > 0) {
      if (p->is_throttled()) {
        this->consumer_reenable(c);
      } else {
        p->read_vio->reenable();
      }
    }
    // [amc] I don't think this happens but we'll leave a debug trap
    // here just in case.
    if (p->is_throttled()) {
      Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
    }
    break;

  case VC_EVENT_READ_READY:
  case VC_EVENT_READ_COMPLETE:
  default:
    // Consumers should not get these events
    ink_release_assert(0);
    break;
  }

  return sm_callback;
}
1378 
1379 // void HttpTunnel::chain_abort_all(HttpTunnelProducer* p)
1380 //
1381 //    Abort the producer and everyone still alive
1382 //     downstream of the producer
1383 //
1384 void
chain_abort_all(HttpTunnelProducer * p)1385 HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
1386 {
1387   HttpTunnelConsumer *c = p->consumer_list.head;
1388 
1389   while (c) {
1390     if (c->alive) {
1391       c->alive     = false;
1392       c->write_vio = nullptr;
1393       c->vc->do_io_close(EHTTP_ERROR);
1394       update_stats_after_abort(c->vc_type);
1395     }
1396 
1397     if (c->self_producer) {
1398       // Must snip the link before recursively
1399       // freeing to avoid looks introduced by
1400       // blind tunneling
1401       HttpTunnelProducer *selfp = c->self_producer;
1402       c->self_producer          = nullptr;
1403       chain_abort_all(selfp);
1404     }
1405 
1406     c = c->link.next;
1407   }
1408 
1409   if (p->alive) {
1410     p->alive = false;
1411     if (p->read_vio) {
1412       p->bytes_read = p->read_vio->ndone;
1413     }
1414     if (p->self_consumer) {
1415       p->self_consumer->alive = false;
1416     }
1417     p->read_vio = nullptr;
1418     p->vc->do_io_close(EHTTP_ERROR);
1419     update_stats_after_abort(p->vc_type);
1420   }
1421 }
1422 
1423 //
1424 // Determine the number of bytes a consumer should read from a producer
1425 //
1426 int64_t
final_consumer_bytes_to_write(HttpTunnelProducer * p,HttpTunnelConsumer * c)1427 HttpTunnel::final_consumer_bytes_to_write(HttpTunnelProducer *p, HttpTunnelConsumer *c)
1428 {
1429   int64_t total_bytes = 0;
1430   int64_t consumer_n  = 0;
1431   if (p->alive) {
1432     consumer_n = INT64_MAX;
1433   } else {
1434     TunnelChunkingAction_t action = p->chunking_action;
1435     if (c->alive) {
1436       if (c->vc_type == HT_CACHE_WRITE) {
1437         switch (action) {
1438         case TCA_CHUNK_CONTENT:
1439         case TCA_PASSTHRU_DECHUNKED_CONTENT:
1440           total_bytes = p->bytes_read + p->init_bytes_done;
1441           break;
1442         case TCA_DECHUNK_CONTENT:
1443         case TCA_PASSTHRU_CHUNKED_CONTENT:
1444           total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1445           break;
1446         default:
1447           break;
1448         }
1449       } else if (action == TCA_CHUNK_CONTENT) {
1450         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
1451       } else if (action == TCA_DECHUNK_CONTENT) {
1452         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1453       } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1454         total_bytes = p->bytes_read + p->init_bytes_done;
1455       } else {
1456         total_bytes = p->bytes_read + p->init_bytes_done;
1457       }
1458       consumer_n = total_bytes - c->skip_bytes;
1459     }
1460   }
1461   return consumer_n;
1462 }
1463 
1464 //
1465 // void HttpTunnel::finish_all_internal(HttpTunnelProducer* p)
1466 //
1467 //    Internal function for finishing all consumers.  Takes
1468 //       chain argument about where to finish just immediate
1469 //       consumer or all those downstream
1470 //
1471 void
finish_all_internal(HttpTunnelProducer * p,bool chain)1472 HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
1473 {
1474   ink_assert(p->alive == false);
1475   HttpTunnelConsumer *c         = p->consumer_list.head;
1476   int64_t total_bytes           = 0;
1477   TunnelChunkingAction_t action = p->chunking_action;
1478 
1479   if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1480     // if the only chunked data was in the initial read, make sure we don't consume too much
1481     if (p->bytes_read == 0 && p->buffer_start != nullptr) {
1482       int num_read = p->buffer_start->read_avail() - p->chunked_handler.chunked_reader->read_avail();
1483       if (num_read < p->init_bytes_done) {
1484         p->init_bytes_done = num_read;
1485       }
1486     }
1487   }
1488 
1489   while (c) {
1490     if (c->alive) {
1491       if (c->write_vio) {
1492         // Adjust the number of bytes to write in the case of
1493         // a completed unlimited producer
1494         c->write_vio->nbytes = final_consumer_bytes_to_write(p, c);
1495         ink_assert(c->write_vio->nbytes >= 0);
1496 
1497         if (c->write_vio->nbytes < 0) {
1498           // TODO: Wtf, printf?
1499           fprintf(stderr, "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
1500                   static_cast<int64_t>(total_bytes - c->skip_bytes));
1501         }
1502       }
1503 
1504       if (chain == true && c->self_producer) {
1505         chain_finish_all(c->self_producer);
1506       }
1507       // The IO Core will not call us back if there
1508       //   is nothing to do.  Check to see if there is
1509       //   nothing to do and take the appripriate
1510       //   action
1511       if (c->write_vio && c->alive && c->write_vio->nbytes == c->write_vio->ndone) {
1512         consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
1513       }
1514     }
1515 
1516     c = c->link.next;
1517   }
1518 }
1519 
1520 // void HttpTunnel::chain_abort_cache_write(HttpProducer* p)
1521 //
1522 //    Terminates all cache writes.  Used to prevent truncated
1523 //     documents from being stored in the cache
1524 //
1525 void
chain_abort_cache_write(HttpTunnelProducer * p)1526 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer *p)
1527 {
1528   HttpTunnelConsumer *c = p->consumer_list.head;
1529 
1530   while (c) {
1531     if (c->alive) {
1532       if (c->vc_type == HT_CACHE_WRITE) {
1533         ink_assert(c->self_producer == nullptr);
1534         c->write_vio = nullptr;
1535         c->vc->do_io_close(EHTTP_ERROR);
1536         c->alive = false;
1537         HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1538       } else if (c->self_producer) {
1539         chain_abort_cache_write(c->self_producer);
1540       }
1541     }
1542     c = c->link.next;
1543   }
1544 }
1545 
1546 // void HttpTunnel::close_vc(HttpTunnelProducer* p)
1547 //
1548 //    Closes the vc associated with the producer and
1549 //      updates the state of the self_consumer
1550 //
1551 void
close_vc(HttpTunnelProducer * p)1552 HttpTunnel::close_vc(HttpTunnelProducer *p)
1553 {
1554   ink_assert(p->alive == false);
1555   HttpTunnelConsumer *c = p->self_consumer;
1556 
1557   if (c && c->alive) {
1558     c->alive = false;
1559     if (c->write_vio) {
1560       c->bytes_written = c->write_vio->ndone;
1561     }
1562   }
1563 
1564   p->vc->do_io_close();
1565 }
1566 
1567 // void HttpTunnel::close_vc(HttpTunnelConsumer* c)
1568 //
1569 //    Closes the vc associated with the consumer and
1570 //      updates the state of the self_producer
1571 //
1572 void
close_vc(HttpTunnelConsumer * c)1573 HttpTunnel::close_vc(HttpTunnelConsumer *c)
1574 {
1575   ink_assert(c->alive == false);
1576   HttpTunnelProducer *p = c->self_producer;
1577 
1578   if (p && p->alive) {
1579     p->alive = false;
1580     if (p->read_vio) {
1581       p->bytes_read = p->read_vio->ndone;
1582     }
1583   }
1584 
1585   c->vc->do_io_close();
1586 }
1587 
// int HttpTunnel::main_handler(int event, void* data)
//
//   Main handler for the tunnel.  Vectors events
//   based on whether they are from consumers or
//   producers
//
int
HttpTunnel::main_handler(int event, void *data)
{
  HttpTunnelProducer *p = nullptr;
  HttpTunnelConsumer *c = nullptr;
  bool sm_callback      = false;

  // Track nesting: producer/consumer handlers can reenter this handler
  // synchronously (e.g. a write completing on the stack).
  ++reentrancy_count;

  ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);

  // Find the appropriate entry: the VIO identifies which producer or
  // consumer this event belongs to.
  if ((p = get_producer(static_cast<VIO *>(data))) != nullptr) {
    sm_callback = producer_handler(event, p);
  } else {
    if ((c = get_consumer(static_cast<VIO *>(data))) != nullptr) {
      ink_assert(c->write_vio == (VIO *)data || c->vc == ((VIO *)data)->vc_server);
      sm_callback = consumer_handler(event, c);
    } else {
      internal_error(); // do nothing
    }
  }

  // We called a vc handler, the tunnel might be
  //  finished.  Check to see if there are any remaining
  //  VConnections alive.  If not, notify the state machine
  //
  // Don't call out if we are nested
  if (call_sm || (sm_callback && !is_tunnel_alive())) {
    if (reentrancy_count == 1) {
      // Outermost invocation: safe to tell the SM the tunnel is done.
      reentrancy_count = 0;
      active           = false;
      sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
      return EVENT_DONE;
    } else {
      // Nested: defer the DONE notification to the outermost frame.
      call_sm = true;
    }
  }
  --reentrancy_count;
  return EVENT_CONT;
}
1635 
1636 void
update_stats_after_abort(HttpTunnelType_t t)1637 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
1638 {
1639   switch (t) {
1640   case HT_CACHE_READ:
1641   case HT_CACHE_WRITE:
1642     HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1643     break;
1644   default:
1645     // Handled here:
1646     // HT_HTTP_SERVER, HT_HTTP_CLIENT,
1647     // HT_TRANSFORM, HT_STATIC
1648     break;
1649   };
1650 }
1651 
void
HttpTunnel::internal_error()
{
  // Deliberately empty: invoked from main_handler() when an event's VIO
  // matches neither a registered producer nor consumer; nothing to do.
}
1656