xref: /trafficserver/proxy/Transform.cc (revision 4cfd5a73)
1 /** @file
2 
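  Transform processor implementation: the TransformVConnection chain, its
  TransformTerminus, and the built-in null and range transforms.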
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22 
23   @section thoughts Transform thoughts
24 
25     - Must be able to handle a chain of transformations.
26     - Any transformation in the chain may fail.
27       Failure options:
28         - abort the client (if transformed data already sent)
29         - serve the client the untransformed document
30         - remove the failing transformation from the chain and attempt the transformation again (difficult to do)
31         - never send untransformed document to client if client would not understand it (e.g. a set top box)
32     - Must be able to change response header fields up until the point that TRANSFORM_READ_READY is sent to the user.
33 
34   @section usage Transform usage
35 
36     -# transformProcessor.open (cont, hooks); - returns "tvc", a TransformVConnection if 'hooks != NULL'
37     -# tvc->do_io_write (cont, nbytes, buffer1);
38     -# cont->handleEvent (TRANSFORM_READ_READY, NULL);
39     -# tvc->do_io_read (cont, nbytes, buffer2);
40     -# tvc->do_io_close ();
41 
42   @section visualization Transform visualization
43 
44   @verbatim
45          +----+     +----+     +----+     +----+
46     -IB->| T1 |-B1->| T2 |-B2->| T3 |-B3->| T4 |-OB->
47          +----+     +----+     +----+     +----+
48   @endverbatim
49 
50   Data flows into the first transform in the form of the buffer
51   passed to TransformVConnection::do_io_write (IB). Data flows
52   out of the last transform in the form of the buffer passed to
53   TransformVConnection::do_io_read (OB). Between each transformation is
54   another buffer (B1, B2 and B3).
55 
56   A transformation is a Continuation. The continuation is called with the
57   event TRANSFORM_IO_WRITE to initialize the write and TRANSFORM_IO_READ
58   to initialize the read.
59 
60 */
61 
62 #include "ProxyConfig.h"
63 #include "P_Net.h"
64 #include "TransformInternal.h"
65 #include "HdrUtils.h"
66 #include "Log.h"
67 
68 TransformProcessor transformProcessor;
69 
70 /*-------------------------------------------------------------------------
71   -------------------------------------------------------------------------*/
72 
// Start-up entry point of the transform processor. The body is empty: no
// work is performed here.
void
TransformProcessor::start()
{
}
77 
78 /*-------------------------------------------------------------------------
79   -------------------------------------------------------------------------*/
80 
81 VConnection *
open(Continuation * cont,APIHook * hooks)82 TransformProcessor::open(Continuation *cont, APIHook *hooks)
83 {
84   if (hooks) {
85     return new TransformVConnection(cont, hooks);
86   } else {
87     return nullptr;
88   }
89 }
90 
91 /*-------------------------------------------------------------------------
92   -------------------------------------------------------------------------*/
93 
94 INKVConnInternal *
null_transform(ProxyMutex * mutex)95 TransformProcessor::null_transform(ProxyMutex *mutex)
96 {
97   return new NullTransform(mutex);
98 }
99 
100 /*-------------------------------------------------------------------------
101   -------------------------------------------------------------------------*/
102 
103 INKVConnInternal *
range_transform(ProxyMutex * mut,RangeRecord * ranges,int num_fields,HTTPHdr * transform_resp,const char * content_type,int content_type_len,int64_t content_length)104 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
105                                     const char *content_type, int content_type_len, int64_t content_length)
106 {
107   RangeTransform *range_transform =
108     new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
109   return range_transform;
110 }
111 
112 /*-------------------------------------------------------------------------
113   -------------------------------------------------------------------------*/
114 
// Construct the terminus belonging to the transform vconnection `tvc`.
// The terminus shares tvc's mutex, starts with default-constructed read and
// write VIOs, a zero pending-event count and cleared deletable / closed /
// called-user flags, and dispatches all events to handle_event().
TransformTerminus::TransformTerminus(TransformVConnection *tvc)
  : VConnection(tvc->mutex),
    m_tvc(tvc),
    m_read_vio(),
    m_write_vio(),
    m_event_count(0),
    m_deletable(0),
    m_closed(0),
    m_called_user(0)
{
  SET_HANDLER(&TransformTerminus::handle_event);
}
127 
// RETRY(): used by TransformTerminus::handle_event() when a VIO mutex could
// not be acquired. It increments m_event_count to account for the event that
// is about to be queued, reschedules this continuation on ET_NET 10ms from
// now, and returns 0 from the *enclosing* function.
//
// The body is wrapped in do { ... } while (0) so the macro expands to exactly
// one statement and remains correct when used in an unbraced if/else.
#define RETRY()                                                    \
  do {                                                             \
    if (ink_atomic_increment((int *)&m_event_count, 1) < 0) {      \
      ink_assert(!"not reached");                                  \
    }                                                              \
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(10), ET_NET); \
    return 0;                                                      \
  } while (0)
134 
// Event handler of the terminus. Every scheduled invocation was paired with
// an increment of m_event_count (see do_io_read / do_io_write / do_io_close /
// reenable / RETRY), and each invocation decrements it again.
//
// Depending on state it does one of:
//   - both the terminus and the owning TransformVConnection are closed:
//     delete the TransformVConnection once no further events are pending;
//   - a write is active: either tell the user the transform output is ready
//     (TRANSFORM_READ_READY, once) or move bytes from the write VIO's reader
//     into the read VIO's buffer and issue the matching VIO callbacks;
//   - otherwise: if the terminus was closed while the TransformVConnection is
//     still open, report VC_EVENT_ERROR / VC_EVENT_EOS.
//
// Always returns 0.
int
TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
{
  int val;

  // Snapshot "everything is closed" before touching the event count.
  m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));

  // NOTE(review): ink_atomic_increment presumably returns the value held
  // before the update; the checks on `val` below rely on that.
  val = ink_atomic_increment(&m_event_count, -1);

  Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);

  if (val <= 0) {
    ink_assert(!"not reached");
  }

  // Only deletable if this was the single outstanding event.
  m_deletable = m_deletable && (val == 1);

  if (m_closed != 0 && m_tvc->m_closed != 0) {
    if (m_deletable) {
      Debug("transform", "TransformVConnection destroy [0x%lx]", (long)m_tvc);
      // This terminus is a member (m_terminus) of *m_tvc, so `this` must not
      // be touched after the delete; return immediately.
      delete m_tvc;
      return 0;
    }
  } else if (m_write_vio.op == VIO::WRITE) {
    if (m_read_vio.op == VIO::NONE) {
      // Data is being written into the terminus but nobody reads yet:
      // notify the user exactly once so it can start the read.
      if (!m_called_user) {
        Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]", m_event_count, event, (long)m_tvc,
              (long)m_tvc->m_cont);

        m_called_user = 1;
        // It is our belief this is safe to pass a reference, i.e. its scope
        // and locking ought to be safe across the lifetime of the continuation.
        m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
      }
    } else {
      int64_t towrite;

      // Both VIO mutexes are needed; if either is busy, RETRY() reschedules
      // this handler and returns.
      MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
      if (!trylock1.is_locked()) {
        RETRY();
      }

      MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
      if (!trylock2.is_locked()) {
        RETRY();
      }

      // Re-check state now that the locks are held.
      if (m_closed != 0) {
        return 0;
      }

      if (m_write_vio.op == VIO::NONE) {
        return 0;
      }

      // Transfer the minimum of: bytes left in the write, bytes available in
      // the write reader, and bytes left in the read.
      towrite = m_write_vio.ntodo();
      if (towrite > 0) {
        if (towrite > m_write_vio.get_reader()->read_avail()) {
          towrite = m_write_vio.get_reader()->read_avail();
        }
        if (towrite > m_read_vio.ntodo()) {
          towrite = m_read_vio.ntodo();
        }

        if (towrite > 0) {
          m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
          m_read_vio.ndone += towrite;

          m_write_vio.get_reader()->consume(towrite);
          m_write_vio.ndone += towrite;
        }
      }

      // Writer side: WRITE_READY only if progress was made, WRITE_COMPLETE
      // once nothing is left to write.
      if (m_write_vio.ntodo() > 0) {
        if (towrite > 0) {
          m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
        }
      } else {
        m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
      }

      // We could have closed on the write callback
      if (m_closed != 0 && m_tvc->m_closed != 0) {
        return 0;
      }

      // Reader side: EOS when the write finished before the read was
      // satisfied, READ_READY on progress, READ_COMPLETE when done.
      if (m_read_vio.ntodo() > 0) {
        if (m_write_vio.ntodo() <= 0) {
          m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
        } else if (towrite > 0) {
          m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
        }
      } else {
        m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
      }
    }
  } else {
    // No write in progress: only the read VIO mutex is required.
    MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
    if (!trylock2.is_locked()) {
      RETRY();
    }

    if (m_closed != 0) {
      // The terminus was closed, but the enclosing transform
      // vconnection wasn't. If the terminus was aborted then we
      // call the read_vio cont back with VC_EVENT_ERROR. If it
      // was closed normally then we call it back with
      // VC_EVENT_EOS. If a read operation hasn't been initiated
      // yet and we haven't called the user back then we call
      // the user back instead of the read_vio cont (which won't
      // exist).
      if (m_tvc->m_closed == 0) {
        int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;

        if (!m_called_user) {
          m_called_user = 1;
          m_tvc->m_cont->handleEvent(ev, &m_read_vio);
        } else {
          ink_assert(m_read_vio.cont != nullptr);
          m_read_vio.cont->handleEvent(ev, &m_read_vio);
        }
      }

      return 0;
    }
  }

  return 0;
}
264 
265 /*-------------------------------------------------------------------------
266   -------------------------------------------------------------------------*/
267 
268 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)269 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
270 {
271   m_read_vio.buffer.writer_for(buf);
272   m_read_vio.op = VIO::READ;
273   m_read_vio.set_continuation(c);
274   m_read_vio.nbytes    = nbytes;
275   m_read_vio.ndone     = 0;
276   m_read_vio.vc_server = this;
277 
278   if (ink_atomic_increment(&m_event_count, 1) < 0) {
279     ink_assert(!"not reached");
280   }
281   Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
282 
283   eventProcessor.schedule_imm(this, ET_NET);
284 
285   return &m_read_vio;
286 }
287 
288 /*-------------------------------------------------------------------------
289   -------------------------------------------------------------------------*/
290 
291 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)292 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
293 {
294   // In the process of eliminating 'owner' mode so asserting against it
295   ink_assert(!owner);
296   m_write_vio.buffer.reader_for(buf);
297   m_write_vio.op = VIO::WRITE;
298   m_write_vio.set_continuation(c);
299   m_write_vio.nbytes    = nbytes;
300   m_write_vio.ndone     = 0;
301   m_write_vio.vc_server = this;
302 
303   if (ink_atomic_increment(&m_event_count, 1) < 0) {
304     ink_assert(!"not reached");
305   }
306   Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
307 
308   eventProcessor.schedule_imm(this, ET_NET);
309 
310   return &m_write_vio;
311 }
312 
313 /*-------------------------------------------------------------------------
314   -------------------------------------------------------------------------*/
315 
316 void
do_io_close(int error)317 TransformTerminus::do_io_close(int error)
318 {
319   if (ink_atomic_increment(&m_event_count, 1) < 0) {
320     ink_assert(!"not reached");
321   }
322 
323   INK_WRITE_MEMORY_BARRIER;
324 
325   if (error != -1) {
326     lerrno   = error;
327     m_closed = TS_VC_CLOSE_ABORT;
328   } else {
329     m_closed = TS_VC_CLOSE_NORMAL;
330   }
331 
332   m_read_vio.op = VIO::NONE;
333   m_read_vio.buffer.clear();
334 
335   m_write_vio.op = VIO::NONE;
336   m_write_vio.buffer.clear();
337 
338   eventProcessor.schedule_imm(this, ET_NET);
339 }
340 
341 /*-------------------------------------------------------------------------
342   -------------------------------------------------------------------------*/
343 
344 void
do_io_shutdown(ShutdownHowTo_t howto)345 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
346 {
347   if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
348     m_read_vio.op = VIO::NONE;
349     m_read_vio.buffer.clear();
350   }
351 
352   if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
353     m_write_vio.op = VIO::NONE;
354     m_write_vio.buffer.clear();
355   }
356 }
357 
358 /*-------------------------------------------------------------------------
359   -------------------------------------------------------------------------*/
360 
361 void
reenable(VIO * vio)362 TransformTerminus::reenable(VIO *vio)
363 {
364   ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
365 
366   if (m_event_count == 0) {
367     if (ink_atomic_increment(&m_event_count, 1) < 0) {
368       ink_assert(!"not reached");
369     }
370     Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
371     eventProcessor.schedule_imm(this, ET_NET);
372   } else {
373     Debug("transform", "[TransformTerminus::reenable] skipping due to "
374                        "pending events");
375   }
376 }
377 
378 /*-------------------------------------------------------------------------
379   -------------------------------------------------------------------------*/
380 
TransformVConnection(Continuation * cont,APIHook * hooks)381 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
382   : TransformVCChain(cont->mutex.get()), m_cont(cont), m_terminus(this), m_closed(0)
383 {
384   INKVConnInternal *xform;
385 
386   SET_HANDLER(&TransformVConnection::handle_event);
387 
388   ink_assert(hooks != nullptr);
389 
390   m_transform = hooks->m_cont;
391   while (hooks->m_link.next) {
392     xform = (INKVConnInternal *)hooks->m_cont;
393     hooks = hooks->m_link.next;
394     xform->do_io_transform(hooks->m_cont);
395   }
396   xform = (INKVConnInternal *)hooks->m_cont;
397   xform->do_io_transform(&m_terminus);
398 
399   Debug("transform", "TransformVConnection create [0x%lx]", (long)this);
400 }
401 
402 /*-------------------------------------------------------------------------
403   -------------------------------------------------------------------------*/
404 
// Destructor: detaches the terminus VIOs from their continuations and drops
// the mutex references held by the terminus and by this vconnection.
TransformVConnection::~TransformVConnection()
{
  // Clear the continuations in terminus VConnections so that
  //  mutex's get released (INKqa05596)
  m_terminus.m_read_vio.set_continuation(nullptr);
  m_terminus.m_write_vio.set_continuation(nullptr);
  m_terminus.mutex = nullptr;
  this->mutex      = nullptr;
}
414 
415 /*-------------------------------------------------------------------------
416   -------------------------------------------------------------------------*/
417 
// The transform vconnection is not expected to receive events itself; this
// handler asserts if it is ever invoked and returns 0.
int
TransformVConnection::handle_event(int /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */)
{
  ink_assert(!"not reached");
  return 0;
}
424 
425 /*-------------------------------------------------------------------------
426   -------------------------------------------------------------------------*/
427 
// Read the transformed output: the request is forwarded unchanged to the
// terminus, whose read VIO is returned.
VIO *
TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
  Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long)c, (long)this);

  return m_terminus.do_io_read(c, nbytes, buf);
}
435 
436 /*-------------------------------------------------------------------------
437   -------------------------------------------------------------------------*/
438 
// Feed untransformed input into the chain: the write is forwarded to the
// first transform (m_transform). The `owner` argument is ignored.
VIO *
TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool /* owner ATS_UNUSED */)
{
  Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long)c, (long)this);

  return m_transform->do_io_write(c, nbytes, buf);
}
446 
447 /*-------------------------------------------------------------------------
448   -------------------------------------------------------------------------*/
449 
450 void
do_io_close(int error)451 TransformVConnection::do_io_close(int error)
452 {
453   Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long)this);
454 
455   if (m_closed != 0) {
456     return;
457   }
458 
459   if (error != -1) {
460     m_closed = TS_VC_CLOSE_ABORT;
461   } else {
462     m_closed = TS_VC_CLOSE_NORMAL;
463   }
464 
465   m_transform->do_io_close(error);
466   m_transform = nullptr;
467 }
468 
469 /*-------------------------------------------------------------------------
470   -------------------------------------------------------------------------*/
471 
// Shut down the write side of the chain. Only IO_SHUTDOWN_WRITE is expected
// (asserted); the request is forwarded to the first transform.
void
TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
{
  ink_assert(howto == IO_SHUTDOWN_WRITE);

  Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long)this);

  m_transform->do_io_shutdown(howto);
}
481 
482 /*-------------------------------------------------------------------------
483   -------------------------------------------------------------------------*/
484 
// Reenable is not expected to be called on the transform vconnection itself;
// this asserts if it ever is.
void
TransformVConnection::reenable(VIO * /* vio ATS_UNUSED */)
{
  ink_assert(!"not reached");
}
490 
491 /*-------------------------------------------------------------------------
492   -------------------------------------------------------------------------*/
493 
494 uint64_t
backlog(uint64_t limit)495 TransformVConnection::backlog(uint64_t limit)
496 {
497   uint64_t b          = 0; // backlog
498   VConnection *raw_vc = m_transform;
499   MIOBuffer *w;
500   while (raw_vc && raw_vc != &m_terminus) {
501     INKVConnInternal *vc = static_cast<INKVConnInternal *>(raw_vc);
502     if (nullptr != (w = vc->m_read_vio.buffer.writer())) {
503       b += w->max_read_avail();
504     }
505     if (b >= limit) {
506       return b;
507     }
508     raw_vc = vc->m_output_vc;
509   }
510   if (nullptr != (w = m_terminus.m_read_vio.buffer.writer())) {
511     b += w->max_read_avail();
512   }
513   if (b >= limit) {
514     return b;
515   }
516 
517   IOBufferReader *r = m_terminus.m_write_vio.get_reader();
518   if (r) {
519     b += r->read_avail();
520   }
521   return b;
522 }
523 
524 /*-------------------------------------------------------------------------
525   -------------------------------------------------------------------------*/
526 
// Construct the transform test driver: a continuation with its own new
// mutex whose private hook list contains a single null transform (itself
// given a fresh mutex). Events are dispatched to handle_event().
TransformControl::TransformControl() : Continuation(new_ProxyMutex()), m_hooks()
{
  SET_HANDLER(&TransformControl::handle_event);

  m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
}
533 
534 /*-------------------------------------------------------------------------
535   -------------------------------------------------------------------------*/
536 
537 int
handle_event(int event,void *)538 TransformControl::handle_event(int event, void * /* edata ATS_UNUSED */)
539 {
540   switch (event) {
541   case EVENT_IMMEDIATE: {
542     char *s, *e;
543 
544     ink_assert(m_tvc == nullptr);
545     if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
546       m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
547     } else {
548       m_tvc = transformProcessor.open(this, m_hooks.head());
549     }
550     ink_assert(m_tvc != nullptr);
551 
552     m_write_buf = new_MIOBuffer();
553     s           = m_write_buf->end();
554     e           = m_write_buf->buf_end();
555 
556     memset(s, 'a', e - s);
557     m_write_buf->fill(e - s);
558 
559     m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
560     break;
561   }
562 
563   case TRANSFORM_READ_READY: {
564     MIOBuffer *buf = new_empty_MIOBuffer();
565 
566     m_read_buf = buf->alloc_reader();
567     m_tvc->do_io_read(this, INT64_MAX, buf);
568     break;
569   }
570 
571   case VC_EVENT_READ_COMPLETE:
572   case VC_EVENT_EOS:
573     m_tvc->do_io_close();
574 
575     free_MIOBuffer(m_read_buf->mbuf);
576     m_read_buf = nullptr;
577 
578     free_MIOBuffer(m_write_buf);
579     m_write_buf = nullptr;
580     break;
581 
582   case VC_EVENT_WRITE_COMPLETE:
583     break;
584 
585   default:
586     ink_assert(!"not reached");
587     break;
588   }
589 
590   return 0;
591 }
592 
593 /*-------------------------------------------------------------------------
594   -------------------------------------------------------------------------*/
595 
// Construct a null transform that uses `_mutex`. No output buffer, reader or
// VIO exists yet; they are created lazily by handle_event().
NullTransform::NullTransform(ProxyMutex *_mutex)
  : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(_mutex)),
    m_output_buf(nullptr),
    m_output_reader(nullptr),
    m_output_vio(nullptr)
{
  SET_HANDLER(&NullTransform::handle_event);

  Debug("transform", "NullTransform create [0x%lx]", (long)this);
}
606 
607 /*-------------------------------------------------------------------------
608   -------------------------------------------------------------------------*/
609 
~NullTransform()610 NullTransform::~NullTransform()
611 {
612   if (m_output_buf) {
613     free_MIOBuffer(m_output_buf);
614   }
615 }
616 
617 /*-------------------------------------------------------------------------
618   -------------------------------------------------------------------------*/
619 
620 int
handle_event(int event,void * edata)621 NullTransform::handle_event(int event, void *edata)
622 {
623   handle_event_count(event);
624 
625   Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
626 
627   if (m_closed) {
628     if (m_deletable) {
629       Debug("transform", "NullTransform destroy: %" PRId64 " [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
630       delete this;
631     }
632   } else {
633     switch (event) {
634     case VC_EVENT_ERROR:
635       m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
636       break;
637     case VC_EVENT_WRITE_COMPLETE:
638       ink_assert(m_output_vio == (VIO *)edata);
639 
640       // The write to the output vconnection completed. This
641       // could only be the case if the data being fed into us
642       // has also completed.
643       ink_assert(m_write_vio.ntodo() == 0);
644 
645       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
646       break;
647     case VC_EVENT_WRITE_READY:
648     default: {
649       int64_t towrite;
650       int64_t avail;
651 
652       ink_assert(m_output_vc != nullptr);
653 
654       if (!m_output_vio) {
655         m_output_buf    = new_empty_MIOBuffer();
656         m_output_reader = m_output_buf->alloc_reader();
657         m_output_vio    = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
658       }
659 
660       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
661       if (!trylock.is_locked()) {
662         retry(10);
663         return 0;
664       }
665 
666       if (m_closed) {
667         return 0;
668       }
669 
670       if (m_write_vio.op == VIO::NONE) {
671         m_output_vio->nbytes = m_write_vio.ndone;
672         m_output_vio->reenable();
673         return 0;
674       }
675 
676       towrite = m_write_vio.ntodo();
677       if (towrite > 0) {
678         avail = m_write_vio.get_reader()->read_avail();
679         if (towrite > avail) {
680           towrite = avail;
681         }
682 
683         if (towrite > 0) {
684           Debug("transform",
685                 "[NullTransform::handle_event] "
686                 "writing %" PRId64 " bytes to output",
687                 towrite);
688           m_output_buf->write(m_write_vio.get_reader(), towrite);
689 
690           m_write_vio.get_reader()->consume(towrite);
691           m_write_vio.ndone += towrite;
692         }
693       }
694 
695       if (m_write_vio.ntodo() > 0) {
696         if (towrite > 0) {
697           m_output_vio->reenable();
698           m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
699         }
700       } else {
701         m_output_vio->nbytes = m_write_vio.ndone;
702         m_output_vio->reenable();
703         m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
704       }
705 
706       break;
707     }
708     }
709   }
710 
711   return 0;
712 }
713 
714 /*-------------------------------------------------------------------------
715   Reasons the JG transform cannot currently be a plugin:
716     a) Uses the config system
717        - Easily avoided by using the plugin.config file to pass the config
718          values as parameters to the plugin initialization routine.
719     b) Uses the stat system
720        - FIXME: should probably solve this.
721   -------------------------------------------------------------------------*/
722 
723 /* the JG transform is now a plugin. All the JG code,
724    config variables and stats are removed from Transform.cc */
725 
726 /*-------------------------------------------------------------------------
727   -------------------------------------------------------------------------*/
728 
#if TS_HAS_TESTS
// Kick off the transform self-test: when the "transform_test" action tag is
// set, a new TransformControl is scheduled immediately on ET_NET.
void
TransformTest::run()
{
  if (!is_action_tag_set("transform_test")) {
    return;
  }
  eventProcessor.schedule_imm(new TransformControl(), ET_NET);
}
#endif
738 
739 /*-------------------------------------------------------------------------
740   -------------------------------------------------------------------------*/
741 
// Construct a range transform.
//
// @param mut              mutex used by the transform.
// @param ranges           array of range records to serve.
// @param num_fields       number of entries in `ranges`.
// @param transform_resp   response header stored for later modification.
// @param content_type     content type string of the document (stored, not copied).
// @param content_type_len length of `content_type`.
// @param content_length   stored as the output content length (m_output_cl).
//
// Output buffer, reader and VIO are created lazily by handle_event().
RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
                               const char *content_type, int content_type_len, int64_t content_length)
  : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(mut)),
    m_output_buf(nullptr),
    m_output_reader(nullptr),
    m_transform_resp(transform_resp),
    m_output_vio(nullptr),
    m_range_content_length(0),
    m_num_range_fields(num_fields),
    m_current_range(0),
    m_content_type(content_type),
    m_content_type_len(content_type_len),
    m_ranges(ranges),
    m_output_cl(content_length),
    m_done(0)
{
  SET_HANDLER(&RangeTransform::handle_event);

  // NOTE(review): m_range_content_length was initialized to 0 just above, so
  // this is computed from 0 — confirm that is intended.
  m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
  Debug("http_trans", "RangeTransform creation finishes");
}
763 
764 /*-------------------------------------------------------------------------
765   -------------------------------------------------------------------------*/
766 
~RangeTransform()767 RangeTransform::~RangeTransform()
768 {
769   if (m_output_buf) {
770     free_MIOBuffer(m_output_buf);
771   }
772 }
773 
774 /*-------------------------------------------------------------------------
775   -------------------------------------------------------------------------*/
776 
777 int
handle_event(int event,void * edata)778 RangeTransform::handle_event(int event, void *edata)
779 {
780   handle_event_count(event);
781 
782   if (m_closed) {
783     if (m_deletable) {
784       if (m_output_vc != nullptr) {
785         Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
786       } else {
787         Debug("http_trans", "RangeTransform destroy");
788       }
789       delete this;
790     }
791   } else {
792     switch (event) {
793     case VC_EVENT_ERROR:
794       m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
795       break;
796     case VC_EVENT_WRITE_COMPLETE:
797       ink_assert(m_output_vio == (VIO *)edata);
798       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
799       break;
800     case VC_EVENT_WRITE_READY:
801     default:
802       ink_assert(m_output_vc != nullptr);
803 
804       if (!m_output_vio) {
805         m_output_buf    = new_empty_MIOBuffer();
806         m_output_reader = m_output_buf->alloc_reader();
807         m_output_vio    = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
808 
809         change_response_header();
810 
811         if (m_num_range_fields > 1) {
812           add_boundary(false);
813           add_sub_header(m_current_range);
814         }
815       }
816 
817       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
818       if (!trylock.is_locked()) {
819         retry(10);
820         return 0;
821       }
822 
823       if (m_closed) {
824         return 0;
825       }
826 
827       if (m_write_vio.op == VIO::NONE) {
828         m_output_vio->nbytes = m_done;
829         m_output_vio->reenable();
830         return 0;
831       }
832 
833       transform_to_range();
834       break;
835     }
836   }
837 
838   return 0;
839 }
840 
841 /*-------------------------------------------------------------------------
842   -------------------------------------------------------------------------*/
843 
// Consume the input currently available on the write VIO and copy the bytes
// belonging to the requested ranges to the output buffer.
//
// For the current range, input bytes located before the range start are
// skipped, then bytes up to the range end are copied. When a range is
// finished the function advances to the next usable range (emitting the
// multipart separator and sub-header), or — after the last range — finalizes
// the output length and notifies the upstream writer. _done_byte of each
// range record tracks the offset of the last input byte processed for it.
void
RangeTransform::transform_to_range()
{
  IOBufferReader *reader = m_write_vio.get_reader();
  int64_t toskip, tosend, avail;
  const int64_t *end, *start;
  int64_t prev_end = 0;
  int64_t *done_byte;

  end       = &m_ranges[m_current_range]._end;
  done_byte = &m_ranges[m_current_range]._done_byte;
  start     = &m_ranges[m_current_range]._start;
  avail     = reader->read_avail();

  while (true) {
    // Discard input that lies before the start of the current range.
    if (*done_byte < (*start - 1)) {
      toskip = *start - *done_byte - 1;

      if (toskip > avail) {
        toskip = avail;
      }

      if (toskip > 0) {
        reader->consume(toskip);
        *done_byte += toskip;
        avail = reader->read_avail();
      }
    }

    // Copy as much of the current range as is available.
    if (avail > 0) {
      tosend = *end - *done_byte;

      if (tosend > avail) {
        tosend = avail;
      }

      m_output_buf->write(reader, tosend);
      reader->consume(tosend);

      m_done += tosend;
      *done_byte += tosend;
    }

    // Remember where the finished range ended; the next range continues
    // counting input offsets from there.
    if (*done_byte == *end) {
      prev_end = *end;
    }

    // move to next Range if done one
    // ignore bad Range: _done_byte -1, _end -1
    while (*done_byte == *end) {
      m_current_range++;

      if (m_current_range == m_num_range_fields) {
        // Last range finished: close the multipart body if there was more
        // than one range, then fix the final output length.
        if (m_num_range_fields > 1) {
          m_done += m_output_buf->write("\r\n", 2);
          add_boundary(true);
        }

        Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
        m_output_vio->nbytes = m_done;
        m_output_vio->reenable();

        // if we are detaching before processing all the
        //   input data, send VC_EVENT_EOS to let the upstream know
        //   that it can no longer rely on us consuming any more data
        int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
        m_write_vio.cont->handleEvent(cb_event, &m_write_vio);
        return;
      }

      end       = &m_ranges[m_current_range]._end;
      done_byte = &m_ranges[m_current_range]._done_byte;
      start     = &m_ranges[m_current_range]._start;

      // if this is a good Range
      if (*end != -1) {
        m_done += m_output_buf->write("\r\n", 2);
        add_boundary(false);
        add_sub_header(m_current_range);

        // keep this part for future support of out-of-order Range
        // if this is NOT a sequential Range compared to the previous one -
        // start of current Range is larger than the end of last Range, do
        // not need to go back to the start of the IOBufferReader.
        // Otherwise, reset the IOBufferReader.
        // if ( *start > prev_end )
        *done_byte = prev_end;
        // else
        //  reader->reset();

        break;
      }
    }

    // When we need to read and there is nothing available
    avail = reader->read_avail();
    if (avail == 0) {
      break;
    }
  }

  // More input is needed: push what we have downstream and ask for more.
  m_output_vio->reenable();
  m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
}
948 
949 /*-------------------------------------------------------------------------
950   -------------------------------------------------------------------------*/
951 
/*
 * Fixed strings used to build a multipart/byteranges response.
 *
 * `bound` and the boundary= parameter at the end of `range_type` must stay
 * identical: change both at the same time.
 *
 * The strings are only ever read (written to the output buffer, appended to
 * a header field, or measured with sizeof), so they are declared const.
 */

static const char bound[]      = "RANGE_SEPARATOR";
static const char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
static const char cont_type[]  = "Content-type: ";
static const char cont_range[] = "Content-range: bytes ";
960 
961 void
add_boundary(bool end)962 RangeTransform::add_boundary(bool end)
963 {
964   m_done += m_output_buf->write("--", 2);
965   m_done += m_output_buf->write(bound, sizeof(bound) - 1);
966 
967   if (end) {
968     m_done += m_output_buf->write("--", 2);
969   }
970 
971   m_done += m_output_buf->write("\r\n", 2);
972 }
973 
974 /*-------------------------------------------------------------------------
975   -------------------------------------------------------------------------*/
976 
977 #define RANGE_NUMBERS_LENGTH 60
978 
979 void
add_sub_header(int index)980 RangeTransform::add_sub_header(int index)
981 {
982   // this should be large enough to hold three integers!
983   char numbers[RANGE_NUMBERS_LENGTH];
984   int len;
985 
986   m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
987   if (m_content_type) {
988     m_done += m_output_buf->write(m_content_type, m_content_type_len);
989   }
990   m_done += m_output_buf->write("\r\n", 2);
991   m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
992 
993   snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end,
994            m_output_cl);
995   len = strlen(numbers);
996   if (len < RANGE_NUMBERS_LENGTH) {
997     m_done += m_output_buf->write(numbers, len);
998   }
999   m_done += m_output_buf->write("\r\n\r\n", 4);
1000 }
1001 
1002 /*-------------------------------------------------------------------------
1003   -------------------------------------------------------------------------*/
1004 
1005 /*
1006  * this function changes the response header to reflect this is
1007  * a Range response.
1008  */
1009 
1010 void
change_response_header()1011 RangeTransform::change_response_header()
1012 {
1013   MIMEField *field;
1014   char *reason_phrase;
1015   HTTPStatus status_code;
1016 
1017   ink_release_assert(m_transform_resp);
1018 
1019   status_code = HTTP_STATUS_PARTIAL_CONTENT;
1020   m_transform_resp->status_set(status_code);
1021   reason_phrase = const_cast<char *>(http_hdr_reason_lookup(status_code));
1022   m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
1023 
1024   if (m_num_range_fields > 1) {
1025     // set the right Content-Type for multiple entry Range
1026     field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1027 
1028     if (field != nullptr) {
1029       m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1030     }
1031 
1032     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1033     field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
1034 
1035     m_transform_resp->field_attach(field);
1036   } else {
1037     char numbers[RANGE_NUMBERS_LENGTH];
1038     m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1039     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1040     snprintf(numbers, sizeof(numbers), "bytes %" PRId64 "-%" PRId64 "/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
1041     field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
1042     m_transform_resp->field_attach(field);
1043   }
1044 }
1045 
1046 #undef RANGE_NUMBERS_LENGTH
1047