xref: /trafficserver/proxy/PluginVC.cc (revision 4cfd5a73)
1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26    PluginVC.cc
27 
28    Description: Allows bi-directional transfer for data from one
29       continuation to another via a mechanism that impersonates a
30       NetVC.  Should implement all external attributes of NetVConnections.
31 
32    Since data is transferred within Traffic Server, this is a two
33    headed beast.  One NetVC on initiating side (active side) and
34    one NetVC on the receiving side (passive side).
35 
36    The two NetVC subclasses, PluginVC, are part PluginVCCore object.  All
37    three objects share the same mutex.  That mutex is required
38    for doing operations that affect the shared buffers,
39    read state from the PluginVC on the other side or deal with deallocation.
40 
41    To simplify the code, all data passing through the system goes initially
42    into a shared buffer.  There are two shared buffers, one for each
43    direction of the connection.  While it's more efficient to transfer
44    the data from one buffer to another directly, this creates a lot
45    of tricky conditions since you must be holding the lock for both
46    sides, in addition to this VC's lock.  Additionally, issues like
47    watermarks are very hard to deal with.  Since we try
48    to move data by IOBufferData references the efficiency penalty shouldn't
49    be too bad and if it is a big penalty, a brave soul can reimplement
50    to move the data directly without the intermediate buffer.
51 
52    Locking is a difficult issue for this multi-headed beast.  In each
53    PluginVC, there are two locks. The one we got from our PluginVCCore and
54    the lock from the state machine using the PluginVC.  The read side
55    lock & the write side lock must be the same.  The regular net processor has
56    this constraint as well.  In order to handle scheduling of retry events cleanly,
57    we have two event pointers, one for each lock.  sm_lock_retry_event can only
58    be changed while holding the using state machine's lock and
59    core_lock_retry_event can only be manipulated while holding the PluginVC's
60    lock.  On entry to PluginVC::main_handler, we obtain all the locks
61    before looking at the events.  If we can't get all the locks
62    we reschedule the event for further retries.  Since all the locks are
63    obtained in the beginning of the handler, we know we are running
64    exclusively in the later parts of the handler and we will
65    be free from do_io or reenable calls on the PluginVC.
66 
67    The assumption is made (consistent with IO Core spec) that any close,
68    shutdown, reenable, or do_io_{read,write} operation is done by the callee
69    while holding the lock for that side of the operation.
70 
71 
72  ****************************************************************************/
73 
74 #include "PluginVC.h"
75 #include "P_EventSystem.h"
76 #include "P_Net.h"
77 #include "tscore/Regression.h"
78 
79 #define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
80 #define PVC_DEFAULT_MAX_BYTES 32768
81 #define MIN_BLOCK_TRANSFER_BYTES 128
82 
83 #define PVC_TYPE ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
84 
PluginVC(PluginVCCore * core_obj)85 PluginVC::PluginVC(PluginVCCore *core_obj)
86   : NetVConnection(),
87     magic(PLUGIN_VC_MAGIC_ALIVE),
88     vc_type(PLUGIN_VC_UNKNOWN),
89     core_obj(core_obj),
90     other_side(nullptr),
91     read_state(),
92     write_state(),
93     need_read_process(false),
94     need_write_process(false),
95     closed(false),
96     sm_lock_retry_event(nullptr),
97     core_lock_retry_event(nullptr),
98     deletable(false),
99     reentrancy_count(0),
100     active_timeout(0),
101     active_event(nullptr),
102     inactive_timeout(0),
103     inactive_timeout_at(0),
104     inactive_event(nullptr),
105     plugin_tag(nullptr),
106     plugin_id(0)
107 {
108   ink_assert(core_obj != nullptr);
109   SET_HANDLER(&PluginVC::main_handler);
110 }
111 
~PluginVC()112 PluginVC::~PluginVC()
113 {
114   mutex = nullptr;
115 }
116 
117 int
main_handler(int event,void * data)118 PluginVC::main_handler(int event, void *data)
119 {
120   Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
121 
122   ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
123   ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
124   ink_assert(!deletable);
125   ink_assert(data != nullptr);
126 
127   Event *call_event   = static_cast<Event *>(data);
128   EThread *my_ethread = mutex->thread_holding;
129   ink_release_assert(my_ethread != nullptr);
130 
131   bool read_mutex_held             = false;
132   bool write_mutex_held            = false;
133   Ptr<ProxyMutex> read_side_mutex  = read_state.vio.mutex;
134   Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
135 
136   if (read_side_mutex) {
137     read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
138 
139     if (!read_mutex_held) {
140       if (call_event != inactive_event) {
141         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
142       }
143       return 0;
144     }
145 
146     if (read_side_mutex != read_state.vio.mutex) {
147       // It's possible some swapped the mutex on us before
148       //  we were able to grab it
149       Mutex_unlock(read_side_mutex, my_ethread);
150       if (call_event != inactive_event) {
151         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
152       }
153       return 0;
154     }
155   }
156 
157   if (write_side_mutex) {
158     write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
159 
160     if (!write_mutex_held) {
161       if (read_mutex_held) {
162         Mutex_unlock(read_side_mutex, my_ethread);
163       }
164       if (call_event != inactive_event) {
165         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
166       }
167       return 0;
168     }
169 
170     if (write_side_mutex != write_state.vio.mutex) {
171       // It's possible some swapped the mutex on us before
172       //  we were able to grab it
173       Mutex_unlock(write_side_mutex, my_ethread);
174       if (read_mutex_held) {
175         Mutex_unlock(read_side_mutex, my_ethread);
176       }
177       if (call_event != inactive_event) {
178         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
179       }
180       return 0;
181     }
182   }
183   // We've got all the locks so there should not be any
184   //   other calls active
185   ink_release_assert(reentrancy_count == 0);
186 
187   if (closed) {
188     process_close();
189 
190     if (read_mutex_held) {
191       Mutex_unlock(read_side_mutex, my_ethread);
192     }
193 
194     if (write_mutex_held) {
195       Mutex_unlock(write_side_mutex, my_ethread);
196     }
197 
198     return 0;
199   }
200   // We can get closed while we're calling back the
201   //  continuation.  Set the reentrancy count so we know
202   //  we could be calling the continuation and that we
203   //  need to defer close processing
204   reentrancy_count++;
205 
206   if (call_event == active_event) {
207     process_timeout(&active_event, VC_EVENT_ACTIVE_TIMEOUT);
208   } else if (call_event == inactive_event) {
209     if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
210       process_timeout(&inactive_event, VC_EVENT_INACTIVITY_TIMEOUT);
211     }
212   } else {
213     if (call_event == sm_lock_retry_event) {
214       sm_lock_retry_event = nullptr;
215     } else {
216       ink_release_assert(call_event == core_lock_retry_event);
217       core_lock_retry_event = nullptr;
218     }
219 
220     if (need_read_process) {
221       process_read_side(false);
222     }
223 
224     if (need_write_process && !closed) {
225       process_write_side(false);
226     }
227   }
228 
229   reentrancy_count--;
230   if (closed) {
231     process_close();
232   }
233 
234   if (read_mutex_held) {
235     Mutex_unlock(read_side_mutex, my_ethread);
236   }
237 
238   if (write_mutex_held) {
239     Mutex_unlock(write_side_mutex, my_ethread);
240   }
241 
242   return 0;
243 }
244 
245 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)246 PluginVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
247 {
248   ink_assert(!closed);
249   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
250 
251   if (buf) {
252     read_state.vio.buffer.writer_for(buf);
253   } else {
254     read_state.vio.buffer.clear();
255   }
256 
257   // Note: we set vio.op last because process_read_side looks at it to
258   //  tell if the VConnection is active.
259   read_state.vio.mutex     = c ? c->mutex : this->mutex;
260   read_state.vio.cont      = c;
261   read_state.vio.nbytes    = nbytes;
262   read_state.vio.ndone     = 0;
263   read_state.vio.vc_server = (VConnection *)this;
264   read_state.vio.op        = VIO::READ;
265 
266   Debug("pvc", "[%u] %s: do_io_read for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
267 
268   // Since reentrant callbacks are not allowed on from do_io
269   //   functions schedule ourselves get on a different stack
270   need_read_process = true;
271   setup_event_cb(0, &sm_lock_retry_event);
272 
273   return &read_state.vio;
274 }
275 
276 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * abuffer,bool owner)277 PluginVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
278 {
279   ink_assert(!closed);
280   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
281 
282   if (abuffer) {
283     ink_assert(!owner);
284     write_state.vio.buffer.reader_for(abuffer);
285   } else {
286     write_state.vio.buffer.clear();
287   }
288 
289   // Note: we set vio.op last because process_write_side looks at it to
290   //  tell if the VConnection is active.
291   write_state.vio.mutex     = c ? c->mutex : this->mutex;
292   write_state.vio.cont      = c;
293   write_state.vio.nbytes    = nbytes;
294   write_state.vio.ndone     = 0;
295   write_state.vio.vc_server = (VConnection *)this;
296   write_state.vio.op        = VIO::WRITE;
297 
298   Debug("pvc", "[%u] %s: do_io_write for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
299 
300   // Since reentrant callbacks are not allowed on from do_io
301   //   functions schedule ourselves get on a different stack
302   need_write_process = true;
303   setup_event_cb(0, &sm_lock_retry_event);
304 
305   return &write_state.vio;
306 }
307 
308 void
reenable(VIO * vio)309 PluginVC::reenable(VIO *vio)
310 {
311   ink_assert(!closed);
312   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
313   ink_assert(vio->mutex->thread_holding == this_ethread());
314 
315   Ptr<ProxyMutex> sm_mutex = vio->mutex;
316   SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread());
317 
318   Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
319 
320   if (vio->op == VIO::WRITE) {
321     ink_assert(vio == &write_state.vio);
322     need_write_process = true;
323   } else if (vio->op == VIO::READ) {
324     need_read_process = true;
325   } else {
326     ink_release_assert(0);
327   }
328   setup_event_cb(0, &sm_lock_retry_event);
329 }
330 
331 void
reenable_re(VIO * vio)332 PluginVC::reenable_re(VIO *vio)
333 {
334   ink_assert(!closed);
335   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
336   ink_assert(vio->mutex->thread_holding == this_ethread());
337 
338   Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
339 
340   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
341 
342   ++reentrancy_count;
343 
344   if (vio->op == VIO::WRITE) {
345     ink_assert(vio == &write_state.vio);
346     need_write_process = true;
347     process_write_side(false);
348   } else if (vio->op == VIO::READ) {
349     ink_assert(vio == &read_state.vio);
350     need_read_process = true;
351     process_read_side(false);
352   } else {
353     ink_release_assert(0);
354   }
355 
356   --reentrancy_count;
357 
358   // To process the close, we need the lock
359   //   for the PluginVC.  Schedule an event
360   //   to make sure we get it
361   if (closed) {
362     setup_event_cb(0, &sm_lock_retry_event);
363   }
364 }
365 
366 void
do_io_close(int)367 PluginVC::do_io_close(int /* flag ATS_UNUSED */)
368 {
369   ink_assert(!closed);
370   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
371 
372   Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
373 
374   SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
375   if (!closed) { // if already closed, need to do nothing.
376     closed = true;
377 
378     // If re-entered then that earlier handler will clean up, otherwise set up a ping
379     // to drive that process (too dangerous to do it here).
380     if (reentrancy_count <= 0) {
381       setup_event_cb(0, &sm_lock_retry_event);
382     }
383   }
384 }
385 
386 void
do_io_shutdown(ShutdownHowTo_t howto)387 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
388 {
389   ink_assert(!closed);
390   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
391 
392   switch (howto) {
393   case IO_SHUTDOWN_READ:
394     read_state.shutdown = true;
395     break;
396   case IO_SHUTDOWN_WRITE:
397     write_state.shutdown = true;
398     break;
399   case IO_SHUTDOWN_READWRITE:
400     read_state.shutdown  = true;
401     write_state.shutdown = true;
402     break;
403   }
404 }
405 
406 // int PluginVC::transfer_bytes(MIOBuffer* transfer_to,
407 //                              IOBufferReader* transfer_from, int act_on)
408 //
409 //   Takes care of transfering bytes from a reader to another buffer
410 //      In the case of large transfers, we move blocks.  In the case
411 //      of small transfers we copy data so as to not build too many
412 //      buffer blocks
413 //
414 // Args:
415 //   transfer_to:  buffer to copy to
416 //   transfer_from:  buffer_copy_from
417 //   act_on: is the max number of bytes we are to copy.  There must
418 //          be at least act_on bytes available from transfer_from
419 //
420 // Returns number of bytes transfered
421 //
422 int64_t
transfer_bytes(MIOBuffer * transfer_to,IOBufferReader * transfer_from,int64_t act_on)423 PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from, int64_t act_on)
424 {
425   int64_t total_added = 0;
426 
427   ink_assert(act_on <= transfer_from->read_avail());
428 
429   while (act_on > 0) {
430     int64_t block_read_avail = transfer_from->block_read_avail();
431     int64_t to_move          = std::min(act_on, block_read_avail);
432     int64_t moved            = 0;
433 
434     if (to_move <= 0) {
435       break;
436     }
437 
438     if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
439       moved = transfer_to->write(transfer_from, to_move, 0);
440     } else {
441       // We have a really small amount of data.  To make
442       //  sure we don't get a huge build up of blocks which
443       //  can lead to stack overflows if the buffer is destroyed
444       //  before we read from it, we need copy over to the new
445       //  buffer instead of doing a block transfer
446       moved = transfer_to->write(transfer_from->start(), to_move);
447 
448       if (moved == 0) {
449         // We are out of buffer space
450         break;
451       }
452     }
453 
454     act_on -= moved;
455     transfer_from->consume(moved);
456     total_added += moved;
457   }
458 
459   return total_added;
460 }
461 
462 // void PluginVC::process_write_side(bool cb_ok)
463 //
464 //   This function may only be called while holding
465 //      this->mutex & while it is ok to callback the
466 //      write side continuation
467 //
468 //   Does write side processing
469 //
470 void
process_write_side(bool other_side_call)471 PluginVC::process_write_side(bool other_side_call)
472 {
473   ink_assert(!deletable);
474   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
475 
476   MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
477 
478   Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
479   need_write_process = false;
480 
481   // Check write_state
482   if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
483     return;
484   }
485 
486   // Check the state of our write buffer as well as ntodo
487   int64_t ntodo = write_state.vio.ntodo();
488   if (ntodo == 0) {
489     return;
490   }
491 
492   IOBufferReader *reader = write_state.vio.get_reader();
493   int64_t bytes_avail    = reader->read_avail();
494   int64_t act_on         = std::min(bytes_avail, ntodo);
495 
496   Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
497 
498   if (other_side->closed || other_side->read_state.shutdown) {
499     write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
500     return;
501   }
502 
503   if (act_on <= 0) {
504     if (ntodo > 0) {
505       // Notify the continuation that we are "disabling"
506       //  ourselves due to to nothing to write
507       write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
508     }
509     return;
510   }
511   // Bytes available, try to transfer to the PluginVCCore
512   //   intermediate buffer
513   //
514   int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
515   if (buf_space <= 0) {
516     Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
517     return;
518   }
519   act_on = std::min(act_on, buf_space);
520 
521   int64_t added = transfer_bytes(core_buffer, reader, act_on);
522   if (added < 0) {
523     // Couldn't actually get the buffer space.  This only
524     //   happens on small transfers with the above
525     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
526     Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
527     return;
528   }
529 
530   write_state.vio.ndone += added;
531 
532   Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
533 
534   if (write_state.vio.ntodo() == 0) {
535     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
536   } else {
537     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
538   }
539 
540   update_inactive_time();
541 
542   // Wake up the read side on the other side to process these bytes
543   if (!other_side->closed) {
544     if (!other_side_call) {
545       /* To clear the `need_read_process`, the mutexes must be obtained:
546        *
547        *   - PluginVC::mutex
548        *   - PluginVC::read_state.vio.mutex
549        *
550        */
551       if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
552         // Just return, no touch on `other_side->need_read_process`.
553         return;
554       }
555       // Acquire the lock of the read side continuation
556       EThread *my_ethread = mutex->thread_holding;
557       ink_assert(my_ethread != nullptr);
558       MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
559       if (!lock.is_locked()) {
560         Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
561               ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
562 
563         // set need_read_process to enforce the read processing
564         other_side->need_read_process = true;
565         other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
566         return;
567       }
568 
569       other_side->process_read_side(true);
570     } else {
571       other_side->read_state.vio.reenable();
572     }
573   }
574 }
575 
576 // void PluginVC::process_read_side()
577 //
578 //   This function may only be called while holding
579 //      this->mutex & while it is ok to callback the
580 //      read side continuation
581 //
582 //   Does read side processing
583 //
584 void
process_read_side(bool other_side_call)585 PluginVC::process_read_side(bool other_side_call)
586 {
587   ink_assert(!deletable);
588   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
589 
590   // TODO: Never used??
591   // MIOBuffer *core_buffer;
592 
593   IOBufferReader *core_reader;
594 
595   if (vc_type == PLUGIN_VC_ACTIVE) {
596     // core_buffer = core_obj->p_to_a_buffer;
597     core_reader = core_obj->p_to_a_reader;
598   } else {
599     ink_assert(vc_type == PLUGIN_VC_PASSIVE);
600     // core_buffer = core_obj->a_to_p_buffer;
601     core_reader = core_obj->a_to_p_reader;
602   }
603 
604   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
605   need_read_process = false;
606 
607   // Check read_state
608   if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
609     return;
610   }
611 
612   // Check the state of our read buffer as well as ntodo
613   int64_t ntodo = read_state.vio.ntodo();
614   if (ntodo == 0) {
615     return;
616   }
617 
618   int64_t bytes_avail = core_reader->read_avail();
619   int64_t act_on      = std::min(bytes_avail, ntodo);
620 
621   Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
622 
623   if (act_on <= 0) {
624     if (other_side->closed || other_side->write_state.shutdown) {
625       read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
626     }
627     return;
628   }
629   // Bytes available, try to transfer from the PluginVCCore
630   //   intermediate buffer
631   //
632   MIOBuffer *output_buffer = read_state.vio.get_writer();
633 
634   int64_t water_mark = output_buffer->water_mark;
635   water_mark         = std::max(water_mark, static_cast<int64_t>(PVC_DEFAULT_MAX_BYTES));
636   int64_t buf_space  = water_mark - output_buffer->max_read_avail();
637   if (buf_space <= 0) {
638     Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
639     return;
640   }
641   act_on = std::min(act_on, buf_space);
642 
643   int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
644   if (added <= 0) {
645     // Couldn't actually get the buffer space.  This only
646     //   happens on small transfers with the above
647     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
648     Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
649     return;
650   }
651 
652   read_state.vio.ndone += added;
653 
654   Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
655 
656   if (read_state.vio.ntodo() == 0) {
657     read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
658   } else {
659     read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
660   }
661 
662   update_inactive_time();
663 
664   // Wake up the other side so it knows there is space available in
665   //  intermediate buffer
666   if (!other_side->closed) {
667     if (!other_side_call) {
668       /* To clear the `need_write_process`, the mutexes must be obtained:
669        *
670        *   - PluginVC::mutex
671        *   - PluginVC::write_state.vio.mutex
672        *
673        */
674       if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
675         // Just return, no touch on `other_side->need_write_process`.
676         return;
677       }
678       // Acquire the lock of the write side continuation
679       EThread *my_ethread = mutex->thread_holding;
680       ink_assert(my_ethread != nullptr);
681       MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
682       if (!lock.is_locked()) {
683         Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
684               ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
685 
686         // set need_write_process to enforce the write processing
687         other_side->need_write_process = true;
688         other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
689         return;
690       }
691 
692       other_side->process_write_side(true);
693     } else {
694       other_side->write_state.vio.reenable();
695     }
696   }
697 }
698 
699 // void PluginVC::process_read_close()
700 //
701 //   This function may only be called while holding
702 //      this->mutex
703 //
704 //   Tries to close the and dealloc the the vc
705 //
706 void
process_close()707 PluginVC::process_close()
708 {
709   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
710 
711   Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
712 
713   if (!deletable) {
714     deletable = true;
715   }
716 
717   if (sm_lock_retry_event) {
718     sm_lock_retry_event->cancel();
719     sm_lock_retry_event = nullptr;
720   }
721 
722   if (core_lock_retry_event) {
723     core_lock_retry_event->cancel();
724     core_lock_retry_event = nullptr;
725   }
726 
727   if (active_event) {
728     active_event->cancel();
729     active_event = nullptr;
730   }
731 
732   if (inactive_event) {
733     inactive_event->cancel();
734     inactive_event      = nullptr;
735     inactive_timeout_at = 0;
736   }
737   // If the other side of the PluginVC is not closed
738   //  we need to force it process both living sides
739   //  of the connection in order that it recognizes
740   //  the close
741   if (!other_side->closed && core_obj->connected) {
742     other_side->need_write_process = true;
743     other_side->need_read_process  = true;
744     other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
745   }
746 
747   core_obj->attempt_delete();
748 }
749 
750 // void PluginVC::process_timeout(Event** e, int event_to_send, Event** our_eptr)
751 //
752 //   Handles sending timeout event to the VConnection.  e is the event we got
753 //     which indicates the timeout.  event_to_send is the event to the
754 //     vc user.  e is a pointer to either inactive_event,
755 //     or active_event.  If we successfully send the timeout to vc user,
756 //     we clear the pointer, otherwise we reschedule it.
757 //
758 //   Because the possibility of reentrant close from vc user, we don't want to
759 //      touch any state after making the call back
760 //
761 void
process_timeout(Event ** e,int event_to_send)762 PluginVC::process_timeout(Event **e, int event_to_send)
763 {
764   ink_assert(*e == inactive_event || *e == active_event);
765 
766   if (closed) {
767     // already closed, ignore the timeout event
768     // to avoid handle_event asserting use-after-free
769     clear_event(e);
770     return;
771   }
772 
773   if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
774     MUTEX_TRY_LOCK(lock, read_state.vio.mutex, (*e)->ethread);
775     if (!lock.is_locked()) {
776       if (*e == active_event) {
777         // Only reschedule active_event due to inactive_event is perorid event.
778         (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
779       }
780       return;
781     }
782     clear_event(e);
783     read_state.vio.cont->handleEvent(event_to_send, &read_state.vio);
784   } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
785     MUTEX_TRY_LOCK(lock, write_state.vio.mutex, (*e)->ethread);
786     if (!lock.is_locked()) {
787       if (*e == active_event) {
788         // Only reschedule active_event due to inactive_event is perorid event.
789         (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
790       }
791       return;
792     }
793     clear_event(e);
794     write_state.vio.cont->handleEvent(event_to_send, &write_state.vio);
795   } else {
796     clear_event(e);
797   }
798 }
799 
800 void
clear_event(Event ** e)801 PluginVC::clear_event(Event **e)
802 {
803   if (e == nullptr || *e == nullptr) {
804     return;
805   }
806   if (*e == inactive_event) {
807     inactive_event->cancel();
808     inactive_timeout_at = 0;
809   }
810   *e = nullptr;
811 }
812 
813 void
update_inactive_time()814 PluginVC::update_inactive_time()
815 {
816   if (inactive_event && inactive_timeout) {
817     // inactive_event->cancel();
818     // inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
819     inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
820   }
821 }
822 
823 // void PluginVC::setup_event_cb(ink_hrtime in)
824 //
825 //    Setup up the event processor to call us back.
826 //      We've got two different event pointers to handle
827 //      locking issues
828 //
829 void
setup_event_cb(ink_hrtime in,Event ** e_ptr)830 PluginVC::setup_event_cb(ink_hrtime in, Event **e_ptr)
831 {
832   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
833 
834   if (*e_ptr == nullptr) {
835     // We locked the pointer so we can now allocate an event
836     //   to call us back
837     if (in == 0) {
838       if (this_ethread()->tt == REGULAR) {
839         *e_ptr = this_ethread()->schedule_imm_local(this);
840       } else {
841         *e_ptr = eventProcessor.schedule_imm(this);
842       }
843     } else {
844       if (this_ethread()->tt == REGULAR) {
845         *e_ptr = this_ethread()->schedule_in_local(this, in);
846       } else {
847         *e_ptr = eventProcessor.schedule_in(this, in);
848       }
849     }
850   }
851 }
852 
853 void
set_active_timeout(ink_hrtime timeout_in)854 PluginVC::set_active_timeout(ink_hrtime timeout_in)
855 {
856   active_timeout = timeout_in;
857 
858   // FIX - Do we need to handle the case where the timeout is set
859   //   but no io has been done?
860   if (active_event) {
861     ink_assert(!active_event->cancelled);
862     active_event->cancel();
863     active_event = nullptr;
864   }
865 
866   if (active_timeout > 0) {
867     active_event = eventProcessor.schedule_in(this, active_timeout);
868   }
869 }
870 
871 void
set_inactivity_timeout(ink_hrtime timeout_in)872 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
873 {
874   inactive_timeout = timeout_in;
875   if (inactive_timeout != 0) {
876     inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
877     if (inactive_event == nullptr) {
878       inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
879     }
880   } else {
881     inactive_timeout_at = 0;
882     if (inactive_event) {
883       inactive_event->cancel();
884       inactive_event = nullptr;
885     }
886   }
887 }
888 
889 void
cancel_active_timeout()890 PluginVC::cancel_active_timeout()
891 {
892   set_active_timeout(0);
893 }
894 
895 void
cancel_inactivity_timeout()896 PluginVC::cancel_inactivity_timeout()
897 {
898   set_inactivity_timeout(0);
899 }
900 
901 ink_hrtime
get_active_timeout()902 PluginVC::get_active_timeout()
903 {
904   return active_timeout;
905 }
906 
907 ink_hrtime
get_inactivity_timeout()908 PluginVC::get_inactivity_timeout()
909 {
910   return inactive_timeout;
911 }
912 
void
PluginVC::add_to_keep_alive_queue()
{
  // PluginVCs are in-memory connections and are not tracked by the net
  // keep-alive queue.
  // do nothing
}
918 
void
PluginVC::remove_from_keep_alive_queue()
{
  // Nothing to remove: PluginVCs are never placed on the keep-alive queue.
  // do nothing
}
924 
bool
PluginVC::add_to_active_queue()
{
  // PluginVCs are not managed by the net active-connection queue; report
  // that nothing was enqueued.
  // do nothing
  return false;
}
931 
SOCKET
PluginVC::get_socket()
{
  // A PluginVC has no underlying OS socket.
  // Return an invalid file descriptor
  return ts::NO_FD;
}
938 
939 void
set_local_addr()940 PluginVC::set_local_addr()
941 {
942   if (vc_type == PLUGIN_VC_ACTIVE) {
943     ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
944     //    local_addr = core_obj->active_addr_struct;
945   } else {
946     ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
947     //    local_addr = core_obj->passive_addr_struct;
948   }
949 }
950 
951 void
set_remote_addr()952 PluginVC::set_remote_addr()
953 {
954   if (vc_type == PLUGIN_VC_ACTIVE) {
955     ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
956   } else {
957     ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
958   }
959 }
960 
961 void
set_remote_addr(const sockaddr *)962 PluginVC::set_remote_addr(const sockaddr * /* new_sa ATS_UNUSED */)
963 {
964   return;
965 }
966 
967 void
set_mptcp_state()968 PluginVC::set_mptcp_state()
969 {
970   return;
971 }
972 
int
PluginVC::set_tcp_congestion_control(int ATS_UNUSED)
{
  // There is no TCP stack underneath a PluginVC; report failure.
  return -1;
}
978 
void
PluginVC::apply_options()
{
  // NetVC options (socket options, etc.) have no effect on a PluginVC.
  // do nothing
}
984 
985 bool
get_data(int id,void * data)986 PluginVC::get_data(int id, void *data)
987 {
988   if (data == nullptr) {
989     return false;
990   }
991   switch (id) {
992   case PLUGIN_VC_DATA_LOCAL:
993     if (vc_type == PLUGIN_VC_ACTIVE) {
994       *static_cast<void **>(data) = core_obj->active_data;
995     } else {
996       *static_cast<void **>(data) = core_obj->passive_data;
997     }
998     return true;
999   case PLUGIN_VC_DATA_REMOTE:
1000     if (vc_type == PLUGIN_VC_ACTIVE) {
1001       *static_cast<void **>(data) = core_obj->passive_data;
1002     } else {
1003       *static_cast<void **>(data) = core_obj->active_data;
1004     }
1005     return true;
1006   case TS_API_DATA_CLOSED:
1007     *static_cast<int *>(data) = this->closed;
1008     return true;
1009   default:
1010     *static_cast<void **>(data) = nullptr;
1011     return false;
1012   }
1013 }
1014 
1015 bool
set_data(int id,void * data)1016 PluginVC::set_data(int id, void *data)
1017 {
1018   switch (id) {
1019   case PLUGIN_VC_DATA_LOCAL:
1020     if (vc_type == PLUGIN_VC_ACTIVE) {
1021       core_obj->active_data = data;
1022     } else {
1023       core_obj->passive_data = data;
1024     }
1025     return true;
1026   case PLUGIN_VC_DATA_REMOTE:
1027     if (vc_type == PLUGIN_VC_ACTIVE) {
1028       core_obj->passive_data = data;
1029     } else {
1030       core_obj->active_data = data;
1031     }
1032     return true;
1033   default:
1034     return false;
1035   }
1036 }
1037 
// PluginVCCore

// Source of per-core ids (presumably consumed when a core is constructed;
// the increment is not visible in this chunk).  NOTE(review): no
// synchronization is visible here — confirm allocation is single-threaded
// or otherwise serialized.
int32_t PluginVCCore::nextid;

PluginVCCore::~PluginVCCore() = default;
1043 
1044 PluginVCCore *
alloc(Continuation * acceptor)1045 PluginVCCore::alloc(Continuation *acceptor)
1046 {
1047   PluginVCCore *pvc = new PluginVCCore;
1048   pvc->init();
1049   pvc->connect_to = acceptor;
1050   return pvc;
1051 }
1052 
1053 void
init()1054 PluginVCCore::init()
1055 {
1056   mutex = new_ProxyMutex();
1057 
1058   active_vc.vc_type    = PLUGIN_VC_ACTIVE;
1059   active_vc.other_side = &passive_vc;
1060   active_vc.core_obj   = this;
1061   active_vc.mutex      = mutex;
1062   active_vc.thread     = this_ethread();
1063 
1064   passive_vc.vc_type    = PLUGIN_VC_PASSIVE;
1065   passive_vc.other_side = &active_vc;
1066   passive_vc.core_obj   = this;
1067   passive_vc.mutex      = mutex;
1068   passive_vc.thread     = active_vc.thread;
1069 
1070   p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1071   p_to_a_reader = p_to_a_buffer->alloc_reader();
1072 
1073   a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1074   a_to_p_reader = a_to_p_buffer->alloc_reader();
1075 
1076   Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
1077 }
1078 
1079 void
destroy()1080 PluginVCCore::destroy()
1081 {
1082   Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
1083 
1084   ink_assert(active_vc.closed == true || !connected);
1085   active_vc.mutex = nullptr;
1086   active_vc.read_state.vio.buffer.clear();
1087   active_vc.write_state.vio.buffer.clear();
1088   active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1089 
1090   ink_assert(passive_vc.closed == true || !connected);
1091   passive_vc.mutex = nullptr;
1092   passive_vc.read_state.vio.buffer.clear();
1093   passive_vc.write_state.vio.buffer.clear();
1094   passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1095 
1096   if (p_to_a_buffer) {
1097     free_MIOBuffer(p_to_a_buffer);
1098     p_to_a_buffer = nullptr;
1099   }
1100 
1101   if (a_to_p_buffer) {
1102     free_MIOBuffer(a_to_p_buffer);
1103     a_to_p_buffer = nullptr;
1104   }
1105 
1106   this->mutex = nullptr;
1107   delete this;
1108 }
1109 
1110 PluginVC *
connect()1111 PluginVCCore::connect()
1112 {
1113   ink_release_assert(connect_to != nullptr);
1114 
1115   connected = true;
1116   state_send_accept(EVENT_IMMEDIATE, nullptr);
1117 
1118   return &active_vc;
1119 }
1120 
// Connect and call `c` back with NET_EVENT_OPEN before returning.  The
// callback is made while our mutex is held, as the rest of the system
// expects.  Always completes inline, so ACTION_RESULT_DONE is returned.
Action *
PluginVCCore::connect_re(Continuation *c)
{
  ink_release_assert(connect_to != nullptr);

  EThread *my_thread = this_ethread();
  MUTEX_TAKE_LOCK(this->mutex, my_thread);

  connected = true;
  // Deliver (or schedule delivery of) NET_EVENT_ACCEPT to the acceptor.
  state_send_accept(EVENT_IMMEDIATE, nullptr);

  // We have to take out our mutex because rest of the
  //   system expects the VC mutex to held when calling back.
  // We can use take lock here instead of try lock because the
  //   lock should never already be held.

  c->handleEvent(NET_EVENT_OPEN, &active_vc);
  MUTEX_UNTAKE_LOCK(this->mutex, my_thread);

  return ACTION_RESULT_DONE;
}
1142 
1143 int
state_send_accept_failed(int,void *)1144 PluginVCCore::state_send_accept_failed(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1145 {
1146   if (connect_to->mutex == nullptr) {
1147     connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1148     destroy();
1149   } else {
1150     MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1151 
1152     if (lock.is_locked()) {
1153       connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1154       destroy();
1155     } else {
1156       SET_HANDLER(&PluginVCCore::state_send_accept_failed);
1157       eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1158     }
1159   }
1160 
1161   return 0;
1162 }
1163 
1164 int
state_send_accept(int,void *)1165 PluginVCCore::state_send_accept(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1166 {
1167   if (connect_to->mutex == nullptr) {
1168     connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1169   } else {
1170     MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1171 
1172     if (lock.is_locked()) {
1173       connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1174     } else {
1175       SET_HANDLER(&PluginVCCore::state_send_accept);
1176       eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1177     }
1178   }
1179 
1180   return 0;
1181 }
1182 
1183 // void PluginVCCore::attempt_delete()
1184 //
1185 //  Mutex must be held when calling this function
1186 //
1187 void
attempt_delete()1188 PluginVCCore::attempt_delete()
1189 {
1190   if (active_vc.deletable) {
1191     if (passive_vc.deletable) {
1192       destroy();
1193     } else if (!connected) {
1194       state_send_accept_failed(EVENT_IMMEDIATE, nullptr);
1195     }
1196   }
1197 }
1198 
1199 // void PluginVCCore::kill_no_connect()
1200 //
1201 //   Called to kill the PluginVCCore when the
1202 //     connect call hasn't been made yet
1203 //
1204 void
kill_no_connect()1205 PluginVCCore::kill_no_connect()
1206 {
1207   ink_assert(!connected);
1208   ink_assert(!active_vc.closed);
1209   active_vc.do_io_close();
1210 }
1211 
1212 void
set_passive_addr(in_addr_t ip,int port)1213 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
1214 {
1215   ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
1216 }
1217 
1218 void
set_passive_addr(sockaddr const * ip)1219 PluginVCCore::set_passive_addr(sockaddr const *ip)
1220 {
1221   passive_addr_struct.assign(ip);
1222 }
1223 
1224 void
set_active_addr(in_addr_t ip,int port)1225 PluginVCCore::set_active_addr(in_addr_t ip, int port)
1226 {
1227   ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
1228 }
1229 
1230 void
set_active_addr(sockaddr const * ip)1231 PluginVCCore::set_active_addr(sockaddr const *ip)
1232 {
1233   active_addr_struct.assign(ip);
1234 }
1235 
1236 void
set_passive_data(void * data)1237 PluginVCCore::set_passive_data(void *data)
1238 {
1239   passive_data = data;
1240 }
1241 
1242 void
set_active_data(void * data)1243 PluginVCCore::set_active_data(void *data)
1244 {
1245   active_data = data;
1246 }
1247 
1248 void
set_transparent(bool passive_side,bool active_side)1249 PluginVCCore::set_transparent(bool passive_side, bool active_side)
1250 {
1251   passive_vc.set_is_transparent(passive_side);
1252   active_vc.set_is_transparent(active_side);
1253 }
1254 
1255 void
set_plugin_id(int64_t id)1256 PluginVCCore::set_plugin_id(int64_t id)
1257 {
1258   passive_vc.plugin_id = active_vc.plugin_id = id;
1259 }
1260 
1261 void
set_plugin_tag(const char * tag)1262 PluginVCCore::set_plugin_tag(const char *tag)
1263 {
1264   passive_vc.plugin_tag = active_vc.plugin_tag = tag;
1265 }
1266 
1267 /*************************************************************
1268  *
1269  *   REGRESSION TEST STUFF
1270  *
1271  **************************************************************/
1272 
1273 #if TS_HAS_TESTS
// Regression driver that runs the shared NetVC test table over a
// PluginVC pair (one active, one passive) per test.
class PVCTestDriver : public NetTestDriver
{
public:
  PVCTestDriver();
  ~PVCTestDriver() override;

  // Record the regression handle/status out-param and start the first test.
  void start_tests(RegressionTest *r_arg, int *pstatus_arg);
  // Launch the next active/passive test pair, or report the final result.
  void run_next_test();
  // Completion callback from the NetVCTest instances.
  int main_handler(int event, void *data);

private:
  unsigned i                    = 0; // index of the next test pair
  unsigned completions_received = 0; // completions seen for the current pair
};
1288 
// Nothing beyond base-class initialization is required.
PVCTestDriver::PVCTestDriver() : NetTestDriver() {}
1290 
PVCTestDriver::~PVCTestDriver()
{
  // Drop our reference to the mutex; the driver owns no other resources.
  mutex = nullptr;
}
1295 
void
PVCTestDriver::start_tests(RegressionTest *r_arg, int *pstatus_arg)
{
  mutex = new_ProxyMutex();
  // The mutex was created on the line above, so this try-lock cannot be
  // contended; it simply holds our own lock for the setup below.
  MUTEX_TRY_LOCK(lock, mutex, this_ethread());

  r       = r_arg;
  pstatus = pstatus_arg;
  SET_HANDLER(&PVCTestDriver::main_handler);

  run_next_test();
}
1308 
void
PVCTestDriver::run_next_test()
{
  // Tests come in pairs from the shared table: even index is the active
  // side's spec, the following odd index is the passive side's.
  unsigned a_index = i * 2;
  unsigned p_index = a_index + 1;

  if (p_index >= num_netvc_tests) {
    // We are done - // FIX - PASS or FAIL?
    if (errors == 0) {
      *pstatus = REGRESSION_TEST_PASSED;
    } else {
      *pstatus = REGRESSION_TEST_FAILED;
    }
    delete this;
    return;
  }
  completions_received = 0;
  i++;

  Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);

  NetVCTest *p       = new NetVCTest;
  NetVCTest *a       = new NetVCTest;
  PluginVCCore *core = PluginVCCore::alloc(p);

  // The passive test must be initialized before connect(), since connect()
  // kicks off delivery of NET_EVENT_ACCEPT to the acceptor continuation.
  p->init_test(NET_VC_TEST_PASSIVE, this, nullptr, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
  PluginVC *a_vc = core->connect();

  a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
}
1339 
1340 int
main_handler(int,void *)1341 PVCTestDriver::main_handler(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1342 {
1343   completions_received++;
1344 
1345   if (completions_received == 2) {
1346     run_next_test();
1347   }
1348 
1349   return 0;
1350 }
1351 
EXCLUSIVE_REGRESSION_TEST(PVC)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus)
{
  // The driver deletes itself when the test sequence completes
  // (see run_next_test), so no cleanup is needed here.
  auto *driver = new PVCTestDriver;
  driver->start_tests(t, pstatus);
}
1357 #endif
1358