1 /** @file
2 
    UnixNetVConnection: implementation of TCP network virtual connections for the Unix event I/O core.
4 
5     @section license License
6 
7     Licensed to the Apache Software Foundation (ASF) under one
8     or more contributor license agreements.  See the NOTICE file
9     distributed with this work for additional information
10     regarding copyright ownership.  The ASF licenses this file
11     to you under the Apache License, Version 2.0 (the
12     "License"); you may not use this file except in compliance
13     with the License.  You may obtain a copy of the License at
14 
15     http://www.apache.org/licenses/LICENSE-2.0
16 
17     Unless required by applicable law or agreed to in writing, software
18     distributed under the License is distributed on an "AS IS" BASIS,
19     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20     See the License for the specific language governing permissions and
21     limitations under the License.
22 */
23 
24 #include "P_Net.h"
25 #include "tscore/ink_platform.h"
26 #include "tscore/InkErrno.h"
27 #include "Log.h"
28 
29 #include <termios.h>
30 
31 #define STATE_VIO_OFFSET ((uintptr_t) & ((NetState *)0)->vio)
32 #define STATE_FROM_VIO(_x) ((NetState *)(((char *)(_x)) - STATE_VIO_OFFSET))
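// These macros recover the enclosing NetState from a pointer to its embedded vio
// member (offsetof-style pointer arithmetic), so a VIO handed back to a caller can
// be mapped to this connection's read or write state.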
33 
34 // Global
35 ClassAllocator<UnixNetVConnection> netVCAllocator("netVCAllocator");
36 
37 //
38 // Reschedule a UnixNetVConnection by moving it
39 // onto or off of the ready_list
40 //
41 static inline void
read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
43 {
44   vc->ep.refresh(EVENTIO_READ);
45   if (vc->read.triggered && vc->read.enabled) {
46     nh->read_ready_list.in_or_enqueue(vc);
47   } else {
48     nh->read_ready_list.remove(vc);
49   }
50 }
51 
52 static inline void
write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
54 {
55   vc->ep.refresh(EVENTIO_WRITE);
56   if (vc->write.triggered && vc->write.enabled) {
57     nh->write_ready_list.in_or_enqueue(vc);
58   } else {
59     nh->write_ready_list.remove(vc);
60   }
61 }
62 
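// net_activity: refresh the inactivity-timeout deadline for @a vc after I/O activity.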
63 void
net_activity(UnixNetVConnection *vc, EThread *thread)
65 {
66   Debug("socket", "net_activity updating inactivity %" PRId64 ", NetVC=%p", vc->inactivity_timeout_in, vc);
67   (void)thread;
68   if (vc->inactivity_timeout_in) {
69     vc->next_inactivity_timeout_at = Thread::get_hrtime() + vc->inactivity_timeout_in;
70   } else {
71     vc->next_inactivity_timeout_at = 0;
72   }
73 }
74 
75 //
76 // Signal an event
77 //
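// read_signal_and_update / write_signal_and_update deliver @a event to the VIO's
// continuation (or close the VC when there is no continuation), and free the VC if
// it was closed and no other signal is still in progress on it.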
78 static inline int
read_signal_and_update(int event, UnixNetVConnection *vc)
80 {
81   vc->recursion++;
82   if (vc->read.vio.cont) {
83     vc->read.vio.cont->handleEvent(event, &vc->read.vio);
84   } else {
85     switch (event) {
86     case VC_EVENT_EOS:
87     case VC_EVENT_ERROR:
88     case VC_EVENT_ACTIVE_TIMEOUT:
89     case VC_EVENT_INACTIVITY_TIMEOUT:
90       Debug("inactivity_cop", "event %d: null read.vio cont, closing vc %p", event, vc);
91       vc->closed = 1;
92       break;
93     default:
94       Error("Unexpected event %d for vc %p", event, vc);
95       ink_release_assert(0);
96       break;
97     }
98   }
99   if (!--vc->recursion && vc->closed) {
100     /* BZ  31932 */
101     ink_assert(vc->thread == this_ethread());
102     vc->nh->free_netevent(vc);
103     return EVENT_DONE;
104   } else {
105     return EVENT_CONT;
106   }
107 }
108 
109 static inline int
write_signal_and_update(int event, UnixNetVConnection *vc)
111 {
112   vc->recursion++;
113   if (vc->write.vio.cont) {
114     vc->write.vio.cont->handleEvent(event, &vc->write.vio);
115   } else {
116     switch (event) {
117     case VC_EVENT_EOS:
118     case VC_EVENT_ERROR:
119     case VC_EVENT_ACTIVE_TIMEOUT:
120     case VC_EVENT_INACTIVITY_TIMEOUT:
121       Debug("inactivity_cop", "event %d: null write.vio cont, closing vc %p", event, vc);
122       vc->closed = 1;
123       break;
124     default:
125       Error("Unexpected event %d for vc %p", event, vc);
126       ink_release_assert(0);
127       break;
128     }
129   }
130   if (!--vc->recursion && vc->closed) {
131     /* BZ  31932 */
132     ink_assert(vc->thread == this_ethread());
133     vc->nh->free_netevent(vc);
134     return EVENT_DONE;
135   } else {
136     return EVENT_CONT;
137   }
138 }
139 
140 static inline int
read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
142 {
143   vc->read.enabled = 0;
144   if (read_signal_and_update(event, vc) == EVENT_DONE) {
145     return EVENT_DONE;
146   } else {
147     read_reschedule(nh, vc);
148     return EVENT_CONT;
149   }
150 }
151 
152 static inline int
write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
154 {
155   vc->write.enabled = 0;
156   if (write_signal_and_update(event, vc) == EVENT_DONE) {
157     return EVENT_DONE;
158   } else {
159     write_reschedule(nh, vc);
160     return EVENT_CONT;
161   }
162 }
163 
164 static inline int
read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
166 {
167   vc->lerrno = lerrno;
168   return read_signal_done(VC_EVENT_ERROR, nh, vc);
169 }
170 
171 static inline int
write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
173 {
174   vc->lerrno = lerrno;
175   return write_signal_done(VC_EVENT_ERROR, nh, vc);
176 }
177 
// Read the data for a UnixNetVConnection.
// Reschedules the UnixNetVConnection by moving the VC
// onto or off of the ready_list.
// This function is wrapped by net_read_io so the SSL subclass can override it.
182 static void
read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
184 {
185   NetState *s       = &vc->read;
186   ProxyMutex *mutex = thread->mutex.get();
187   int64_t r         = 0;
188 
189   MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
190 
191   if (!lock.is_locked()) {
192     read_reschedule(nh, vc);
193     return;
194   }
195 
196   // It is possible that the closed flag got set from HttpSessionManager in the
197   // global session pool case.  If so, the closed flag should be stable once we get the
198   // s->vio.mutex (the global session pool mutex).
199   if (vc->closed) {
200     vc->nh->free_netevent(vc);
201     return;
202   }
  // If the read side is not enabled, or the VIO is not a READ operation, disable and return.
204   if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
205     read_disable(nh, vc);
206     return;
207   }
208 
209   MIOBufferAccessor &buf = s->vio.buffer;
210   ink_assert(buf.writer());
211 
212   // if there is nothing to do, disable connection
213   int64_t ntodo = s->vio.ntodo();
214   if (ntodo <= 0) {
215     read_disable(nh, vc);
216     return;
217   }
218   int64_t toread = buf.writer()->write_avail();
219   if (toread > ntodo) {
220     toread = ntodo;
221   }
222 
223   // read data
224   int64_t rattempted = 0, total_read = 0;
225   unsigned niov = 0;
226   IOVec tiovec[NET_MAX_IOV];
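  // The loop below builds up to NET_MAX_IOV iovec entries from the MIOBuffer's
  // write blocks and issues a single scatter/gather readv(); it repeats while the
  // socket fills every byte requested and there is still room to read.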
227   if (toread) {
228     IOBufferBlock *b = buf.writer()->first_write_block();
229     do {
230       niov       = 0;
231       rattempted = 0;
232       while (b && niov < NET_MAX_IOV) {
233         int64_t a = b->write_avail();
234         if (a > 0) {
235           tiovec[niov].iov_base = b->_end;
236           int64_t togo          = toread - total_read - rattempted;
237           if (a > togo) {
238             a = togo;
239           }
240           tiovec[niov].iov_len = a;
241           rattempted += a;
242           niov++;
243           if (a >= togo) {
244             break;
245           }
246         }
247         b = b->next.get();
248       }
249 
250       ink_assert(niov > 0);
251       ink_assert(niov <= countof(tiovec));
252       r = socketManager.readv(vc->con.fd, &tiovec[0], niov);
253 
254       NET_INCREMENT_DYN_STAT(net_calls_to_read_stat);
255 
256       total_read += rattempted;
257     } while (rattempted && r == rattempted && total_read < toread);
258 
259     // if we have already moved some bytes successfully, summarize in r
260     if (total_read != rattempted) {
261       if (r <= 0) {
262         r = total_read - rattempted;
263       } else {
264         r = total_read - rattempted + r;
265       }
266     }
267     // check for errors
268     if (r <= 0) {
269       if (r == -EAGAIN || r == -ENOTCONN) {
270         NET_INCREMENT_DYN_STAT(net_calls_to_read_nodata_stat);
271         vc->read.triggered = 0;
272         nh->read_ready_list.remove(vc);
273         return;
274       }
275 
276       if (!r || r == -ECONNRESET) {
277         vc->read.triggered = 0;
278         nh->read_ready_list.remove(vc);
279         read_signal_done(VC_EVENT_EOS, nh, vc);
280         return;
281       }
282       vc->read.triggered = 0;
283       read_signal_error(nh, vc, static_cast<int>(-r));
284       return;
285     }
286     NET_SUM_DYN_STAT(net_read_bytes_stat, r);
287 
288     // Add data to buffer and signal continuation.
289     buf.writer()->fill(r);
290 #ifdef DEBUG
291     if (buf.writer()->write_avail() <= 0)
292       Debug("iocore_net", "read_from_net, read buffer full");
293 #endif
294     s->vio.ndone += r;
295     net_activity(vc, thread);
296   } else {
297     r = 0;
298   }
299 
  // Signal read ready and check whether the user still has more to read.
301   if (r) {
302     // If there are no more bytes to read, signal read complete
303     ink_assert(ntodo >= 0);
304     if (s->vio.ntodo() <= 0) {
305       read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
306       Debug("iocore_net", "read_from_net, read finished - signal done");
307       return;
308     } else {
309       if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
310         return;
311       }
312 
313       // change of lock... don't look at shared variables!
314       if (lock.get_mutex() != s->vio.mutex.get()) {
315         read_reschedule(nh, vc);
316         return;
317       }
318     }
319   }
  // If there is no more room, or nothing to do, disable the connection
321   if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
322     read_disable(nh, vc);
323     return;
324   }
325 
326   read_reschedule(nh, vc);
327 }
328 
//
// Write the data for a UnixNetVConnection.
// Reschedules the UnixNetVConnection when necessary.
//
333 void
write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
335 {
336   ProxyMutex *mutex = thread->mutex.get();
337 
338   NET_INCREMENT_DYN_STAT(net_calls_to_writetonet_stat);
339   NET_INCREMENT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat);
340 
341   write_to_net_io(nh, vc, thread);
342 }
343 
344 void
write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
346 {
347   NetState *s       = &vc->write;
348   ProxyMutex *mutex = thread->mutex.get();
349 
350   MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
351 
352   if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
353     write_reschedule(nh, vc);
354     return;
355   }
356 
  // getSSLHandShakeComplete() always returns true unless
  // vc is an SSLNetVConnection.
359   if (!vc->getSSLHandShakeComplete()) {
360     if (vc->trackFirstHandshake()) {
361       // Eat the first write-ready.  Until the TLS handshake is complete,
362       // we should still be under the connect timeout and shouldn't bother
363       // the state machine until the TLS handshake is complete
364       vc->write.triggered = 0;
365       nh->write_ready_list.remove(vc);
366     }
367 
368     int err, ret;
369 
370     if (vc->get_context() == NET_VCONNECTION_OUT) {
371       ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
372     } else {
373       ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
374     }
375 
376     if (ret == EVENT_ERROR) {
377       vc->write.triggered = 0;
378       write_signal_error(nh, vc, err);
379     } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT) {
380       vc->read.triggered = 0;
381       nh->read_ready_list.remove(vc);
382       read_reschedule(nh, vc);
383     } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == SSL_HANDSHAKE_WANT_WRITE) {
384       vc->write.triggered = 0;
385       nh->write_ready_list.remove(vc);
386       write_reschedule(nh, vc);
387     } else if (ret == EVENT_DONE) {
388       vc->write.triggered = 1;
389       if (vc->write.enabled) {
390         nh->write_ready_list.in_or_enqueue(vc);
391       }
392     } else {
393       write_reschedule(nh, vc);
394     }
395 
396     return;
397   }
398 
  // If it is not enabled, add to the wait list.
400   if (!s->enabled || s->vio.op != VIO::WRITE) {
401     write_disable(nh, vc);
402     return;
403   }
404 
405   // If there is nothing to do, disable
406   int64_t ntodo = s->vio.ntodo();
407   if (ntodo <= 0) {
408     write_disable(nh, vc);
409     return;
410   }
411 
412   MIOBufferAccessor &buf = s->vio.buffer;
413   ink_assert(buf.writer());
414 
415   // Calculate the amount to write.
416   int64_t towrite = buf.reader()->read_avail();
417   if (towrite > ntodo) {
418     towrite = ntodo;
419   }
420 
421   int signalled = 0;
422 
423   // signal write ready to allow user to fill the buffer
424   if (towrite != ntodo && buf.writer()->write_avail()) {
425     if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
426       return;
427     }
428 
429     ntodo = s->vio.ntodo();
430     if (ntodo <= 0) {
431       write_disable(nh, vc);
432       return;
433     }
434 
435     signalled = 1;
436 
437     // Recalculate amount to write
438     towrite = buf.reader()->read_avail();
439     if (towrite > ntodo) {
440       towrite = ntodo;
441     }
442   }
443 
444   // if there is nothing to do, disable
445   ink_assert(towrite >= 0);
446   if (towrite <= 0) {
447     write_disable(nh, vc);
448     return;
449   }
450 
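  // load_buffer_and_write() returns the result of the last write (a negative errno
  // on failure) in r, accumulates the bytes actually consumed in total_written, and
  // reports in needs which I/O directions should be rescheduled.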
451   int needs             = 0;
452   int64_t total_written = 0;
453   int64_t r             = vc->load_buffer_and_write(towrite, buf, total_written, needs);
454 
455   if (total_written > 0) {
456     NET_SUM_DYN_STAT(net_write_bytes_stat, total_written);
457     s->vio.ndone += total_written;
458     net_activity(vc, thread);
459   }
460 
461   // A write of 0 makes no sense since we tried to write more than 0.
462   ink_assert(r != 0);
463   // Either we wrote something or got an error.
464   // check for errors
465   if (r < 0) { // if the socket was not ready, add to WaitList
466     if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
467       NET_INCREMENT_DYN_STAT(net_calls_to_write_nodata_stat);
468       if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
469         vc->write.triggered = 0;
470         nh->write_ready_list.remove(vc);
471         write_reschedule(nh, vc);
472       }
473 
474       if ((needs & EVENTIO_READ) == EVENTIO_READ) {
475         vc->read.triggered = 0;
476         nh->read_ready_list.remove(vc);
477         read_reschedule(nh, vc);
478       }
479 
480       return;
481     }
482 
483     vc->write.triggered = 0;
484     write_signal_error(nh, vc, static_cast<int>(-total_written));
485     return;
486   } else {                                        // Wrote data.  Finished without error
487     int wbe_event = vc->write_buffer_empty_event; // save so we can clear if needed.
488 
489     // If the empty write buffer trap is set, clear it.
490     if (!(buf.reader()->is_read_avail_more_than(0))) {
491       vc->write_buffer_empty_event = 0;
492     }
493 
    // If there are no more bytes to write, signal write complete.
495     ink_assert(ntodo >= 0);
496     if (s->vio.ntodo() <= 0) {
497       write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
498       return;
499     }
500 
501     int e = 0;
502     if (!signalled) {
503       e = VC_EVENT_WRITE_READY;
504     } else if (wbe_event != vc->write_buffer_empty_event) {
505       // @a signalled means we won't send an event, and the event values differing means we
506       // had a write buffer trap and cleared it, so we need to send it now.
507       e = wbe_event;
508     }
509 
510     if (e) {
511       if (write_signal_and_update(e, vc) != EVENT_CONT) {
512         return;
513       }
514 
515       // change of lock... don't look at shared variables!
516       if (lock.get_mutex() != s->vio.mutex.get()) {
517         write_reschedule(nh, vc);
518         return;
519       }
520     }
521 
522     if ((needs & EVENTIO_READ) == EVENTIO_READ) {
523       read_reschedule(nh, vc);
524     }
525 
526     if (!(buf.reader()->is_read_avail_more_than(0))) {
527       write_disable(nh, vc);
528       return;
529     }
530 
531     if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
532       write_reschedule(nh, vc);
533     }
534 
535     return;
536   }
537 }
538 
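// get_data: expose internal fields (the read/write VIO and the closed flag) to the
// TS API through an id / output-pointer pair.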
539 bool
UnixNetVConnection::get_data(int id, void *data)
541 {
542   union {
543     TSVIO *vio;
544     void *data;
545     int *n;
546   } ptr;
547 
548   ptr.data = data;
549 
550   switch (id) {
551   case TS_API_DATA_READ_VIO:
552     *ptr.vio = reinterpret_cast<TSVIO>(&this->read.vio);
553     return true;
554   case TS_API_DATA_WRITE_VIO:
555     *ptr.vio = reinterpret_cast<TSVIO>(&this->write.vio);
556     return true;
557   case TS_API_DATA_CLOSED:
558     *ptr.n = this->closed;
559     return true;
560   default:
561     return false;
562   }
563 }
564 
565 int64_t
UnixNetVConnection::outstanding()
567 {
568   int n;
569   int ret = ioctl(this->get_socket(), TIOCOUTQ, &n);
  // If there was an error (e.g., ioctl does not support TIOCOUTQ on this platform),
  // return -1.
572   if (ret == -1) {
573     return ret;
574   }
575   return n;
576 }
577 
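// do_io_read: set up (or clear, when called with null arguments) the read VIO for
// this connection.  A minimal usage sketch from a state machine's point of view
// (the continuation `cont` and the buffer sizing below are hypothetical):
//
//   MIOBuffer *buf = new_MIOBuffer(BUFFER_SIZE_INDEX_4K);
//   VIO *vio       = vc->do_io_read(cont, INT64_MAX, buf);
//   // cont is later called back with VC_EVENT_READ_READY / VC_EVENT_READ_COMPLETE
//   // and can drain buf through an IOBufferReader.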
578 VIO *
UnixNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
580 {
581   if (closed && !(c == nullptr && nbytes == 0 && buf == nullptr)) {
582     Error("do_io_read invoked on closed vc %p, cont %p, nbytes %" PRId64 ", buf %p", this, c, nbytes, buf);
583     return nullptr;
584   }
585   read.vio.op        = VIO::READ;
586   read.vio.mutex     = c ? c->mutex : this->mutex;
587   read.vio.cont      = c;
588   read.vio.nbytes    = nbytes;
589   read.vio.ndone     = 0;
590   read.vio.vc_server = (VConnection *)this;
591   if (buf) {
592     read.vio.buffer.writer_for(buf);
593     if (!read.enabled) {
594       read.vio.reenable();
595     }
596   } else {
597     read.vio.buffer.clear();
598     read.enabled = 0;
599   }
600   return &read.vio;
601 }
602 
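// do_io_write: set up (or clear) the write VIO.  The caller supplies an
// IOBufferReader over the data to send; the `owner` flag is not supported here and
// is asserted to be false when a reader is given.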
603 VIO *
UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *reader, bool owner)
605 {
606   if (closed && !(c == nullptr && nbytes == 0 && reader == nullptr)) {
607     Error("do_io_write invoked on closed vc %p, cont %p, nbytes %" PRId64 ", reader %p", this, c, nbytes, reader);
608     return nullptr;
609   }
610   write.vio.op        = VIO::WRITE;
611   write.vio.mutex     = c ? c->mutex : this->mutex;
612   write.vio.cont      = c;
613   write.vio.nbytes    = nbytes;
614   write.vio.ndone     = 0;
615   write.vio.vc_server = (VConnection *)this;
616   if (reader) {
617     ink_assert(!owner);
618     write.vio.buffer.reader_for(reader);
619     if (nbytes && !write.enabled) {
620       write.vio.reenable();
621     }
622   } else {
623     write.enabled = 0;
624   }
625   return &write.vio;
626 }
627 
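// do_io_close: tear down both VIOs and mark the connection closed.  closed is set
// to 1 for a normal close (alerrno == -1) and -1 for a close with an error code;
// the VC is freed inline only when we are not inside a signal callback and this
// thread holds the NetHandler's mutex.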
628 void
UnixNetVConnection::do_io_close(int alerrno /* = -1 */)
630 {
  // FIXME: nh must not be nullptr.
632   ink_assert(nh);
633 
634   read.enabled  = 0;
635   write.enabled = 0;
636   read.vio.buffer.clear();
637   read.vio.nbytes = 0;
638   read.vio.op     = VIO::NONE;
639   read.vio.cont   = nullptr;
640   write.vio.buffer.clear();
641   write.vio.nbytes = 0;
642   write.vio.op     = VIO::NONE;
643   write.vio.cont   = nullptr;
644 
645   EThread *t        = this_ethread();
646   bool close_inline = !recursion && (!nh || nh->mutex->thread_holding == t);
647 
648   INK_WRITE_MEMORY_BARRIER;
649   if (alerrno && alerrno != -1) {
650     this->lerrno = alerrno;
651   }
652   if (alerrno == -1) {
653     closed = 1;
654   } else {
655     closed = -1;
656   }
657 
658   if (close_inline) {
659     if (nh) {
660       nh->free_netevent(this);
661     } else {
662       free(t);
663     }
664   }
665 }
666 
667 void
UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto)
669 {
670   switch (howto) {
671   case IO_SHUTDOWN_READ:
672     socketManager.shutdown((this)->con.fd, 0);
673     read.enabled = 0;
674     read.vio.buffer.clear();
675     read.vio.nbytes = 0;
676     read.vio.cont   = nullptr;
677     f.shutdown |= NET_VC_SHUTDOWN_READ;
678     break;
679   case IO_SHUTDOWN_WRITE:
680     socketManager.shutdown((this)->con.fd, 1);
681     write.enabled = 0;
682     write.vio.buffer.clear();
683     write.vio.nbytes = 0;
684     write.vio.cont   = nullptr;
685     f.shutdown |= NET_VC_SHUTDOWN_WRITE;
686     break;
687   case IO_SHUTDOWN_READWRITE:
688     socketManager.shutdown((this)->con.fd, 2);
689     read.enabled  = 0;
690     write.enabled = 0;
691     read.vio.buffer.clear();
692     read.vio.nbytes = 0;
693     write.vio.buffer.clear();
694     write.vio.nbytes = 0;
695     read.vio.cont    = nullptr;
696     write.vio.cont   = nullptr;
697     f.shutdown       = NET_VC_SHUTDOWN_READ | NET_VC_SHUTDOWN_WRITE;
698     break;
699   default:
700     ink_assert(!"not reached");
701   }
702 }
703 
704 int
OOB_callback::retry_OOB_send(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
706 {
707   ink_assert(mutex->thread_holding == this_ethread());
708   // the NetVC and the OOB_callback share a mutex
709   server_vc->oob_ptr = nullptr;
710   server_vc->send_OOB(server_cont, data, length);
711   delete this;
712   return EVENT_DONE;
713 }
714 
715 void
UnixNetVConnection::cancel_OOB()
717 {
718   UnixNetVConnection *u = this;
719   if (u->oob_ptr) {
720     if (u->oob_ptr->trigger) {
721       u->oob_ptr->trigger->cancel_action();
722       u->oob_ptr->trigger = nullptr;
723     }
724     delete u->oob_ptr;
725     u->oob_ptr = nullptr;
726   }
727 }
728 
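// send_OOB: try to send @a len bytes of out-of-band (MSG_OOB) data.  On a complete
// write the continuation gets VC_EVENT_OOB_COMPLETE; on a partial write or a
// transient error the remainder is retried from an OOB_callback scheduled 10 ms out.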
729 Action *
UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
731 {
732   UnixNetVConnection *u = this;
733   ink_assert(len > 0);
734   ink_assert(buf);
735   ink_assert(!u->oob_ptr);
736   int written;
737   ink_assert(cont->mutex->thread_holding == this_ethread());
738   written = socketManager.send(u->con.fd, buf, len, MSG_OOB);
739   if (written == len) {
740     cont->handleEvent(VC_EVENT_OOB_COMPLETE, nullptr);
741     return ACTION_RESULT_DONE;
742   } else if (!written) {
743     cont->handleEvent(VC_EVENT_EOS, nullptr);
744     return ACTION_RESULT_DONE;
745   }
746   if (written > 0 && written < len) {
747     u->oob_ptr          = new OOB_callback(mutex, this, cont, buf + written, len - written);
748     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
749     return u->oob_ptr->trigger;
750   } else {
    // This should be a rare case: allocating a new continuation is not
    // expensive here.
753     written = -errno;
754     ink_assert(written == -EAGAIN || written == -ENOTCONN);
755     u->oob_ptr          = new OOB_callback(mutex, this, cont, buf, len);
756     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
757     return u->oob_ptr->trigger;
758   }
759 }
760 
761 //
762 // Function used to reenable the VC for reading or
763 // writing.
764 //
765 void
UnixNetVConnection::reenable(VIO *vio)
767 {
768   if (STATE_FROM_VIO(vio)->enabled) {
769     return;
770   }
771   set_enabled(vio);
772   if (!thread) {
773     return;
774   }
775   EThread *t = vio->mutex->thread_holding;
776   ink_assert(t == this_ethread());
777   ink_release_assert(!closed);
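  // Fast path: if this thread already holds the NetHandler's mutex, update the
  // epoll registration and ready lists directly.  Otherwise try the lock; on
  // failure push the VC onto the atomic enable list and wake the net thread.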
778   if (nh->mutex->thread_holding == t) {
779     if (vio == &read.vio) {
780       ep.modify(EVENTIO_READ);
781       ep.refresh(EVENTIO_READ);
782       if (read.triggered) {
783         nh->read_ready_list.in_or_enqueue(this);
784       } else {
785         nh->read_ready_list.remove(this);
786       }
787     } else {
788       ep.modify(EVENTIO_WRITE);
789       ep.refresh(EVENTIO_WRITE);
790       if (write.triggered) {
791         nh->write_ready_list.in_or_enqueue(this);
792       } else {
793         nh->write_ready_list.remove(this);
794       }
795     }
796   } else {
797     MUTEX_TRY_LOCK(lock, nh->mutex, t);
798     if (!lock.is_locked()) {
799       if (vio == &read.vio) {
800         int isin = ink_atomic_swap(&read.in_enabled_list, 1);
801         if (!isin) {
802           nh->read_enable_list.push(this);
803         }
804       } else {
805         int isin = ink_atomic_swap(&write.in_enabled_list, 1);
806         if (!isin) {
807           nh->write_enable_list.push(this);
808         }
809       }
810       if (likely(nh->thread)) {
811         nh->thread->tail_cb->signalActivity();
812       } else if (nh->trigger_event) {
813         nh->trigger_event->ethread->tail_cb->signalActivity();
814       }
815     } else {
816       if (vio == &read.vio) {
817         ep.modify(EVENTIO_READ);
818         ep.refresh(EVENTIO_READ);
819         if (read.triggered) {
820           nh->read_ready_list.in_or_enqueue(this);
821         } else {
822           nh->read_ready_list.remove(this);
823         }
824       } else {
825         ep.modify(EVENTIO_WRITE);
826         ep.refresh(EVENTIO_WRITE);
827         if (write.triggered) {
828           nh->write_ready_list.in_or_enqueue(this);
829         } else {
830           nh->write_ready_list.remove(this);
831         }
832       }
833     }
834   }
835 }
836 
837 void
UnixNetVConnection::reenable_re(VIO *vio)
839 {
840   if (!thread) {
841     return;
842   }
843   EThread *t = vio->mutex->thread_holding;
844   ink_assert(t == this_ethread());
845   if (nh->mutex->thread_holding == t) {
846     set_enabled(vio);
847     if (vio == &read.vio) {
848       ep.modify(EVENTIO_READ);
849       ep.refresh(EVENTIO_READ);
850       if (read.triggered) {
851         net_read_io(nh, t);
852       } else {
853         nh->read_ready_list.remove(this);
854       }
855     } else {
856       ep.modify(EVENTIO_WRITE);
857       ep.refresh(EVENTIO_WRITE);
858       if (write.triggered) {
859         write_to_net(nh, this, t);
860       } else {
861         nh->write_ready_list.remove(this);
862       }
863     }
864   } else {
865     reenable(vio);
866   }
867 }
868 
UnixNetVConnection::UnixNetVConnection()
870 {
871   SET_HANDLER((NetVConnHandler)&UnixNetVConnection::startEvent);
872 }
873 
874 // Private methods
875 
876 void
UnixNetVConnection::set_enabled(VIO *vio)
878 {
879   ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
880   ink_release_assert(!closed);
881   STATE_FROM_VIO(vio)->enabled = 1;
882   if (!next_inactivity_timeout_at && inactivity_timeout_in) {
883     next_inactivity_timeout_at = Thread::get_hrtime() + inactivity_timeout_in;
884   }
885 }
886 
887 void
UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
889 {
890   read_from_net(nh, this, lthread);
891 }
892 
893 void
UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread)
895 {
896   write_to_net(nh, this, lthread);
897 }
898 
// This code was pulled out of write_to_net so
// it can be overridden for the SSL implementation
// (SSL does not support overlapped i/o)
// without duplicating all the code in write_to_net.
903 int64_t
UnixNetVConnection::load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs)
905 {
906   int64_t r                  = 0;
907   int64_t try_to_write       = 0;
908   IOBufferReader *tmp_reader = buf.reader()->clone();
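  // Work on a clone of the reader so the iovec construction can consume blocks
  // speculatively; the real reader is only advanced by the bytes the socket
  // actually accepted.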
909 
910   do {
911     IOVec tiovec[NET_MAX_IOV];
912     unsigned niov = 0;
913     try_to_write  = 0;
914 
915     while (niov < NET_MAX_IOV) {
916       int64_t wavail = towrite - total_written;
917       int64_t len    = tmp_reader->block_read_avail();
918 
      // Check if we have consumed all of this block.
920       if (len <= 0) {
921         break;
922       }
923 
924       // Check if the amount to write exceeds that in this buffer.
925       if (len > wavail) {
926         len = wavail;
927       }
928 
929       if (len == 0) {
930         break;
931       }
932 
933       // build an iov entry
934       tiovec[niov].iov_len  = len;
935       tiovec[niov].iov_base = tmp_reader->start();
936       niov++;
937 
938       try_to_write += len;
939       tmp_reader->consume(len);
940     }
941 
942     ink_assert(niov > 0);
943     ink_assert(niov <= countof(tiovec));
944 
945     // If the platform doesn't support TCP Fast Open, verify that we
946     // correctly disabled support in the socket option configuration.
947     ink_assert(MSG_FASTOPEN != 0 || this->options.f_tcp_fastopen == false);
948 
949     if (!this->con.is_connected && this->options.f_tcp_fastopen) {
950       struct msghdr msg;
951 
952       ink_zero(msg);
953       msg.msg_name    = const_cast<sockaddr *>(this->get_remote_addr());
954       msg.msg_namelen = ats_ip_size(this->get_remote_addr());
955       msg.msg_iov     = &tiovec[0];
956       msg.msg_iovlen  = niov;
957 
958       NET_INCREMENT_DYN_STAT(net_fastopen_attempts_stat);
959 
960       r = socketManager.sendmsg(con.fd, &msg, MSG_FASTOPEN);
961       if (r < 0) {
962         if (r == -EINPROGRESS || r == -EWOULDBLOCK) {
963           this->con.is_connected = true;
964         }
965       } else {
966         NET_INCREMENT_DYN_STAT(net_fastopen_successes_stat);
967         this->con.is_connected = true;
968       }
969 
970     } else {
971       r = socketManager.writev(con.fd, &tiovec[0], niov);
972     }
973 
974     if (r > 0) {
975       buf.reader()->consume(r);
976       total_written += r;
977     }
978 
979     ProxyMutex *mutex = thread->mutex.get();
980     NET_INCREMENT_DYN_STAT(net_calls_to_write_stat);
981   } while (r == try_to_write && total_written < towrite);
982 
983   tmp_reader->dealloc();
984 
985   needs |= EVENTIO_WRITE;
986 
987   return r;
988 }
989 
990 void
UnixNetVConnection::readDisable(NetHandler *nh)
992 {
993   read_disable(nh, this);
994 }
995 
996 void
UnixNetVConnection::readSignalError(NetHandler *nh, int err)
998 {
999   read_signal_error(nh, this, err);
1000 }
1001 
1002 int
UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
1004 {
1005   return (read_signal_done(event, nh, this));
1006 }
1007 
1008 int
UnixNetVConnection::readSignalAndUpdate(int event)
1010 {
1011   return (read_signal_and_update(event, this));
1012 }
1013 
// Interface so the SSL-derived class can call these static inline functions
// without affecting the regular net code or copying it into
// the header files.
1017 void
UnixNetVConnection::readReschedule(NetHandler *nh)
1019 {
1020   read_reschedule(nh, this);
1021 }
1022 
1023 void
UnixNetVConnection::writeReschedule(NetHandler *nh)
1025 {
1026   write_reschedule(nh, this);
1027 }
1028 
1029 void
UnixNetVConnection::netActivity(EThread *lthread)
1031 {
1032   net_activity(this, lthread);
1033 }
1034 
1035 int
UnixNetVConnection::startEvent(int /* event ATS_UNUSED */, Event *e)
1037 {
1038   MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread);
1039   if (!lock.is_locked()) {
1040     e->schedule_in(HRTIME_MSECONDS(net_retry_delay));
1041     return EVENT_CONT;
1042   }
1043   if (!action_.cancelled) {
1044     connectUp(e->ethread, NO_FD);
1045   } else {
1046     free(e->ethread);
1047   }
1048   return EVENT_DONE;
1049 }
1050 
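// acceptEvent: runs on the target net thread after an accept.  It registers the VC
// with the NetHandler and InactivityCop, installs mainEvent as the timeout handler,
// and signals NET_EVENT_ACCEPT to the acceptor's continuation.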
1051 int
UnixNetVConnection::acceptEvent(int event, Event *e)
1053 {
1054   EThread *t    = (e == nullptr) ? this_ethread() : e->ethread;
1055   NetHandler *h = get_NetHandler(t);
1056 
1057   thread = t;
1058 
  // Send this NetVC to the NetHandler and start polling for read & write events.
1060   if (h->startIO(this) < 0) {
1061     free(t);
1062     return EVENT_DONE;
1063   }
1064 
1065   // Switch vc->mutex from NetHandler->mutex to new mutex
1066   mutex = new_ProxyMutex();
1067   SCOPED_MUTEX_LOCK(lock2, mutex, t);
1068 
1069   // Setup a timeout callback handler.
1070   SET_HANDLER((NetVConnHandler)&UnixNetVConnection::mainEvent);
1071 
1072   // Send this netvc to InactivityCop.
1073   nh->startCop(this);
1074 
1075   if (inactivity_timeout_in) {
1076     UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
1077   } else {
1078     set_inactivity_timeout(0);
1079   }
1080 
1081   if (active_timeout_in) {
1082     UnixNetVConnection::set_active_timeout(active_timeout_in);
1083   }
1084   if (action_.continuation->mutex != nullptr) {
1085     MUTEX_TRY_LOCK(lock3, action_.continuation->mutex, t);
1086     if (!lock3.is_locked()) {
1087       ink_release_assert(0);
1088     }
1089     action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
1090   } else {
1091     action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
1092   }
1093   return EVENT_DONE;
1094 }
1095 
//
// The main event handler for UnixNetVConnections.
// This is called by the Event subsystem to signal active and inactivity
// timeouts on the UnixNetVConnection.
//
1101 int
UnixNetVConnection::mainEvent(int event, Event *e)
1103 {
1104   ink_assert(event == VC_EVENT_ACTIVE_TIMEOUT || event == VC_EVENT_INACTIVITY_TIMEOUT);
1105   ink_assert(thread == this_ethread());
1106 
1107   MUTEX_TRY_LOCK(hlock, get_NetHandler(thread)->mutex, e->ethread);
1108   MUTEX_TRY_LOCK(rlock, read.vio.mutex ? read.vio.mutex : e->ethread->mutex, e->ethread);
1109   MUTEX_TRY_LOCK(wlock, write.vio.mutex ? write.vio.mutex : e->ethread->mutex, e->ethread);
1110 
1111   if (!hlock.is_locked() || !rlock.is_locked() || !wlock.is_locked() ||
1112       (read.vio.mutex && rlock.get_mutex() != read.vio.mutex.get()) ||
1113       (write.vio.mutex && wlock.get_mutex() != write.vio.mutex.get())) {
1114     return EVENT_CONT;
1115   }
1116 
1117   if (e->cancelled) {
1118     return EVENT_DONE;
1119   }
1120 
1121   int signal_event;
1122   Continuation *reader_cont     = nullptr;
1123   Continuation *writer_cont     = nullptr;
1124   ink_hrtime *signal_timeout_at = nullptr;
1125   Event *t                      = nullptr;
1126   Event **signal_timeout        = &t;
1127 
1128   switch (event) {
1129   case VC_EVENT_INACTIVITY_TIMEOUT:
1130     signal_event      = VC_EVENT_INACTIVITY_TIMEOUT;
1131     signal_timeout_at = &next_inactivity_timeout_at;
1132     break;
1133   case VC_EVENT_ACTIVE_TIMEOUT:
1134     signal_event      = VC_EVENT_ACTIVE_TIMEOUT;
1135     signal_timeout_at = &next_activity_timeout_at;
1136     break;
1137   default:
1138     ink_release_assert(!"BUG: unexpected event in UnixNetVConnection::mainEvent");
1139     break;
1140   }
1141 
1142   *signal_timeout    = nullptr;
1143   *signal_timeout_at = 0;
1144   writer_cont        = write.vio.cont;
1145 
1146   if (closed) {
1147     nh->free_netevent(this);
1148     return EVENT_DONE;
1149   }
1150 
1151   if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
1152     reader_cont = read.vio.cont;
1153     if (read_signal_and_update(signal_event, this) == EVENT_DONE) {
1154       return EVENT_DONE;
1155     }
1156   }
1157 
1158   if (!*signal_timeout && !*signal_timeout_at && !closed && write.vio.op == VIO::WRITE && !(f.shutdown & NET_VC_SHUTDOWN_WRITE) &&
1159       reader_cont != write.vio.cont && writer_cont == write.vio.cont) {
1160     if (write_signal_and_update(signal_event, this) == EVENT_DONE) {
1161       return EVENT_DONE;
1162     }
1163   }
1164   return EVENT_DONE;
1165 }
1166 
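// populate: adopt an already-connected Connection on the current thread and
// register it with this thread's NetHandler and InactivityCop.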
1167 int
UnixNetVConnection::populate(Connection &con_in, Continuation *c, void *arg)
1169 {
1170   this->con.move(con_in);
1171   this->mutex  = c->mutex;
1172   this->thread = this_ethread();
1173 
1174   EThread *t    = this_ethread();
1175   NetHandler *h = get_NetHandler(t);
1176 
1177   MUTEX_TRY_LOCK(lock, h->mutex, t);
1178   if (!lock.is_locked()) {
1179     // Clean up and go home
1180     return EVENT_ERROR;
1181   }
1182 
1183   if (h->startIO(this) < 0) {
1184     Debug("iocore_net", "populate : Failed to add to epoll list");
1185     return EVENT_ERROR;
1186   }
1187 
1188   ink_assert(this->nh != nullptr);
1189   SET_HANDLER(&UnixNetVConnection::mainEvent);
1190   this->nh->startCop(this);
1191   ink_assert(this->con.fd != NO_FD);
1192   return EVENT_DONE;
1193 }
1194 
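// connectUp: open and connect a socket (or adopt the caller-supplied fd), register
// it with the NetHandler and InactivityCop, and report NET_EVENT_OPEN or
// NET_EVENT_OPEN_FAILED to the state machine.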
1195 int
UnixNetVConnection::connectUp(EThread *t, int fd)
1197 {
1198   ink_assert(get_NetHandler(t)->mutex->thread_holding == this_ethread());
1199   int res;
1200 
1201   thread = t;
1202   if (check_net_throttle(CONNECT)) {
1203     check_throttle_warning(CONNECT);
1204     res = -ENET_THROTTLING;
1205     NET_INCREMENT_DYN_STAT(net_connections_throttled_out_stat);
1206     goto fail;
1207   }
1208 
1209   // Force family to agree with remote (server) address.
1210   options.ip_family = con.addr.sa.sa_family;
1211 
1212   //
1213   // Initialize this UnixNetVConnection
1214   //
1215   if (is_debug_tag_set("iocore_net")) {
1216     char addrbuf[INET6_ADDRSTRLEN];
1217     Debug("iocore_net", "connectUp:: local_addr=%s:%d [%s]",
1218           options.local_ip.isValid() ? options.local_ip.toString(addrbuf, sizeof(addrbuf)) : "*", options.local_port,
1219           NetVCOptions::toString(options.addr_binding));
1220   }
1221 
1222   // If this is getting called from the TS API, then we are wiring up a file descriptor
1223   // provided by the caller. In that case, we know that the socket is already connected.
1224   if (fd == NO_FD) {
    // Because multiple threads may open connections concurrently, the fd returned from con.open() may exceed the limit checked by check_net_throttle().
1226     res = con.open(options);
1227     if (res != 0) {
1228       goto fail;
1229     }
1230   } else {
1231     int len = sizeof(con.sock_type);
1232 
    // This call will fail if fd is not a socket (e.g. it is an
    // eventfd or a regular file fd). That is ok, because sock_type
    // is only used when setting up the socket.
1236     safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, reinterpret_cast<char *>(&con.sock_type), &len);
1237     safe_nonblocking(fd);
1238     con.fd           = fd;
1239     con.is_connected = true;
1240     con.is_bound     = true;
1241   }
1242 
1243   // Must connect after EventIO::Start() to avoid a race condition
1244   // when edge triggering is used.
1245   if ((res = get_NetHandler(t)->startIO(this)) < 0) {
1246     goto fail;
1247   }
1248 
1249   if (fd == NO_FD) {
1250     res = con.connect(nullptr, options);
1251     if (res != 0) {
1252       // fast stopIO
1253       nh = nullptr;
1254       goto fail;
1255     }
1256   }
1257 
1258   // Did not fail, increment connection count
1259   NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
1260   ink_release_assert(con.fd != NO_FD);
1261 
1262   // Setup a timeout callback handler.
1263   SET_HANDLER(&UnixNetVConnection::mainEvent);
1264   // Send this netvc to InactivityCop.
1265   nh->startCop(this);
1266 
1267   set_inactivity_timeout(0);
1268   ink_assert(!active_timeout_in);
1269   this->set_local_addr();
1270   action_.continuation->handleEvent(NET_EVENT_OPEN, this);
1271   return CONNECT_SUCCESS;
1272 
1273 fail:
1274   lerrno = -res;
1275   action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)static_cast<intptr_t>(res));
1276   if (fd != NO_FD) {
1277     con.fd = NO_FD;
1278   }
1279   free(t);
1280   return CONNECT_FAILURE;
1281 }
1282 
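// clear: reset timeouts, VIOs, flags, and mutexes so this VC can be safely reused
// or returned to the allocator.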
1283 void
UnixNetVConnection::clear()
1285 {
1286   // clear timeout variables
1287   next_inactivity_timeout_at = 0;
1288   next_activity_timeout_at   = 0;
1289   inactivity_timeout_in      = 0;
1290   active_timeout_in          = 0;
1291 
1292   // clear variables for reuse
1293   this->mutex.clear();
1294   action_.mutex.clear();
1295   got_remote_addr = false;
1296   got_local_addr  = false;
1297   attributes      = 0;
1298   read.vio.mutex.clear();
1299   write.vio.mutex.clear();
1300   flags               = 0;
1301   nh                  = nullptr;
1302   read.triggered      = 0;
1303   write.triggered     = 0;
1304   read.enabled        = 0;
1305   write.enabled       = 0;
1306   read.vio.cont       = nullptr;
1307   write.vio.cont      = nullptr;
1308   read.vio.vc_server  = nullptr;
1309   write.vio.vc_server = nullptr;
1310   options.reset();
1311   closed        = 0;
1312   netvc_context = NET_VCONNECTION_UNSET;
1313   ink_assert(!read.ready_link.prev && !read.ready_link.next);
1314   ink_assert(!read.enable_link.next);
1315   ink_assert(!write.ready_link.prev && !write.ready_link.next);
1316   ink_assert(!write.enable_link.next);
1317   ink_assert(!link.next && !link.prev);
1318 }
1319 
1320 void
UnixNetVConnection::free(EThread *t)
1322 {
1323   ink_release_assert(t == this_ethread());
1324 
1325   // cancel OOB
1326   cancel_OOB();
1327   // close socket fd
1328   if (con.fd != NO_FD) {
1329     NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
1330   }
1331   con.close();
1332 
1333   clear();
1334   SET_CONTINUATION_HANDLER(this, (NetVConnHandler)&UnixNetVConnection::startEvent);
1335   ink_assert(con.fd == NO_FD);
1336   ink_assert(t == this_ethread());
1337 
1338   if (from_accept_thread) {
1339     netVCAllocator.free(this);
1340   } else {
1341     THREAD_FREE(this, netVCAllocator, t);
1342   }
1343 }
1344 
1345 void
UnixNetVConnection::apply_options()
1347 {
1348   con.apply_options(options);
1349 }
1350 
1351 TS_INLINE void
UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout_in)
1353 {
1354   Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout_in, this);
1355   if (timeout_in == 0) {
1356     // set default inactivity timeout
1357     timeout_in = HRTIME_SECONDS(nh->config.default_inactivity_timeout);
1358   }
1359   inactivity_timeout_in      = timeout_in;
1360   next_inactivity_timeout_at = Thread::get_hrtime() + inactivity_timeout_in;
1361 }
1362 
/*
 * Close down the current netVC.  Save aside the socket and SSL information
 * and create a new netVC on the current thread.
 */
1367 UnixNetVConnection *
UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t)
1369 {
1370   NetHandler *client_nh = get_NetHandler(t);
1371   ink_assert(client_nh);
1372   if (this->nh == client_nh) {
1373     // We're already there!
1374     return this;
1375   }
1376 
1377   Connection hold_con;
1378   hold_con.move(this->con);
1379   SSLNetVConnection *sslvc = dynamic_cast<SSLNetVConnection *>(this);
1380 
1381   SSL *save_ssl = (sslvc) ? sslvc->ssl : nullptr;
1382   if (save_ssl) {
1383     SSLNetVCDetach(sslvc->ssl);
1384     sslvc->ssl = nullptr;
1385   }
1386 
  // do_io_close() will signal the VC to be freed on the original thread.
  // Since we moved the con context, the fd will not be closed.
  // Go ahead and remove the fd from the original thread's epoll structure, so it is not
  // processed on two threads simultaneously.
1391   this->ep.stop();
1392   this->do_io_close();
1393 
1394   // Create new VC:
1395   if (save_ssl) {
1396     SSLNetVConnection *sslvc = static_cast<SSLNetVConnection *>(sslNetProcessor.allocate_vc(t));
1397     if (sslvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) {
1398       sslvc->do_io_close();
1399       sslvc = nullptr;
1400     } else {
1401       sslvc->set_context(get_context());
1402     }
1403     return sslvc;
1404     // Update the SSL fields
1405   } else {
1406     UnixNetVConnection *netvc = static_cast<UnixNetVConnection *>(netProcessor.allocate_vc(t));
1407     if (netvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) {
1408       netvc->do_io_close();
1409       netvc = nullptr;
1410     } else {
1411       netvc->set_context(get_context());
1412     }
1413     return netvc;
1414   }
1415 }
1416 
1417 void
UnixNetVConnection::add_to_keep_alive_queue()
1419 {
1420   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1421   if (lock.is_locked()) {
1422     nh->add_to_keep_alive_queue(this);
1423   } else {
1424     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
1425   }
1426 }
1427 
1428 void
UnixNetVConnection::remove_from_keep_alive_queue()
1430 {
1431   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1432   if (lock.is_locked()) {
1433     nh->remove_from_keep_alive_queue(this);
1434   } else {
1435     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
1436   }
1437 }
1438 
1439 bool
UnixNetVConnection::add_to_active_queue()
1441 {
1442   bool result = false;
1443 
1444   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1445   if (lock.is_locked()) {
1446     result = nh->add_to_active_queue(this);
1447   } else {
1448     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
1449   }
1450   return result;
1451 }
1452 
1453 void
UnixNetVConnection::remove_from_active_queue()
1455 {
1456   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1457   if (lock.is_locked()) {
1458     nh->remove_from_active_queue(this);
1459   } else {
1460     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
1461   }
1462 }
1463 
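// populate_protocol: fill @a results with up to @a n protocol tags, the transport
// protocol string first and then the address family string.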
1464 int
UnixNetVConnection::populate_protocol(std::string_view *results, int n) const
1466 {
1467   int retval = 0;
1468   if (n > retval) {
1469     if (!(results[retval] = options.get_proto_string()).empty()) {
1470       ++retval;
1471     }
1472     if (n > retval) {
1473       if (!(results[retval] = options.get_family_string()).empty()) {
1474         ++retval;
1475       }
1476     }
1477   }
1478   return retval;
1479 }
1480 
1481 const char *
UnixNetVConnection::protocol_contains(std::string_view tag) const
1483 {
1484   std::string_view retval = options.get_proto_string();
1485   if (!IsNoCasePrefixOf(tag, retval)) { // didn't match IP level, check TCP level
1486     retval = options.get_family_string();
1487     if (!IsNoCasePrefixOf(tag, retval)) { // no match here either, return empty.
1488       ink_zero(retval);
1489     }
1490   }
1491   return retval.data();
1492 }
1493 
1494 int
UnixNetVConnection::set_tcp_congestion_control(int side)
1496 {
1497 #ifdef TCP_CONGESTION
1498   std::string_view ccp;
1499 
1500   if (side == CLIENT_SIDE) {
1501     ccp = net_ccp_in;
1502   } else {
1503     ccp = net_ccp_out;
1504   }
1505 
1506   if (!ccp.empty()) {
1507     int rv = setsockopt(con.fd, IPPROTO_TCP, TCP_CONGESTION, reinterpret_cast<const void *>(ccp.data()), ccp.size());
1508 
1509     if (rv < 0) {
1510       Error("Unable to set TCP congestion control on socket %d to \"%s\", errno=%d (%s)", con.fd, ccp.data(), errno,
1511             strerror(errno));
1512     } else {
1513       Debug("socket", "Setting TCP congestion control on socket [%d] to \"%s\" -> %d", con.fd, ccp.data(), rv);
1514     }
1515     return 0;
1516   }
1517   return -1;
1518 #else
1519   Debug("socket", "Setting TCP congestion control is not supported on this platform.");
1520   return -1;
1521 #endif
1522 }
1523