1 /** @file
2 
3     A brief file description
4 
5     @section license License
6 
7     Licensed to the Apache Software Foundation (ASF) under one
8     or more contributor license agreements.  See the NOTICE file
9     distributed with this work for additional information
10     regarding copyright ownership.  The ASF licenses this file
11     to you under the Apache License, Version 2.0 (the
12     "License"); you may not use this file except in compliance
13     with the License.  You may obtain a copy of the License at
14 
15     http://www.apache.org/licenses/LICENSE-2.0
16 
17     Unless required by applicable law or agreed to in writing, software
18     distributed under the License is distributed on an "AS IS" BASIS,
19     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20     See the License for the specific language governing permissions and
21     limitations under the License.
22 */
23 
24 #include "P_Net.h"
25 #include "tscore/ink_platform.h"
26 #include "tscore/InkErrno.h"
27 #include "Log.h"
28 
29 #include <termios.h>
30 
// Byte offset of the `vio` member inside NetState, used to map a VIO
// pointer back to the NetState that contains it (container_of idiom).
#define STATE_VIO_OFFSET ((uintptr_t) & ((NetState *)0)->vio)
#define STATE_FROM_VIO(_x) ((NetState *)(((char *)(_x)) - STATE_VIO_OFFSET))

// Global allocator for UnixNetVConnection objects.
ClassAllocator<UnixNetVConnection> netVCAllocator("netVCAllocator");
36 
37 //
38 // Reschedule a UnixNetVConnection by moving it
39 // onto or off of the ready_list
40 //
41 static inline void
read_reschedule(NetHandler * nh,UnixNetVConnection * vc)42 read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
43 {
44   vc->ep.refresh(EVENTIO_READ);
45   if (vc->read.triggered && vc->read.enabled) {
46     nh->read_ready_list.in_or_enqueue(vc);
47   } else {
48     nh->read_ready_list.remove(vc);
49   }
50 }
51 
52 static inline void
write_reschedule(NetHandler * nh,UnixNetVConnection * vc)53 write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
54 {
55   vc->ep.refresh(EVENTIO_WRITE);
56   if (vc->write.triggered && vc->write.enabled) {
57     nh->write_ready_list.in_or_enqueue(vc);
58   } else {
59     nh->write_ready_list.remove(vc);
60   }
61 }
62 
63 void
net_activity(UnixNetVConnection * vc,EThread * thread)64 net_activity(UnixNetVConnection *vc, EThread *thread)
65 {
66   Debug("socket", "net_activity updating inactivity %" PRId64 ", NetVC=%p", vc->inactivity_timeout_in, vc);
67   (void)thread;
68   if (vc->inactivity_timeout_in) {
69     vc->next_inactivity_timeout_at = Thread::get_hrtime() + vc->inactivity_timeout_in;
70   } else {
71     vc->next_inactivity_timeout_at = 0;
72   }
73 }
74 
75 //
76 // Signal an event
77 //
//
// Signal an event
//
// Deliver @a event to the read-side continuation of @a vc.  The recursion
// counter guards against the continuation closing the VC from inside its
// handler: the VC is only freed once the outermost signal call unwinds.
// Returns EVENT_DONE if the VC was freed, EVENT_CONT otherwise.
static inline int
read_signal_and_update(int event, UnixNetVConnection *vc)
{
  vc->recursion++;
  if (vc->read.vio.cont) {
    vc->read.vio.cont->handleEvent(event, &vc->read.vio);
  } else {
    // No continuation attached: for terminal events just close the orphaned
    // VC; anything else indicates a logic error.
    switch (event) {
    case VC_EVENT_EOS:
    case VC_EVENT_ERROR:
    case VC_EVENT_ACTIVE_TIMEOUT:
    case VC_EVENT_INACTIVITY_TIMEOUT:
      Debug("inactivity_cop", "event %d: null read.vio cont, closing vc %p", event, vc);
      Warning("read: Closing orphaned vc %p", vc);
      vc->closed = 1;
      break;
    default:
      Error("Unexpected event %d for vc %p", event, vc);
      ink_release_assert(0);
      break;
    }
  }
  // Free only at the outermost recursion level, and only if the handler
  // (or the orphan path above) marked the VC closed.
  if (!--vc->recursion && vc->closed) {
    /* BZ  31932 */
    ink_assert(vc->thread == this_ethread());
    vc->nh->free_netevent(vc);
    return EVENT_DONE;
  } else {
    return EVENT_CONT;
  }
}
109 
// Deliver @a event to the write-side continuation of @a vc.  Mirrors
// read_signal_and_update(): the recursion counter defers freeing the VC
// until the outermost signal call returns.  Returns EVENT_DONE if the VC
// was freed, EVENT_CONT otherwise.
static inline int
write_signal_and_update(int event, UnixNetVConnection *vc)
{
  vc->recursion++;
  if (vc->write.vio.cont) {
    vc->write.vio.cont->handleEvent(event, &vc->write.vio);
  } else {
    // No continuation attached: for terminal events just close the orphaned
    // VC; anything else indicates a logic error.
    switch (event) {
    case VC_EVENT_EOS:
    case VC_EVENT_ERROR:
    case VC_EVENT_ACTIVE_TIMEOUT:
    case VC_EVENT_INACTIVITY_TIMEOUT:
      Debug("inactivity_cop", "event %d: null write.vio cont, closing vc %p", event, vc);
      Warning("write: Closing orphaned vc %p", vc);
      vc->closed = 1;
      break;
    default:
      Error("Unexpected event %d for vc %p", event, vc);
      ink_release_assert(0);
      break;
    }
  }
  // Free only at the outermost recursion level, and only if the VC is closed.
  if (!--vc->recursion && vc->closed) {
    /* BZ  31932 */
    ink_assert(vc->thread == this_ethread());
    vc->nh->free_netevent(vc);
    return EVENT_DONE;
  } else {
    return EVENT_CONT;
  }
}
141 
142 static inline int
read_signal_done(int event,NetHandler * nh,UnixNetVConnection * vc)143 read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
144 {
145   vc->read.enabled = 0;
146   if (read_signal_and_update(event, vc) == EVENT_DONE) {
147     return EVENT_DONE;
148   } else {
149     read_reschedule(nh, vc);
150     return EVENT_CONT;
151   }
152 }
153 
154 static inline int
write_signal_done(int event,NetHandler * nh,UnixNetVConnection * vc)155 write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
156 {
157   vc->write.enabled = 0;
158   if (write_signal_and_update(event, vc) == EVENT_DONE) {
159     return EVENT_DONE;
160   } else {
161     write_reschedule(nh, vc);
162     return EVENT_CONT;
163   }
164 }
165 
// Record @a lerrno on the VC and deliver VC_EVENT_ERROR on the read side.
static inline int
read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
{
  vc->lerrno = lerrno;
  return read_signal_done(VC_EVENT_ERROR, nh, vc);
}
172 
// Record @a lerrno on the VC and deliver VC_EVENT_ERROR on the write side.
static inline int
write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
{
  vc->lerrno = lerrno;
  return write_signal_done(VC_EVENT_ERROR, nh, vc);
}
179 
180 // Read the data for a UnixNetVConnection.
181 // Rescheduling the UnixNetVConnection by moving the VC
182 // onto or off of the ready_list.
183 // Had to wrap this function with net_read_io for SSL.
// Read the data for a UnixNetVConnection.
// Rescheduling the UnixNetVConnection by moving the VC
// onto or off of the ready_list.
// Had to wrap this function with net_read_io for SSL.
//
// Flow: try-lock the read VIO mutex; fill as much of the write buffer as
// the socket will give us via readv(); then signal READ_COMPLETE or
// READ_READY to the user and reschedule/disable the read side as needed.
static void
read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
  NetState *s       = &vc->read;
  ProxyMutex *mutex = thread->mutex.get(); // needed by the NET_*_DYN_STAT macros
  int64_t r         = 0;

  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);

  // Never block the net thread: if the user's mutex is busy, retry later.
  if (!lock.is_locked()) {
    read_reschedule(nh, vc);
    return;
  }

  // It is possible that the closed flag got set from HttpSessionManager in the
  // global session pool case.  If so, the closed flag should be stable once we get the
  // s->vio.mutex (the global session pool mutex).
  if (vc->closed) {
    vc->nh->free_netevent(vc);
    return;
  }
  // if it is not enabled.
  if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
    read_disable(nh, vc);
    return;
  }

  MIOBufferAccessor &buf = s->vio.buffer;
  ink_assert(buf.writer());

  // if there is nothing to do, disable connection
  int64_t ntodo = s->vio.ntodo();
  if (ntodo <= 0) {
    read_disable(nh, vc);
    return;
  }
  // Read no more than the buffer can hold and no more than the VIO wants.
  int64_t toread = buf.writer()->write_avail();
  if (toread > ntodo) {
    toread = ntodo;
  }

  // read data
  int64_t rattempted = 0, total_read = 0;
  unsigned niov = 0;
  IOVec tiovec[NET_MAX_IOV];
  if (toread) {
    IOBufferBlock *b = buf.writer()->first_write_block();
    do {
      // Build an iovec over the writable space of the buffer chain, capped
      // at NET_MAX_IOV entries and at the remaining `toread` budget.
      niov       = 0;
      rattempted = 0;
      while (b && niov < NET_MAX_IOV) {
        int64_t a = b->write_avail();
        if (a > 0) {
          tiovec[niov].iov_base = b->_end;
          int64_t togo          = toread - total_read - rattempted;
          if (a > togo) {
            a = togo;
          }
          tiovec[niov].iov_len = a;
          rattempted += a;
          niov++;
          if (a >= togo) {
            break;
          }
        }
        b = b->next.get();
      }

      ink_assert(niov > 0);
      ink_assert(niov <= countof(tiovec));
      r = socketManager.readv(vc->con.fd, &tiovec[0], niov);

      NET_INCREMENT_DYN_STAT(net_calls_to_read_stat);

      total_read += rattempted;
      // Keep reading while the socket fills every iovec we offer it.
    } while (rattempted && r == rattempted && total_read < toread);

    // if we have already moved some bytes successfully, summarize in r
    if (total_read != rattempted) {
      if (r <= 0) {
        r = total_read - rattempted;
      } else {
        r = total_read - rattempted + r;
      }
    }
    // check for errors
    if (r <= 0) {
      // Transient: nothing available yet — park the read side until epoll
      // triggers it again.
      if (r == -EAGAIN || r == -ENOTCONN) {
        NET_INCREMENT_DYN_STAT(net_calls_to_read_nodata_stat);
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        return;
      }

      // EOF (r == 0) or peer reset: signal end-of-stream.
      if (!r || r == -ECONNRESET) {
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        read_signal_done(VC_EVENT_EOS, nh, vc);
        return;
      }
      // Hard error: propagate the (positive) errno to the user.
      vc->read.triggered = 0;
      read_signal_error(nh, vc, static_cast<int>(-r));
      return;
    }
    NET_SUM_DYN_STAT(net_read_bytes_stat, r);

    // Add data to buffer and signal continuation.
    buf.writer()->fill(r);
#ifdef DEBUG
    if (buf.writer()->write_avail() <= 0)
      Debug("iocore_net", "read_from_net, read buffer full");
#endif
    s->vio.ndone += r;
    net_activity(vc, thread);
  } else {
    r = 0;
  }

  // Signal read ready, check if user is not done
  if (r) {
    // If there are no more bytes to read, signal read complete
    ink_assert(ntodo >= 0);
    if (s->vio.ntodo() <= 0) {
      read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
      Debug("iocore_net", "read_from_net, read finished - signal done");
      return;
    } else {
      if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
        return;
      }

      // change of lock... don't look at shared variables!
      if (lock.get_mutex() != s->vio.mutex.get()) {
        read_reschedule(nh, vc);
        return;
      }
    }
  }
  // If here are is no more room, or nothing to do, disable the connection
  if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
    read_disable(nh, vc);
    return;
  }

  read_reschedule(nh, vc);
}
330 
331 //
332 // Write the data for a UnixNetVConnection.
333 // Rescheduling the UnixNetVConnection when necessary.
334 //
//
// Write the data for a UnixNetVConnection.
// Rescheduling the UnixNetVConnection when necessary.
//
// Thin stat-counting wrapper around write_to_net_io().
void
write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
  ProxyMutex *mutex = thread->mutex.get(); // needed by the NET_*_DYN_STAT macros

  NET_INCREMENT_DYN_STAT(net_calls_to_writetonet_stat);
  NET_INCREMENT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat);

  write_to_net_io(nh, vc, thread);
}
345 
// Drive the write side of @a vc: finish any pending TLS handshake first,
// then flush the VIO's buffer to the socket via load_buffer_and_write(),
// signalling WRITE_READY / WRITE_COMPLETE to the user as appropriate.
void
write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
  NetState *s       = &vc->write;
  ProxyMutex *mutex = thread->mutex.get(); // needed by the NET_*_DYN_STAT macros

  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);

  // Never block the net thread: if the user's mutex is busy, retry later.
  if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
    write_reschedule(nh, vc);
    return;
  }

  // This function will always return true unless
  // vc is an SSLNetVConnection.
  if (!vc->getSSLHandShakeComplete()) {
    if (vc->trackFirstHandshake()) {
      // Eat the first write-ready.  Until the TLS handshake is complete,
      // we should still be under the connect timeout and shouldn't bother
      // the state machine until the TLS handshake is complete
      vc->write.triggered = 0;
      nh->write_ready_list.remove(vc);
    }

    int err, ret;

    // Drive the handshake from the appropriate side (client vs server).
    if (vc->get_context() == NET_VCONNECTION_OUT) {
      ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
    } else {
      ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
    }

    if (ret == EVENT_ERROR) {
      vc->write.triggered = 0;
      write_signal_error(nh, vc, err);
    } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT) {
      // Handshake is blocked on inbound data: wait on the read side.
      vc->read.triggered = 0;
      nh->read_ready_list.remove(vc);
      read_reschedule(nh, vc);
    } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == SSL_HANDSHAKE_WANT_WRITE) {
      // Handshake is blocked on outbound space: wait on the write side.
      vc->write.triggered = 0;
      nh->write_ready_list.remove(vc);
      write_reschedule(nh, vc);
    } else if (ret == EVENT_DONE) {
      // Handshake finished; mark writable so the data path runs next pass.
      vc->write.triggered = 1;
      if (vc->write.enabled) {
        nh->write_ready_list.in_or_enqueue(vc);
      }
    } else {
      write_reschedule(nh, vc);
    }

    return;
  }

  // If it is not enabled,add to WaitList.
  if (!s->enabled || s->vio.op != VIO::WRITE) {
    write_disable(nh, vc);
    return;
  }

  // If there is nothing to do, disable
  int64_t ntodo = s->vio.ntodo();
  if (ntodo <= 0) {
    write_disable(nh, vc);
    return;
  }

  MIOBufferAccessor &buf = s->vio.buffer;
  ink_assert(buf.writer());

  // Calculate the amount to write.
  int64_t towrite = buf.reader()->read_avail();
  if (towrite > ntodo) {
    towrite = ntodo;
  }

  int signalled = 0;

  // signal write ready to allow user to fill the buffer
  if (towrite != ntodo && buf.writer()->write_avail()) {
    if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
      return;
    }

    ntodo = s->vio.ntodo();
    if (ntodo <= 0) {
      write_disable(nh, vc);
      return;
    }

    signalled = 1;

    // Recalculate amount to write
    towrite = buf.reader()->read_avail();
    if (towrite > ntodo) {
      towrite = ntodo;
    }
  }

  // if there is nothing to do, disable
  ink_assert(towrite >= 0);
  if (towrite <= 0) {
    write_disable(nh, vc);
    return;
  }

  // `needs` collects which sides (read/write) must be rescheduled; it is an
  // out-param of load_buffer_and_write (SSL may need reads to make writes).
  int needs             = 0;
  int64_t total_written = 0;
  int64_t r             = vc->load_buffer_and_write(towrite, buf, total_written, needs);

  if (total_written > 0) {
    NET_SUM_DYN_STAT(net_write_bytes_stat, total_written);
    s->vio.ndone += total_written;
    net_activity(vc, thread);
  }

  // A write of 0 makes no sense since we tried to write more than 0.
  ink_assert(r != 0);
  // Either we wrote something or got an error.
  // check for errors
  if (r < 0) { // if the socket was not ready, add to WaitList
    if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
      NET_INCREMENT_DYN_STAT(net_calls_to_write_nodata_stat);
      if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
        vc->write.triggered = 0;
        nh->write_ready_list.remove(vc);
        write_reschedule(nh, vc);
      }

      if ((needs & EVENTIO_READ) == EVENTIO_READ) {
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        read_reschedule(nh, vc);
      }

      return;
    }

    vc->write.triggered = 0;
    // NOTE(review): the error code passed here is -total_written, not -r;
    // presumably total_written carries the errno in this path — confirm
    // against load_buffer_and_write's contract.
    write_signal_error(nh, vc, static_cast<int>(-total_written));
    return;
  } else {                                        // Wrote data.  Finished without error
    int wbe_event = vc->write_buffer_empty_event; // save so we can clear if needed.

    // If the empty write buffer trap is set, clear it.
    if (!(buf.reader()->is_read_avail_more_than(0))) {
      vc->write_buffer_empty_event = 0;
    }

    // If there are no more bytes to write, signal write complete,
    ink_assert(ntodo >= 0);
    if (s->vio.ntodo() <= 0) {
      write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
      return;
    }

    int e = 0;
    if (!signalled) {
      e = VC_EVENT_WRITE_READY;
    } else if (wbe_event != vc->write_buffer_empty_event) {
      // @a signalled means we won't send an event, and the event values differing means we
      // had a write buffer trap and cleared it, so we need to send it now.
      e = wbe_event;
    }

    if (e) {
      if (write_signal_and_update(e, vc) != EVENT_CONT) {
        return;
      }

      // change of lock... don't look at shared variables!
      if (lock.get_mutex() != s->vio.mutex.get()) {
        write_reschedule(nh, vc);
        return;
      }
    }

    if ((needs & EVENTIO_READ) == EVENTIO_READ) {
      read_reschedule(nh, vc);
    }

    // Nothing left in the buffer: park the write side until re-enabled.
    if (!(buf.reader()->is_read_avail_more_than(0))) {
      write_disable(nh, vc);
      return;
    }

    if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
      write_reschedule(nh, vc);
    }

    return;
  }
}
540 
541 bool
get_data(int id,void * data)542 UnixNetVConnection::get_data(int id, void *data)
543 {
544   union {
545     TSVIO *vio;
546     void *data;
547     int *n;
548   } ptr;
549 
550   ptr.data = data;
551 
552   switch (id) {
553   case TS_API_DATA_READ_VIO:
554     *ptr.vio = reinterpret_cast<TSVIO>(&this->read.vio);
555     return true;
556   case TS_API_DATA_WRITE_VIO:
557     *ptr.vio = reinterpret_cast<TSVIO>(&this->write.vio);
558     return true;
559   case TS_API_DATA_CLOSED:
560     *ptr.n = this->closed;
561     return true;
562   default:
563     return false;
564   }
565 }
566 
567 int64_t
outstanding()568 UnixNetVConnection::outstanding()
569 {
570   int n;
571   int ret = ioctl(this->get_socket(), TIOCOUTQ, &n);
572   // if there was an error (such as ioctl doesn't support this call on this platform) then
573   // we return -1
574   if (ret == -1) {
575     return ret;
576   }
577   return n;
578 }
579 
580 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)581 UnixNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
582 {
583   if (closed && !(c == nullptr && nbytes == 0 && buf == nullptr)) {
584     Error("do_io_read invoked on closed vc %p, cont %p, nbytes %" PRId64 ", buf %p", this, c, nbytes, buf);
585     return nullptr;
586   }
587   read.vio.op        = VIO::READ;
588   read.vio.mutex     = c ? c->mutex : this->mutex;
589   read.vio.cont      = c;
590   read.vio.nbytes    = nbytes;
591   read.vio.ndone     = 0;
592   read.vio.vc_server = (VConnection *)this;
593   if (buf) {
594     read.vio.buffer.writer_for(buf);
595     if (!read.enabled) {
596       read.vio.reenable();
597     }
598   } else {
599     read.vio.buffer.clear();
600     read.enabled = 0;
601   }
602   return &read.vio;
603 }
604 
605 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * reader,bool owner)606 UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *reader, bool owner)
607 {
608   if (closed && !(c == nullptr && nbytes == 0 && reader == nullptr)) {
609     Error("do_io_write invoked on closed vc %p, cont %p, nbytes %" PRId64 ", reader %p", this, c, nbytes, reader);
610     return nullptr;
611   }
612   write.vio.op        = VIO::WRITE;
613   write.vio.mutex     = c ? c->mutex : this->mutex;
614   write.vio.cont      = c;
615   write.vio.nbytes    = nbytes;
616   write.vio.ndone     = 0;
617   write.vio.vc_server = (VConnection *)this;
618   if (reader) {
619     ink_assert(!owner);
620     write.vio.buffer.reader_for(reader);
621     if (nbytes && !write.enabled) {
622       write.vio.reenable();
623     }
624   } else {
625     write.enabled = 0;
626   }
627   return &write.vio;
628 }
629 
// Close this VC: tear down both VIOs, record the error (if any), and either
// free the VC inline (when safe: no signal recursion in progress and we hold
// the NetHandler's thread) or leave the `closed` flag for the net thread to
// act on.  closed == 1 means normal close, -1 means close-with-error.
void
UnixNetVConnection::do_io_close(int alerrno /* = -1 */)
{
  // FIXME: the nh must not nullptr.
  ink_assert(nh);

  read.enabled  = 0;
  write.enabled = 0;
  read.vio.buffer.clear();
  read.vio.nbytes = 0;
  read.vio.op     = VIO::NONE;
  read.vio.cont   = nullptr;
  write.vio.buffer.clear();
  write.vio.nbytes = 0;
  write.vio.op     = VIO::NONE;
  write.vio.cont   = nullptr;

  EThread *t        = this_ethread();
  bool close_inline = !recursion && (!nh || nh->mutex->thread_holding == t);

  // Ensure the teardown above is visible before `closed` is published to
  // other threads that poll the flag.
  INK_WRITE_MEMORY_BARRIER;
  if (alerrno && alerrno != -1) {
    this->lerrno = alerrno;
  }
  if (alerrno == -1) {
    closed = 1;
  } else {
    closed = -1;
  }

  if (close_inline) {
    if (nh) {
      nh->free_netevent(this);
    } else {
      free(t);
    }
  }
}
668 
669 void
do_io_shutdown(ShutdownHowTo_t howto)670 UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto)
671 {
672   switch (howto) {
673   case IO_SHUTDOWN_READ:
674     socketManager.shutdown((this)->con.fd, 0);
675     read.enabled = 0;
676     read.vio.buffer.clear();
677     read.vio.nbytes = 0;
678     read.vio.cont   = nullptr;
679     f.shutdown |= NET_VC_SHUTDOWN_READ;
680     break;
681   case IO_SHUTDOWN_WRITE:
682     socketManager.shutdown((this)->con.fd, 1);
683     write.enabled = 0;
684     write.vio.buffer.clear();
685     write.vio.nbytes = 0;
686     write.vio.cont   = nullptr;
687     f.shutdown |= NET_VC_SHUTDOWN_WRITE;
688     break;
689   case IO_SHUTDOWN_READWRITE:
690     socketManager.shutdown((this)->con.fd, 2);
691     read.enabled  = 0;
692     write.enabled = 0;
693     read.vio.buffer.clear();
694     read.vio.nbytes = 0;
695     write.vio.buffer.clear();
696     write.vio.nbytes = 0;
697     read.vio.cont    = nullptr;
698     write.vio.cont   = nullptr;
699     f.shutdown       = NET_VC_SHUTDOWN_READ | NET_VC_SHUTDOWN_WRITE;
700     break;
701   default:
702     ink_assert(!"not reached");
703   }
704 }
705 
706 int
retry_OOB_send(int,Event *)707 OOB_callback::retry_OOB_send(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
708 {
709   ink_assert(mutex->thread_holding == this_ethread());
710   // the NetVC and the OOB_callback share a mutex
711   server_vc->oob_ptr = nullptr;
712   server_vc->send_OOB(server_cont, data, length);
713   delete this;
714   return EVENT_DONE;
715 }
716 
717 void
cancel_OOB()718 UnixNetVConnection::cancel_OOB()
719 {
720   UnixNetVConnection *u = this;
721   if (u->oob_ptr) {
722     if (u->oob_ptr->trigger) {
723       u->oob_ptr->trigger->cancel_action();
724       u->oob_ptr->trigger = nullptr;
725     }
726     delete u->oob_ptr;
727     u->oob_ptr = nullptr;
728   }
729 }
730 
731 Action *
send_OOB(Continuation * cont,char * buf,int len)732 UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
733 {
734   UnixNetVConnection *u = this;
735   ink_assert(len > 0);
736   ink_assert(buf);
737   ink_assert(!u->oob_ptr);
738   int written;
739   ink_assert(cont->mutex->thread_holding == this_ethread());
740   written = socketManager.send(u->con.fd, buf, len, MSG_OOB);
741   if (written == len) {
742     cont->handleEvent(VC_EVENT_OOB_COMPLETE, nullptr);
743     return ACTION_RESULT_DONE;
744   } else if (!written) {
745     cont->handleEvent(VC_EVENT_EOS, nullptr);
746     return ACTION_RESULT_DONE;
747   }
748   if (written > 0 && written < len) {
749     u->oob_ptr          = new OOB_callback(mutex, this, cont, buf + written, len - written);
750     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
751     return u->oob_ptr->trigger;
752   } else {
753     // should be a rare case : taking a new continuation should not be
754     // expensive for this
755     written = -errno;
756     ink_assert(written == -EAGAIN || written == -ENOTCONN);
757     u->oob_ptr          = new OOB_callback(mutex, this, cont, buf, len);
758     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
759     return u->oob_ptr->trigger;
760   }
761 }
762 
763 //
764 // Function used to reenable the VC for reading or
765 // writing.
766 //
767 void
reenable(VIO * vio)768 UnixNetVConnection::reenable(VIO *vio)
769 {
770   if (STATE_FROM_VIO(vio)->enabled) {
771     return;
772   }
773   set_enabled(vio);
774   if (!thread) {
775     return;
776   }
777   EThread *t = vio->mutex->thread_holding;
778   ink_assert(t == this_ethread());
779   ink_release_assert(!closed);
780   if (nh->mutex->thread_holding == t) {
781     if (vio == &read.vio) {
782       ep.modify(EVENTIO_READ);
783       ep.refresh(EVENTIO_READ);
784       if (read.triggered) {
785         nh->read_ready_list.in_or_enqueue(this);
786       } else {
787         nh->read_ready_list.remove(this);
788       }
789     } else {
790       ep.modify(EVENTIO_WRITE);
791       ep.refresh(EVENTIO_WRITE);
792       if (write.triggered) {
793         nh->write_ready_list.in_or_enqueue(this);
794       } else {
795         nh->write_ready_list.remove(this);
796       }
797     }
798   } else {
799     MUTEX_TRY_LOCK(lock, nh->mutex, t);
800     if (!lock.is_locked()) {
801       if (vio == &read.vio) {
802         int isin = ink_atomic_swap(&read.in_enabled_list, 1);
803         if (!isin) {
804           nh->read_enable_list.push(this);
805         }
806       } else {
807         int isin = ink_atomic_swap(&write.in_enabled_list, 1);
808         if (!isin) {
809           nh->write_enable_list.push(this);
810         }
811       }
812       if (likely(nh->thread)) {
813         nh->thread->tail_cb->signalActivity();
814       } else if (nh->trigger_event) {
815         nh->trigger_event->ethread->tail_cb->signalActivity();
816       }
817     } else {
818       if (vio == &read.vio) {
819         ep.modify(EVENTIO_READ);
820         ep.refresh(EVENTIO_READ);
821         if (read.triggered) {
822           nh->read_ready_list.in_or_enqueue(this);
823         } else {
824           nh->read_ready_list.remove(this);
825         }
826       } else {
827         ep.modify(EVENTIO_WRITE);
828         ep.refresh(EVENTIO_WRITE);
829         if (write.triggered) {
830           nh->write_ready_list.in_or_enqueue(this);
831         } else {
832           nh->write_ready_list.remove(this);
833         }
834       }
835     }
836   }
837 }
838 
// Re-entrant variant of reenable(): when the caller already holds the
// NetHandler's mutex, perform the I/O inline (net_read_io/write_to_net)
// instead of just queueing the VC; otherwise fall back to reenable().
void
UnixNetVConnection::reenable_re(VIO *vio)
{
  if (!thread) {
    return;
  }
  EThread *t = vio->mutex->thread_holding;
  ink_assert(t == this_ethread());
  if (nh->mutex->thread_holding == t) {
    set_enabled(vio);
    if (vio == &read.vio) {
      ep.modify(EVENTIO_READ);
      ep.refresh(EVENTIO_READ);
      if (read.triggered) {
        net_read_io(nh, t); // do the read right now
      } else {
        nh->read_ready_list.remove(this);
      }
    } else {
      ep.modify(EVENTIO_WRITE);
      ep.refresh(EVENTIO_WRITE);
      if (write.triggered) {
        write_to_net(nh, this, t); // do the write right now
      } else {
        nh->write_ready_list.remove(this);
      }
    }
  } else {
    reenable(vio);
  }
}
870 
// A new VC starts out handling startEvent, which acquires the NetHandler
// lock and kicks off connectUp().
UnixNetVConnection::UnixNetVConnection()
{
  SET_HANDLER((NetVConnHandler)&UnixNetVConnection::startEvent);
}
875 
876 // Private methods
877 
878 void
set_enabled(VIO * vio)879 UnixNetVConnection::set_enabled(VIO *vio)
880 {
881   ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
882   ink_release_assert(!closed);
883   STATE_FROM_VIO(vio)->enabled = 1;
884   if (!next_inactivity_timeout_at && inactivity_timeout_in) {
885     next_inactivity_timeout_at = Thread::get_hrtime() + inactivity_timeout_in;
886   }
887 }
888 
// Virtual read entry point; plain TCP just forwards to read_from_net
// (SSLNetVConnection overrides this).
void
UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
{
  read_from_net(nh, this, lthread);
}
894 
// Virtual write entry point; forwards to write_to_net.
void
UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread)
{
  write_to_net(nh, this, lthread);
}
900 
901 // This code was pulled out of write_to_net so
902 // I could overwrite it for the SSL implementation
903 // (SSL read does not support overlapped i/o)
904 // without duplicating all the code in write_to_net.
// This code was pulled out of write_to_net so
// I could overwrite it for the SSL implementation
// (SSL read does not support overlapped i/o)
// without duplicating all the code in write_to_net.
//
// Flush up to @a towrite bytes from @a buf to the socket using writev()
// (or sendmsg(MSG_FASTOPEN) for the first write on a TFO connection).
// @a total_written accumulates the bytes actually consumed from the buffer;
// @a needs gets EVENTIO_WRITE OR'd in so the caller reschedules the write
// side.  Returns the result of the last write call (negative errno on
// failure).
int64_t
UnixNetVConnection::load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs)
{
  int64_t r                  = 0;
  int64_t try_to_write       = 0;
  // Clone the reader so iovec construction can consume blocks without
  // disturbing the real reader; only `r` bytes are consumed from it below.
  IOBufferReader *tmp_reader = buf.reader()->clone();

  do {
    IOVec tiovec[NET_MAX_IOV];
    unsigned niov = 0;
    try_to_write  = 0;

    // Build an iovec over the readable blocks, capped at NET_MAX_IOV
    // entries and at the remaining `towrite` budget.
    while (niov < NET_MAX_IOV) {
      int64_t wavail = towrite - total_written - try_to_write;
      int64_t len    = tmp_reader->block_read_avail();

      // Check if we have done this block.
      if (len <= 0) {
        break;
      }

      // Check if the amount to write exceeds that in this buffer.
      if (len > wavail) {
        len = wavail;
      }

      if (len == 0) {
        break;
      }

      // build an iov entry
      tiovec[niov].iov_len  = len;
      tiovec[niov].iov_base = tmp_reader->start();
      niov++;

      try_to_write += len;
      tmp_reader->consume(len);
    }

    ink_assert(niov > 0);
    ink_assert(niov <= countof(tiovec));

    // If the platform doesn't support TCP Fast Open, verify that we
    // correctly disabled support in the socket option configuration.
    ink_assert(MSG_FASTOPEN != 0 || this->options.f_tcp_fastopen == false);

    if (!this->con.is_connected && this->options.f_tcp_fastopen) {
      // First write on a TFO connection: the connect happens as part of
      // this sendmsg() call.
      struct msghdr msg;

      ink_zero(msg);
      msg.msg_name    = const_cast<sockaddr *>(this->get_remote_addr());
      msg.msg_namelen = ats_ip_size(this->get_remote_addr());
      msg.msg_iov     = &tiovec[0];
      msg.msg_iovlen  = niov;

      NET_INCREMENT_DYN_STAT(net_fastopen_attempts_stat);

      r = socketManager.sendmsg(con.fd, &msg, MSG_FASTOPEN);
      if (r < 0) {
        // EINPROGRESS/EWOULDBLOCK still means the connect was initiated.
        if (r == -EINPROGRESS || r == -EWOULDBLOCK) {
          this->con.is_connected = true;
        }
      } else {
        NET_INCREMENT_DYN_STAT(net_fastopen_successes_stat);
        this->con.is_connected = true;
      }

    } else {
      r = socketManager.writev(con.fd, &tiovec[0], niov);
    }

    // Consume from the real reader only what the kernel actually accepted.
    if (r > 0) {
      buf.reader()->consume(r);
      total_written += r;
    }

    ProxyMutex *mutex = thread->mutex.get(); // needed by the stat macro
    NET_INCREMENT_DYN_STAT(net_calls_to_write_stat);
    // Keep writing while the socket accepts everything we offer it.
  } while (r == try_to_write && total_written < towrite);

  tmp_reader->dealloc();

  needs |= EVENTIO_WRITE;

  return r;
}
991 
// Member wrapper over the file-local read_disable() helper.
void
UnixNetVConnection::readDisable(NetHandler *nh)
{
  read_disable(nh, this);
}
997 
// Member wrapper over the file-local read_signal_error() helper.
void
UnixNetVConnection::readSignalError(NetHandler *nh, int err)
{
  read_signal_error(nh, this, err);
}
1003 
1004 int
readSignalDone(int event,NetHandler * nh)1005 UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
1006 {
1007   return (read_signal_done(event, nh, this));
1008 }
1009 
// Signal `event` to the read VIO's continuation and update this VC's
// state accordingly; returns the helper's event-loop disposition.
int
UnixNetVConnection::readSignalAndUpdate(int event)
{
  return (read_signal_and_update(event, this));
}
1015 
1016 // Interface so SSL inherited class can call some static in-line functions
1017 // without affecting regular net stuff or copying a bunch of code into
1018 // the header files.
void
UnixNetVConnection::readReschedule(NetHandler *nh)
{
  // Re-queue (or remove) this VC on the NetHandler's read-ready list,
  // depending on triggered/enabled state -- see read_reschedule() above.
  read_reschedule(nh, this);
}
1024 
void
UnixNetVConnection::writeReschedule(NetHandler *nh)
{
  // Re-queue (or remove) this VC on the NetHandler's write-ready list,
  // depending on triggered/enabled state -- see write_reschedule() above.
  write_reschedule(nh, this);
}
1030 
// Record activity on this VC (inactivity-timeout bookkeeping) via the
// file-scope net_activity() helper.
void
UnixNetVConnection::netActivity(EThread *lthread)
{
  net_activity(this, lthread);
}
1036 
// First event after an outbound connect is scheduled: take the NetHandler
// lock and start the actual connection via connectUp(). If the action was
// cancelled in the meantime, the VC is released instead.
int
UnixNetVConnection::startEvent(int /* event ATS_UNUSED */, Event *e)
{
  MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread);
  if (!lock.is_locked()) {
    // NetHandler is busy; retry shortly instead of blocking the thread.
    e->schedule_in(HRTIME_MSECONDS(net_retry_delay));
    return EVENT_CONT;
  }
  if (!action_.cancelled) {
    connectUp(e->ethread, NO_FD);
  } else {
    // Caller cancelled before we started -- free this VC. `this` is
    // invalid after free(); nothing may touch it below.
    free(e->ethread);
  }
  return EVENT_DONE;
}
1052 
// Accept-side initialization: attach this VC to the event thread's
// NetHandler, give it its own mutex, arm timeouts, and deliver
// NET_EVENT_ACCEPT to the acceptor's continuation.
int
UnixNetVConnection::acceptEvent(int event, Event *e)
{
  EThread *t    = (e == nullptr) ? this_ethread() : e->ethread;
  NetHandler *h = get_NetHandler(t);

  thread = t;

  // Send this NetVC to NetHandler and start to polling read & write event.
  if (h->startIO(this) < 0) {
    // Could not register with the poll descriptor; tear the VC down.
    free(t);
    return EVENT_DONE;
  }

  // Switch vc->mutex from NetHandler->mutex to new mutex
  mutex = new_ProxyMutex();
  SCOPED_MUTEX_LOCK(lock2, mutex, t);

  // Setup a timeout callback handler.
  SET_HANDLER((NetVConnHandler)&UnixNetVConnection::mainEvent);

  // Send this netvc to InactivityCop.
  nh->startCop(this);

  if (inactivity_timeout_in) {
    UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
  } else {
    set_inactivity_timeout(0);
  }

  if (active_timeout_in) {
    UnixNetVConnection::set_active_timeout(active_timeout_in);
  }
  // Deliver NET_EVENT_ACCEPT under the continuation's mutex when it has
  // one. The try-lock is expected to succeed here; failure is a bug.
  if (action_.continuation->mutex != nullptr) {
    MUTEX_TRY_LOCK(lock3, action_.continuation->mutex, t);
    if (!lock3.is_locked()) {
      ink_release_assert(0);
    }
    action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
  } else {
    action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
  }
  return EVENT_DONE;
}
1097 
1098 //
1099 // The main event for UnixNetVConnections.
1100 // This is called by the Event subsystem to initialize the UnixNetVConnection
1101 // and for active and inactivity timeouts.
1102 //
int
UnixNetVConnection::mainEvent(int event, Event *e)
{
  ink_assert(event == VC_EVENT_ACTIVE_TIMEOUT || event == VC_EVENT_INACTIVITY_TIMEOUT);
  ink_assert(thread == this_ethread());

  // All three locks (NetHandler, read VIO, write VIO) must be held before
  // signalling a timeout; otherwise retry on a later event.
  MUTEX_TRY_LOCK(hlock, get_NetHandler(thread)->mutex, e->ethread);
  MUTEX_TRY_LOCK(rlock, read.vio.mutex ? read.vio.mutex : e->ethread->mutex, e->ethread);
  MUTEX_TRY_LOCK(wlock, write.vio.mutex ? write.vio.mutex : e->ethread->mutex, e->ethread);

  // Also bail out if a VIO mutex changed between the read and the lock --
  // the continuation may have been reassigned underneath us.
  if (!hlock.is_locked() || !rlock.is_locked() || !wlock.is_locked() ||
      (read.vio.mutex && rlock.get_mutex() != read.vio.mutex.get()) ||
      (write.vio.mutex && wlock.get_mutex() != write.vio.mutex.get())) {
    return EVENT_CONT;
  }

  if (e->cancelled) {
    return EVENT_DONE;
  }

  int signal_event;
  Continuation *reader_cont     = nullptr;
  Continuation *writer_cont     = nullptr;
  ink_hrtime *signal_timeout_at = nullptr;
  // NOTE(review): signal_timeout points at the local `t` and is only ever
  // written with nullptr below, so the `!*signal_timeout` test later is
  // always true -- looks vestigial from an older Event-based design.
  Event *t                      = nullptr;
  Event **signal_timeout        = &t;

  switch (event) {
  // Treating immediate as inactivity timeout for any
  // deprecated remaining immediates. The previous code was using EVENT_INTERVAL
  // and EVENT_IMMEDIATE to distinguish active and inactive timeouts.
  // There appears to be some stray EVENT_IMMEDIATEs floating around
  case EVENT_IMMEDIATE:
  case VC_EVENT_INACTIVITY_TIMEOUT:
    signal_event      = VC_EVENT_INACTIVITY_TIMEOUT;
    signal_timeout_at = &next_inactivity_timeout_at;
    break;
  case VC_EVENT_ACTIVE_TIMEOUT:
    signal_event      = VC_EVENT_ACTIVE_TIMEOUT;
    signal_timeout_at = &next_activity_timeout_at;
    break;
  default:
    ink_release_assert(!"BUG: unexpected event in UnixNetVConnection::mainEvent");
    break;
  }

  // Clear the pending timeout before signalling so a handler may re-arm it.
  *signal_timeout    = nullptr;
  *signal_timeout_at = 0;
  writer_cont        = write.vio.cont;

  if (closed) {
    nh->free_netevent(this);
    return EVENT_DONE;
  }

  // Signal the reader first. If its handler closed/freed us, stop here.
  if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
    reader_cont = read.vio.cont;
    if (read_signal_and_update(signal_event, this) == EVENT_DONE) {
      return EVENT_DONE;
    }
  }

  // Signal the writer only if the reader callback did not re-arm a timeout,
  // close the VC, swap the write continuation, or share the reader's
  // continuation (which was already signalled above).
  if (!*signal_timeout && !*signal_timeout_at && !closed && write.vio.op == VIO::WRITE && !(f.shutdown & NET_VC_SHUTDOWN_WRITE) &&
      reader_cont != write.vio.cont && writer_cont == write.vio.cont) {
    if (write_signal_and_update(signal_event, this) == EVENT_DONE) {
      return EVENT_DONE;
    }
  }
  return EVENT_DONE;
}
1173 
// Adopt an already-open Connection into this VC on the current thread:
// register with the NetHandler's poll set and the InactivityCop. Returns
// EVENT_DONE on success, EVENT_ERROR if the handler lock or the epoll
// registration could not be obtained. `arg` is unused in this base
// implementation (the SSL subclass's populate receives the saved SSL
// handle through it -- see migrateToCurrentThread()).
int
UnixNetVConnection::populate(Connection &con_in, Continuation *c, void *arg)
{
  this->con.move(con_in);
  this->mutex  = c->mutex;
  this->thread = this_ethread();

  EThread *t    = this_ethread();
  NetHandler *h = get_NetHandler(t);

  MUTEX_TRY_LOCK(lock, h->mutex, t);
  if (!lock.is_locked()) {
    // Clean up and go home
    return EVENT_ERROR;
  }

  if (h->startIO(this) < 0) {
    Debug("iocore_net", "populate : Failed to add to epoll list");
    return EVENT_ERROR;
  }

  ink_assert(this->nh != nullptr);
  SET_HANDLER(&UnixNetVConnection::mainEvent);
  this->nh->startCop(this);
  ink_assert(this->con.fd != NO_FD);
  return EVENT_DONE;
}
1201 
// Open (or adopt) a socket and establish the outbound connection on thread
// `t`. `fd` is NO_FD for a fresh connect; otherwise it is an
// already-connected descriptor supplied by the TS API. Returns
// CONNECT_SUCCESS or CONNECT_FAILURE; on failure the continuation is
// signalled with NET_EVENT_OPEN_FAILED and this VC is freed.
int
UnixNetVConnection::connectUp(EThread *t, int fd)
{
  ink_assert(get_NetHandler(t)->mutex->thread_holding == this_ethread());
  int res;

  thread = t;
  if (check_net_throttle(CONNECT)) {
    // Too many outbound connections; refuse before opening a socket.
    check_throttle_warning(CONNECT);
    res = -ENET_THROTTLING;
    NET_INCREMENT_DYN_STAT(net_connections_throttled_out_stat);
    goto fail;
  }

  // Force family to agree with remote (server) address.
  options.ip_family = con.addr.sa.sa_family;

  //
  // Initialize this UnixNetVConnection
  //
  if (is_debug_tag_set("iocore_net")) {
    char addrbuf[INET6_ADDRSTRLEN];
    Debug("iocore_net", "connectUp:: local_addr=%s:%d [%s]",
          options.local_ip.isValid() ? options.local_ip.toString(addrbuf, sizeof(addrbuf)) : "*", options.local_port,
          NetVCOptions::toString(options.addr_binding));
  }

  // If this is getting called from the TS API, then we are wiring up a file descriptor
  // provided by the caller. In that case, we know that the socket is already connected.
  if (fd == NO_FD) {
    // Due to multi-threads system, the fd returned from con.open() may exceed the limitation of check_net_throttle().
    res = con.open(options);
    if (res != 0) {
      goto fail;
    }
  } else {
    int len = sizeof(con.sock_type);

    // This call will fail if fd is not a socket (e.g. it is a
    // eventfd or a regular file fd.  That is ok, because sock_type
    // is only used when setting up the socket.
    safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, reinterpret_cast<char *>(&con.sock_type), &len);
    safe_nonblocking(fd);
    con.fd           = fd;
    con.is_connected = true;
    con.is_bound     = true;
  }

  // Must connect after EventIO::Start() to avoid a race condition
  // when edge triggering is used.
  if ((res = get_NetHandler(t)->startIO(this)) < 0) {
    goto fail;
  }

  if (fd == NO_FD) {
    res = con.connect(nullptr, options);
    if (res != 0) {
      // fast stopIO
      nh = nullptr;
      goto fail;
    }
  }

  // Did not fail, increment connection count
  NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
  ink_release_assert(con.fd != NO_FD);

  // Setup a timeout callback handler.
  SET_HANDLER(&UnixNetVConnection::mainEvent);
  // Send this netvc to InactivityCop.
  nh->startCop(this);

  set_inactivity_timeout(0);
  ink_assert(!active_timeout_in);
  this->set_local_addr();
  action_.continuation->handleEvent(NET_EVENT_OPEN, this);
  return CONNECT_SUCCESS;

fail:
  // `res` carries a negative errno/code at this point; store it positively.
  lerrno = -res;
  action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)static_cast<intptr_t>(res));
  if (fd != NO_FD) {
    // The adopted descriptor belongs to the caller; don't close it in free().
    con.fd = NO_FD;
  }
  free(t);
  return CONNECT_FAILURE;
}
1289 
// Reset all per-connection state so the VC can be recycled by the
// allocator (called from free()). The trailing asserts verify the VC is
// no longer linked into any NetHandler queue.
void
UnixNetVConnection::clear()
{
  // clear timeout variables
  next_inactivity_timeout_at = 0;
  next_activity_timeout_at   = 0;
  inactivity_timeout_in      = 0;
  active_timeout_in          = 0;

  // clear variables for reuse
  this->mutex.clear();
  action_.mutex.clear();
  got_remote_addr = false;
  got_local_addr  = false;
  attributes      = 0;
  read.vio.mutex.clear();
  write.vio.mutex.clear();
  flags               = 0;
  nh                  = nullptr;
  read.triggered      = 0;
  write.triggered     = 0;
  read.enabled        = 0;
  write.enabled       = 0;
  read.vio.cont       = nullptr;
  write.vio.cont      = nullptr;
  read.vio.vc_server  = nullptr;
  write.vio.vc_server = nullptr;
  options.reset();
  closed        = 0;
  netvc_context = NET_VCONNECTION_UNSET;
  ink_assert(!read.ready_link.prev && !read.ready_link.next);
  ink_assert(!read.enable_link.next);
  ink_assert(!write.ready_link.prev && !write.ready_link.next);
  ink_assert(!write.enable_link.next);
  ink_assert(!link.next && !link.prev);
}
1326 
// Release this VC back to its allocator. Must be called on the VC's home
// thread; `this` is invalid after return.
void
UnixNetVConnection::free(EThread *t)
{
  ink_release_assert(t == this_ethread());

  // cancel OOB
  cancel_OOB();
  // close socket fd
  if (con.fd != NO_FD) {
    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
  }
  con.close();

  clear();
  // Reset the handler so a recycled VC starts from startEvent() again.
  SET_CONTINUATION_HANDLER(this, (NetVConnHandler)&UnixNetVConnection::startEvent);
  ink_assert(con.fd == NO_FD);
  ink_assert(t == this_ethread());

  if (from_accept_thread) {
    // Allocated straight from the global allocator by an accept thread.
    netVCAllocator.free(this);
  } else {
    THREAD_FREE(this, netVCAllocator, t);
  }
}
1351 
// Push the currently-configured NetVCOptions down onto the socket.
void
UnixNetVConnection::apply_options()
{
  con.apply_options(options);
}
1357 
1358 TS_INLINE void
set_inactivity_timeout(ink_hrtime timeout_in)1359 UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout_in)
1360 {
1361   Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout_in, this);
1362   inactivity_timeout_in      = timeout_in;
1363   next_inactivity_timeout_at = (timeout_in > 0) ? Thread::get_hrtime() + inactivity_timeout_in : 0;
1364 }
1365 
1366 TS_INLINE void
set_default_inactivity_timeout(ink_hrtime timeout_in)1367 UnixNetVConnection::set_default_inactivity_timeout(ink_hrtime timeout_in)
1368 {
1369   Debug("socket", "Set default inactive timeout=%" PRId64 ", for NetVC=%p", timeout_in, this);
1370   inactivity_timeout_in      = 0;
1371   default_inactivity_timeout = true;
1372   next_inactivity_timeout_at = Thread::get_hrtime() + timeout_in;
1373 }
1374 
1375 TS_INLINE bool
is_default_inactivity_timeout()1376 UnixNetVConnection::is_default_inactivity_timeout()
1377 {
1378   return (default_inactivity_timeout && inactivity_timeout_in == 0);
1379 }
1380 
1381 /*
1382  * Close down the current netVC.  Save aside the socket and SSL information
1383  * and create new netVC in the current thread/netVC
1384  */
// Move this connection to the current thread `t`: save the socket (and SSL
// handle, if any), close this VC on its original thread, and build a fresh
// VC bound to `t`. Returns the new VC, `this` if already on `t`, or
// nullptr if the replacement VC could not be populated.
UnixNetVConnection *
UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t)
{
  NetHandler *client_nh = get_NetHandler(t);
  ink_assert(client_nh);
  if (this->nh == client_nh) {
    // We're already there!
    return this;
  }

  // Take ownership of the socket so do_io_close() below won't close it.
  Connection hold_con;
  hold_con.move(this->con);
  SSLNetVConnection *sslvc = dynamic_cast<SSLNetVConnection *>(this);

  SSL *save_ssl = nullptr;
  if (sslvc) {
    // Detach the SSL handle from the old VC before it is closed.
    save_ssl = sslvc->ssl;
    SSLNetVCDetach(sslvc->ssl);
    sslvc->ssl = nullptr;
  }

  // Do_io_close will signal the VC to be freed on the original thread
  // Since we moved the con context, the fd will not be closed
  // Go ahead and remove the fd from the original thread's epoll structure, so it is not
  // processed on two threads simultaneously
  this->ep.stop();

  // Create new VC:
  UnixNetVConnection *netvc = nullptr;
  if (save_ssl) {
    sslvc = static_cast<SSLNetVConnection *>(sslNetProcessor.allocate_vc(t));
    if (sslvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) {
      sslvc->do_io_close();
      sslvc = nullptr;
    } else {
      // Update the SSL fields
      sslvc->set_context(get_context());
    }
    netvc = sslvc;
  } else {
    netvc = static_cast<UnixNetVConnection *>(netProcessor.allocate_vc(t));
    if (netvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) {
      netvc->do_io_close();
      netvc = nullptr;
    } else {
      netvc->set_context(get_context());
    }
  }
  if (netvc) {
    // Carry the socket options over to the replacement VC.
    netvc->options = this->options;
  }
  // Do not mark this closed until the end so it does not get freed by the other thread too soon
  this->do_io_close();
  return netvc;
}
1440 
1441 void
add_to_keep_alive_queue()1442 UnixNetVConnection::add_to_keep_alive_queue()
1443 {
1444   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1445   if (lock.is_locked()) {
1446     nh->add_to_keep_alive_queue(this);
1447   } else {
1448     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
1449   }
1450 }
1451 
1452 void
remove_from_keep_alive_queue()1453 UnixNetVConnection::remove_from_keep_alive_queue()
1454 {
1455   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1456   if (lock.is_locked()) {
1457     nh->remove_from_keep_alive_queue(this);
1458   } else {
1459     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
1460   }
1461 }
1462 
1463 bool
add_to_active_queue()1464 UnixNetVConnection::add_to_active_queue()
1465 {
1466   bool result = false;
1467 
1468   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1469   if (lock.is_locked()) {
1470     result = nh->add_to_active_queue(this);
1471   } else {
1472     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
1473   }
1474   return result;
1475 }
1476 
1477 void
remove_from_active_queue()1478 UnixNetVConnection::remove_from_active_queue()
1479 {
1480   MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
1481   if (lock.is_locked()) {
1482     nh->remove_from_active_queue(this);
1483   } else {
1484     ink_release_assert(!"BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
1485   }
1486 }
1487 
1488 int
populate_protocol(std::string_view * results,int n) const1489 UnixNetVConnection::populate_protocol(std::string_view *results, int n) const
1490 {
1491   int retval = 0;
1492   if (n > retval) {
1493     if (!(results[retval] = options.get_proto_string()).empty()) {
1494       ++retval;
1495     }
1496     if (n > retval) {
1497       if (!(results[retval] = options.get_family_string()).empty()) {
1498         ++retval;
1499       }
1500     }
1501   }
1502   return retval;
1503 }
1504 
1505 const char *
protocol_contains(std::string_view tag) const1506 UnixNetVConnection::protocol_contains(std::string_view tag) const
1507 {
1508   std::string_view retval = options.get_proto_string();
1509   if (!IsNoCasePrefixOf(tag, retval)) { // didn't match IP level, check TCP level
1510     retval = options.get_family_string();
1511     if (!IsNoCasePrefixOf(tag, retval)) { // no match here either, return empty.
1512       ink_zero(retval);
1513     }
1514   }
1515   return retval.data();
1516 }
1517 
// Set the TCP congestion control algorithm on this socket, using the name
// configured for the given side (CLIENT_SIDE -> net_ccp_in, otherwise
// net_ccp_out). Returns -1 when no algorithm is configured or the platform
// lacks TCP_CONGESTION; returns 0 once a name was configured -- even if
// the setsockopt() call failed (the failure is only logged as an Error).
int
UnixNetVConnection::set_tcp_congestion_control(int side)
{
#ifdef TCP_CONGESTION
  std::string_view ccp;

  if (side == CLIENT_SIDE) {
    ccp = net_ccp_in;
  } else {
    ccp = net_ccp_out;
  }

  if (!ccp.empty()) {
    int rv = setsockopt(con.fd, IPPROTO_TCP, TCP_CONGESTION, reinterpret_cast<const void *>(ccp.data()), ccp.size());

    if (rv < 0) {
      Error("Unable to set TCP congestion control on socket %d to \"%s\", errno=%d (%s)", con.fd, ccp.data(), errno,
            strerror(errno));
    } else {
      Debug("socket", "Setting TCP congestion control on socket [%d] to \"%s\" -> %d", con.fd, ccp.data(), rv);
    }
    return 0;
  }
  return -1;
#else
  Debug("socket", "Setting TCP congestion control is not supported on this platform.");
  return -1;
#endif
}
1547