xref: /trafficserver/iocore/net/UnixUDPNet.cc (revision 4cfd5a73)
1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26   UnixUDPNet.cc
27   UDPNet implementation
28 
29 
30  ****************************************************************************/
31 
32 #if defined(darwin)
33 /* This is for IPV6_PKTINFO and IPV6_RECVPKTINFO */
34 #define __APPLE_USE_RFC_3542
35 #endif
36 
37 #include "P_Net.h"
38 #include "P_UDPNet.h"
39 
40 using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);
41 
42 inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
43 EventType ET_UDP;
44 
45 //
46 // Global Data
47 //
48 
49 UDPNetProcessorInternal udpNetInternal;
50 UDPNetProcessor &udpNet = udpNetInternal;
51 
52 int32_t g_udp_periodicCleanupSlots;
53 int32_t g_udp_periodicFreeCancelledPkts;
54 int32_t g_udp_numSendRetries;
55 
56 //
57 // Public functions
58 // See header for documentation
59 //
60 int G_bwGrapherFd;
61 sockaddr_in6 G_bwGrapherLoc;
62 
63 void
initialize_thread_for_udp_net(EThread * thread)64 initialize_thread_for_udp_net(EThread *thread)
65 {
66   UDPNetHandler *nh = get_UDPNetHandler(thread);
67 
68   new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler;
69   new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex);
70   // The UDPNetHandler cannot be accessed across EThreads.
71   // Because the UDPNetHandler should be called back immediately after UDPPollCont.
72   nh->mutex  = thread->mutex.get();
73   nh->thread = thread;
74 
75   PollCont *upc       = get_UDPPollCont(thread);
76   PollDescriptor *upd = upc->pollDescriptor;
77   // due to ET_UDP is really simple, it should sleep for a long time
78   // TODO: fixed size
79   upc->poll_timeout = 100;
80   // This variable controls how often we cleanup the cancelled packets.
81   // If it is set to 0, then cleanup never occurs.
82   REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
83 
84   // This variable controls how many "slots" of the udp calendar queue we cleanup.
85   // If it is set to 0, then cleanup never occurs.  This value makes sense
86   // only if the above variable is set.
87   REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup");
88 
89   // UDP sends can fail with errno=EAGAIN.  This variable determines the # of
90   // times the UDP thread retries before giving up.  Set to 0 to keep trying forever.
91   REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
92   g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
93 
94   thread->set_tail_handler(nh);
95   thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
96   new (thread->ep) EventIO();
97   thread->ep->type = EVENTIO_ASYNC_SIGNAL;
98 #if HAVE_EVENTFD
99   thread->ep->start(upd, thread->evfd, nullptr, EVENTIO_READ);
100 #else
101   thread->ep->start(upd, thread->evpipe[0], nullptr, EVENTIO_READ);
102 #endif
103 }
104 
105 int
start(int n_upd_threads,size_t stacksize)106 UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize)
107 {
108   if (n_upd_threads < 1) {
109     return -1;
110   }
111 
112   pollCont_offset      = eventProcessor.allocate(sizeof(PollCont));
113   udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
114 
115   ET_UDP = eventProcessor.register_event_type("ET_UDP");
116   eventProcessor.schedule_spawn(&initialize_thread_for_udp_net, ET_UDP);
117   eventProcessor.spawn_event_threads(ET_UDP, n_upd_threads, stacksize);
118 
119   return 0;
120 }
121 
122 void
udp_read_from_net(UDPNetHandler * nh,UDPConnection * xuc)123 UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc)
124 {
125   UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
126 
127   // receive packet and queue onto UDPConnection.
128   // don't call back connection at this time.
129   int64_t r;
130   int iters         = 0;
131   unsigned max_niov = 32;
132 
133   struct msghdr msg;
134   Ptr<IOBufferBlock> chain, next_chain;
135   struct iovec tiovec[max_niov];
136   int64_t size_index  = BUFFER_SIZE_INDEX_2K;
137   int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index);
138   // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes.
139   // Because the 'UDP Length' is type of uint16_t defined in RFC 768.
140   // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes.
141   do {
142     // create IOBufferBlock chain to receive data
143     unsigned int niov;
144     IOBufferBlock *b, *last;
145 
146     // build struct iov
147     // reuse the block in chain if available
148     b    = chain.get();
149     last = nullptr;
150     for (niov = 0; niov < max_niov; niov++) {
151       if (b == nullptr) {
152         b = new_IOBufferBlock();
153         b->alloc(size_index);
154         if (last == nullptr) {
155           chain = b;
156         } else {
157           last->next = b;
158         }
159       }
160 
161       tiovec[niov].iov_base = b->buf();
162       tiovec[niov].iov_len  = b->block_size();
163 
164       last = b;
165       b    = b->next.get();
166     }
167 
168     // build struct msghdr
169     sockaddr_in6 fromaddr;
170     sockaddr_in6 toaddr;
171     int toaddr_len = sizeof(toaddr);
172     char *cbuf[1024];
173     msg.msg_name       = &fromaddr;
174     msg.msg_namelen    = sizeof(fromaddr);
175     msg.msg_iov        = tiovec;
176     msg.msg_iovlen     = niov;
177     msg.msg_control    = cbuf;
178     msg.msg_controllen = sizeof(cbuf);
179 
180     // receive data by recvmsg
181     r = socketManager.recvmsg(uc->getFd(), &msg, 0);
182     if (r <= 0) {
183       // error
184       break;
185     }
186 
187     // truncated check
188     if (msg.msg_flags & MSG_TRUNC) {
189       Debug("udp-read", "The UDP packet is truncated");
190     }
191 
192     // fill the IOBufferBlock chain
193     int64_t saved = r;
194     b             = chain.get();
195     while (b && saved > 0) {
196       if (saved > buffer_size) {
197         b->fill(buffer_size);
198         saved -= buffer_size;
199         b = b->next.get();
200       } else {
201         b->fill(saved);
202         saved      = 0;
203         next_chain = b->next.get();
204         b->next    = nullptr;
205       }
206     }
207 
208     safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr), &toaddr_len);
209     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
210       switch (cmsg->cmsg_type) {
211 #ifdef IP_PKTINFO
212       case IP_PKTINFO:
213         if (cmsg->cmsg_level == IPPROTO_IP) {
214           struct in_pktinfo *pktinfo                                = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
215           reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr;
216         }
217         break;
218 #endif
219 #ifdef IP_RECVDSTADDR
220       case IP_RECVDSTADDR:
221         if (cmsg->cmsg_level == IPPROTO_IP) {
222           struct in_addr *addr                                      = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
223           reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = addr->s_addr;
224         }
225         break;
226 #endif
227 #if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
228       case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too
229         if (cmsg->cmsg_level == IPPROTO_IPV6) {
230           struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg));
231           memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
232         }
233         break;
234 #endif
235       }
236     }
237 
238     // create packet
239     UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain);
240     p->setConnection(uc);
241     // queue onto the UDPConnection
242     uc->inQueue.push((UDPPacketInternal *)p);
243 
244     // reload the unused block
245     chain      = next_chain;
246     next_chain = nullptr;
247     iters++;
248   } while (r > 0);
249   if (iters >= 1) {
250     Debug("udp-read", "read %d at a time", iters);
251   }
252   // if not already on to-be-called-back queue, then add it.
253   if (!uc->onCallbackQueue) {
254     ink_assert(uc->callback_link.next == nullptr);
255     ink_assert(uc->callback_link.prev == nullptr);
256     uc->AddRef();
257     nh->udp_callbacks.enqueue(uc);
258     uc->onCallbackQueue = 1;
259   }
260 }
261 
262 int
udp_callback(UDPNetHandler * nh,UDPConnection * xuc,EThread * thread)263 UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, EThread *thread)
264 {
265   (void)nh;
266   UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
267 
268   if (uc->continuation && uc->mutex) {
269     MUTEX_TRY_LOCK(lock, uc->mutex, thread);
270     if (!lock.is_locked()) {
271       return 1;
272     }
273     uc->AddRef();
274     uc->callbackHandler(0, nullptr);
275     return 0;
276   } else {
277     ink_assert(!"doesn't reach here");
278     if (!uc->callbackAction) {
279       uc->AddRef();
280       uc->callbackAction = eventProcessor.schedule_imm(uc);
281     }
282     return 0;
283   }
284 }
285 
// Sentinel for "no completion token attached yet". The expansion is fully
// parenthesised so it composes safely in any expression context.
#define UNINITIALIZED_EVENT_PTR (reinterpret_cast<Event *>(0xdeadbeef))
287 
288 // cheesy implementation of a asynchronous read and callback for Unix
289 class UDPReadContinuation : public Continuation
290 {
291 public:
292   UDPReadContinuation(Event *completionToken);
293   UDPReadContinuation();
294   ~UDPReadContinuation() override;
295   inline void free();
296   inline void init_token(Event *completionToken);
297   inline void init_read(int fd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);
298 
299   void
set_timer(int seconds)300   set_timer(int seconds)
301   {
302     timeout_interval = HRTIME_SECONDS(seconds);
303   }
304 
305   void cancel();
306   int readPollEvent(int event, Event *e);
307 
308   Action *
getAction()309   getAction()
310   {
311     return event;
312   }
313 
314   void setupPollDescriptor();
315 
316 private:
317   Event *event = UNINITIALIZED_EVENT_PTR; // the completion event token created
318   // on behalf of the client
319   Ptr<IOBufferBlock> readbuf{nullptr};
320   int readlen                   = 0;
321   struct sockaddr_in6 *fromaddr = nullptr;
322   socklen_t *fromaddrlen        = nullptr;
323   int fd                        = NO_FD; // fd we are reading from
324   int ifd                       = NO_FD; // poll fd index
325   ink_hrtime period             = 0;     // polling period
326   ink_hrtime elapsed_time       = 0;
327   ink_hrtime timeout_interval   = 0;
328 };
329 
330 ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");
331 
UDPReadContinuation(Event * completionToken)332 UDPReadContinuation::UDPReadContinuation(Event *completionToken)
333   : Continuation(nullptr),
334     event(completionToken),
335     readbuf(nullptr),
336 
337     fd(-1),
338     ifd(-1)
339 
340 {
341   if (completionToken->continuation) {
342     this->mutex = completionToken->continuation->mutex;
343   } else {
344     this->mutex = new_ProxyMutex();
345   }
346 }
347 
UDPReadContinuation()348 UDPReadContinuation::UDPReadContinuation() : Continuation(nullptr) {}
349 
350 inline void
free()351 UDPReadContinuation::free()
352 {
353   ink_assert(event != nullptr);
354   completionUtil::destroy(event);
355   event            = nullptr;
356   readbuf          = nullptr;
357   readlen          = 0;
358   fromaddrlen      = nullptr;
359   fd               = -1;
360   ifd              = -1;
361   period           = 0;
362   elapsed_time     = 0;
363   timeout_interval = 0;
364   mutex            = nullptr;
365   udpReadContAllocator.free(this);
366 }
367 
368 inline void
init_token(Event * completionToken)369 UDPReadContinuation::init_token(Event *completionToken)
370 {
371   if (completionToken->continuation) {
372     this->mutex = completionToken->continuation->mutex;
373   } else {
374     this->mutex = new_ProxyMutex();
375   }
376   event = completionToken;
377 }
378 
379 inline void
init_read(int rfd,IOBufferBlock * buf,int len,struct sockaddr * fromaddr_,socklen_t * fromaddrlen_)380 UDPReadContinuation::init_read(int rfd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
381 {
382   ink_assert(rfd >= 0 && buf != nullptr && fromaddr_ != nullptr && fromaddrlen_ != nullptr);
383   fd          = rfd;
384   readbuf     = buf;
385   readlen     = len;
386   fromaddr    = ats_ip6_cast(fromaddr_);
387   fromaddrlen = fromaddrlen_;
388   SET_HANDLER(&UDPReadContinuation::readPollEvent);
389   period = -HRTIME_MSECONDS(net_event_period);
390   setupPollDescriptor();
391   this_ethread()->schedule_every(this, period);
392 }
393 
~UDPReadContinuation()394 UDPReadContinuation::~UDPReadContinuation()
395 {
396   if (event != UNINITIALIZED_EVENT_PTR) {
397     ink_assert(event != nullptr);
398     completionUtil::destroy(event);
399     event = nullptr;
400   }
401 }
402 
403 void
cancel()404 UDPReadContinuation::cancel()
405 {
406   // I don't think this actually cancels it correctly right now.
407   event->cancel();
408 }
409 
410 void
setupPollDescriptor()411 UDPReadContinuation::setupPollDescriptor()
412 {
413 #if TS_USE_EPOLL
414   Pollfd *pfd;
415   EThread *et  = (EThread *)this_thread();
416   PollCont *pc = get_PollCont(et);
417   if (pc->nextPollDescriptor == nullptr) {
418     pc->nextPollDescriptor = new PollDescriptor();
419   }
420   pfd     = pc->nextPollDescriptor->alloc();
421   pfd->fd = fd;
422   ifd     = pfd - pc->nextPollDescriptor->pfd;
423   ink_assert(pc->nextPollDescriptor->nfds > ifd);
424   pfd->events  = POLLIN;
425   pfd->revents = 0;
426 #endif
427 }
428 
429 int
readPollEvent(int event_,Event * e)430 UDPReadContinuation::readPollEvent(int event_, Event *e)
431 {
432   (void)event_;
433   (void)e;
434 
435   // PollCont *pc = get_PollCont(e->ethread);
436   Continuation *c;
437 
438   if (event->cancelled) {
439     e->cancel();
440     free();
441     return EVENT_DONE;
442   }
443 
444   // See if the request has timed out
445   if (timeout_interval) {
446     elapsed_time += -period;
447     if (elapsed_time >= timeout_interval) {
448       c = completionUtil::getContinuation(event);
449       // TODO: Should we deal with the return code?
450       c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
451       e->cancel();
452       free();
453       return EVENT_DONE;
454     }
455   }
456 
457   c = completionUtil::getContinuation(event);
458   // do read
459   socklen_t tmp_fromlen = *fromaddrlen;
460   int rlen              = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
461 
462   completionUtil::setThread(event, e->ethread);
463   // call back user with their event
464   if (rlen > 0) {
465     // do callback if read is successful
466     *fromaddrlen = tmp_fromlen;
467     completionUtil::setInfo(event, fd, readbuf, rlen, errno);
468     readbuf->fill(rlen);
469     // TODO: Should we deal with the return code?
470     c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
471     e->cancel();
472     free();
473 
474     return EVENT_DONE;
475   } else if (rlen < 0 && rlen != -EAGAIN) {
476     // signal error.
477     *fromaddrlen = tmp_fromlen;
478     completionUtil::setInfo(event, fd, readbuf, rlen, errno);
479     c = completionUtil::getContinuation(event);
480     // TODO: Should we deal with the return code?
481     c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
482     e->cancel();
483     free();
484 
485     return EVENT_DONE;
486   } else {
487     completionUtil::setThread(event, nullptr);
488   }
489 
490   if (event->cancelled) {
491     e->cancel();
492     free();
493 
494     return EVENT_DONE;
495   }
496   // reestablish poll
497   setupPollDescriptor();
498 
499   return EVENT_CONT;
500 }
501 
502 /* recvfrom:
503  * Unix:
504  *   assert(buf->write_avail() >= len);
505  *   *actual_len = recvfrom(fd,addr,buf->end(),len)
506  *   if successful then
507  *      buf->fill(*actual_len);
508  *	    return ACTION_RESULT_DONE
509  *   else if nothing read
510  *      *actual_len is 0
511  *      create "UDP read continuation" C with 'cont's lock
512  *         set user callback to 'cont'
513  *      return C's action.
514  *   else
515  *      return error;
516  */
517 Action *
recvfrom_re(Continuation * cont,void * token,int fd,struct sockaddr * fromaddr,socklen_t * fromaddrlen,IOBufferBlock * buf,int len,bool useReadCont,int timeout)518 UDPNetProcessor::recvfrom_re(Continuation *cont, void *token, int fd, struct sockaddr *fromaddr, socklen_t *fromaddrlen,
519                              IOBufferBlock *buf, int len, bool useReadCont, int timeout)
520 {
521   (void)useReadCont;
522   ink_assert(buf->write_avail() >= len);
523   int actual;
524   Event *event = completionUtil::create();
525 
526   completionUtil::setContinuation(event, cont);
527   completionUtil::setHandle(event, token);
528   actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
529 
530   if (actual > 0) {
531     completionUtil::setThread(event, this_ethread());
532     completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
533     buf->fill(actual);
534     cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
535     completionUtil::destroy(event);
536     return ACTION_RESULT_DONE;
537   } else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
538     UDPReadContinuation *c = udpReadContAllocator.alloc();
539     c->init_token(event);
540     c->init_read(fd, buf, len, fromaddr, fromaddrlen);
541     if (timeout) {
542       c->set_timer(timeout);
543     }
544     return event;
545   } else {
546     completionUtil::setThread(event, this_ethread());
547     completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
548     cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
549     completionUtil::destroy(event);
550     return ACTION_IO_ERROR;
551   }
552 }
553 
554 /* sendmsg:
555  * Unix:
556  *   *actual_len = sendmsg(fd,msg,default-flags);
557  *   if successful,
558  *      return ACTION_RESULT_DONE
559  *   else
560  *      return error
561  */
562 Action *
sendmsg_re(Continuation * cont,void * token,int fd,struct msghdr * msg)563 UDPNetProcessor::sendmsg_re(Continuation *cont, void *token, int fd, struct msghdr *msg)
564 {
565   int actual;
566   Event *event = completionUtil::create();
567 
568   completionUtil::setContinuation(event, cont);
569   completionUtil::setHandle(event, token);
570 
571   actual = socketManager.sendmsg(fd, msg, 0);
572   if (actual >= 0) {
573     completionUtil::setThread(event, this_ethread());
574     completionUtil::setInfo(event, fd, msg, actual, errno);
575     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
576     completionUtil::destroy(event);
577     return ACTION_RESULT_DONE;
578   } else {
579     completionUtil::setThread(event, this_ethread());
580     completionUtil::setInfo(event, fd, msg, actual, errno);
581     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
582     completionUtil::destroy(event);
583     return ACTION_IO_ERROR;
584   }
585 }
586 
587 /* sendto:
588  * If this were implemented, it might be implemented like this:
589  * Unix:
590  *   call sendto(fd,addr,buf->reader()->start(),len);
591  *   if successful,
592  *      buf->consume(len);
593  *      return ACTION_RESULT_DONE
594  *   else
595  *      return error
596  *
597  */
598 Action *
sendto_re(Continuation * cont,void * token,int fd,struct sockaddr const * toaddr,int toaddrlen,IOBufferBlock * buf,int len)599 UDPNetProcessor::sendto_re(Continuation *cont, void *token, int fd, struct sockaddr const *toaddr, int toaddrlen,
600                            IOBufferBlock *buf, int len)
601 {
602   (void)token;
603   ink_assert(buf->read_avail() >= len);
604   int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
605 
606   if (nbytes_sent >= 0) {
607     ink_assert(nbytes_sent == len);
608     buf->consume(nbytes_sent);
609     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *)-1);
610     return ACTION_RESULT_DONE;
611   } else {
612     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)static_cast<intptr_t>(nbytes_sent));
613     return ACTION_IO_ERROR;
614   }
615 }
616 
617 bool
CreateUDPSocket(int * resfd,sockaddr const * remote_addr,Action ** status,NetVCOptions & opt)618 UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const *remote_addr, Action **status, NetVCOptions &opt)
619 {
620   int res = 0, fd = -1;
621   int local_addr_len;
622   IpEndpoint local_addr;
623 
624   // Need to do address calculations first, so we can determine the
625   // address family for socket creation.
626   ink_zero(local_addr);
627 
628   bool is_any_address = false;
629   if (NetVCOptions::FOREIGN_ADDR == opt.addr_binding || NetVCOptions::INTF_ADDR == opt.addr_binding) {
630     // Same for now, transparency for foreign addresses must be handled
631     // *after* the socket is created, and we need to do this calculation
632     // before the socket to get the IP family correct.
633     ink_release_assert(opt.local_ip.isValid());
634     local_addr.assign(opt.local_ip, htons(opt.local_port));
635     ink_assert(ats_ip_are_compatible(remote_addr, &local_addr.sa));
636   } else {
637     // No local address specified, so use family option if possible.
638     int family = ats_is_ip(opt.ip_family) ? opt.ip_family : AF_INET;
639     local_addr.setToAnyAddr(family);
640     is_any_address    = true;
641     local_addr.port() = htons(opt.local_port);
642   }
643 
644   *resfd = -1;
645   if ((res = socketManager.socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0) {
646     goto HardError;
647   }
648 
649   fd = res;
650   if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0) {
651     goto HardError;
652   }
653 
654   if (opt.socket_recv_bufsize > 0) {
655     if (unlikely(socketManager.set_rcvbuf_size(fd, opt.socket_recv_bufsize))) {
656       Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_recv_bufsize);
657     }
658   }
659   if (opt.socket_send_bufsize > 0) {
660     if (unlikely(socketManager.set_sndbuf_size(fd, opt.socket_send_bufsize))) {
661       Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_send_bufsize);
662     }
663   }
664 
665   if (opt.ip_family == AF_INET) {
666     bool succeeded = false;
667     int enable     = 1;
668 #ifdef IP_PKTINFO
669     if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
670       succeeded = true;
671     }
672 #endif
673 #ifdef IP_RECVDSTADDR
674     if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
675       succeeded = true;
676     }
677 #endif
678     if (!succeeded) {
679       Debug("udpnet", "setsockeopt for pktinfo failed");
680       goto HardError;
681     }
682   } else if (opt.ip_family == AF_INET6) {
683     bool succeeded = false;
684     int enable     = 1;
685 #ifdef IPV6_PKTINFO
686     if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
687       succeeded = true;
688     }
689 #endif
690 #ifdef IPV6_RECVPKTINFO
691     if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
692       succeeded = true;
693     }
694 #endif
695     if (!succeeded) {
696       Debug("udpnet", "setsockeopt for pktinfo failed");
697       goto HardError;
698     }
699   }
700 
701   if (local_addr.port() || !is_any_address) {
702     if (-1 == socketManager.ink_bind(fd, &local_addr.sa, ats_ip_size(&local_addr.sa))) {
703       char buff[INET6_ADDRPORTSTRLEN];
704       Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(local_addr, buff, sizeof(buff)));
705       goto SoftError;
706     }
707 
708     if ((res = safe_getsockname(fd, &local_addr.sa, &local_addr_len)) < 0) {
709       Debug("udpnet", "CreateUdpsocket: getsockname didn't work");
710       goto HardError;
711     }
712   }
713 
714   *resfd  = fd;
715   *status = nullptr;
716   Debug("udpnet", "creating a udp socket port = %d, %d---success", ats_ip_port_host_order(remote_addr),
717         ats_ip_port_host_order(local_addr));
718   return true;
719 SoftError:
720   Debug("udpnet", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr));
721   if (fd != -1) {
722     socketManager.close(fd);
723   }
724   *resfd  = -1;
725   *status = nullptr;
726   return false;
727 HardError:
728   Debug("udpnet", "creating a udp socket port = %d---hard failure", ats_ip_port_host_order(local_addr));
729   if (fd != -1) {
730     socketManager.close(fd);
731   }
732   *resfd  = -1;
733   *status = ACTION_IO_ERROR;
734   return false;
735 }
736 
737 Action *
UDPBind(Continuation * cont,sockaddr const * addr,int send_bufsize,int recv_bufsize)738 UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int send_bufsize, int recv_bufsize)
739 {
740   int res              = 0;
741   int fd               = -1;
742   UnixUDPConnection *n = nullptr;
743   IpEndpoint myaddr;
744   int myaddr_len     = sizeof(myaddr);
745   PollCont *pc       = nullptr;
746   PollDescriptor *pd = nullptr;
747 
748   if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) {
749     goto Lerror;
750   }
751   fd = res;
752   if ((res = fcntl(fd, F_SETFL, O_NONBLOCK) < 0)) {
753     goto Lerror;
754   }
755 
756   if (addr->sa_family == AF_INET) {
757     bool succeeded = false;
758     int enable     = 1;
759 #ifdef IP_PKTINFO
760     if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
761       succeeded = true;
762     }
763 #endif
764 #ifdef IP_RECVDSTADDR
765     if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
766       succeeded = true;
767     }
768 #endif
769     if (!succeeded) {
770       Debug("udpnet", "setsockeopt for pktinfo failed");
771       goto Lerror;
772     }
773   } else if (addr->sa_family == AF_INET6) {
774     bool succeeded = false;
775     int enable     = 1;
776 #ifdef IPV6_PKTINFO
777     if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
778       succeeded = true;
779     }
780 #endif
781 #ifdef IPV6_RECVPKTINFO
782     if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable))) == 0) {
783       succeeded = true;
784     }
785 #endif
786     if (!succeeded) {
787       Debug("udpnet", "setsockeopt for pktinfo failed");
788       goto Lerror;
789     }
790   }
791 
792   // If this is a class D address (i.e. multicast address), use REUSEADDR.
793   if (ats_is_ip_multicast(addr)) {
794     int enable_reuseaddr = 1;
795 
796     if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&enable_reuseaddr),
797                                sizeof(enable_reuseaddr)) < 0)) {
798       goto Lerror;
799     }
800   }
801 
802   if (ats_is_ip6(addr) && (res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) {
803     goto Lerror;
804   }
805 
806   if ((res = socketManager.ink_bind(fd, addr, ats_ip_size(addr))) < 0) {
807     goto Lerror;
808   }
809 
810   if (recv_bufsize) {
811     if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
812       Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
813     }
814   }
815   if (send_bufsize) {
816     if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
817       Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
818     }
819   }
820   if ((res = safe_getsockname(fd, &myaddr.sa, &myaddr_len)) < 0) {
821     goto Lerror;
822   }
823   n = new UnixUDPConnection(fd);
824 
825   Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd);
826   n->setBinding(&myaddr.sa);
827   n->bindToThread(cont);
828 
829   pc = get_UDPPollCont(n->ethread);
830   pd = pc->pollDescriptor;
831 
832   n->ep.start(pd, n, EVENTIO_READ);
833 
834   cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
835   return ACTION_RESULT_DONE;
836 Lerror:
837   if (fd != NO_FD) {
838     socketManager.close(fd);
839   }
840   Debug("udpnet", "Error: %s (%d)", strerror(errno), errno);
841 
842   cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, nullptr);
843   return ACTION_IO_ERROR;
844 }
845 
846 // send out all packets that need to be sent out as of time=now
UDPQueue()847 UDPQueue::UDPQueue() {}
848 
~UDPQueue()849 UDPQueue::~UDPQueue() {}
850 
851 /*
852  * Driver function that aggregates packets across cont's and sends them
853  */
854 void
service(UDPNetHandler * nh)855 UDPQueue::service(UDPNetHandler *nh)
856 {
857   (void)nh;
858   ink_hrtime now     = Thread::get_hrtime_updated();
859   uint64_t timeSpent = 0;
860   uint64_t pktSendStartTime;
861   ink_hrtime pktSendTime;
862   UDPPacketInternal *p = nullptr;
863 
864   SList(UDPPacketInternal, alink) aq(outQueue.popall());
865   Queue<UDPPacketInternal> stk;
866   while ((p = aq.pop())) {
867     stk.push(p);
868   }
869 
870   // walk backwards down list since this is actually an atomic stack.
871   while ((p = stk.pop())) {
872     ink_assert(p->link.prev == nullptr);
873     ink_assert(p->link.next == nullptr);
874     // insert into our queue.
875     Debug("udp-send", "Adding %p", p);
876     if (p->conn->lastPktStartTime == 0) {
877       pktSendStartTime = std::max(now, p->delivery_time);
878     } else {
879       pktSendTime      = p->delivery_time;
880       pktSendStartTime = std::max(std::max(now, pktSendTime), p->delivery_time);
881     }
882     p->conn->lastPktStartTime = pktSendStartTime;
883     p->delivery_time          = pktSendStartTime;
884 
885     pipeInfo.addPacket(p, now);
886   }
887 
888   pipeInfo.advanceNow(now);
889   SendPackets();
890 
891   timeSpent = ink_hrtime_to_msec(now - last_report);
892   if (timeSpent > 10000) {
893     last_report = now;
894     added       = 0;
895     packets     = 0;
896   }
897   last_service = now;
898 }
899 
900 void
SendPackets()901 UDPQueue::SendPackets()
902 {
903   UDPPacketInternal *p;
904   static ink_hrtime lastCleanupTime = Thread::get_hrtime_updated();
905   ink_hrtime now                    = Thread::get_hrtime_updated();
906   ink_hrtime send_threshold_time    = now + SLOT_TIME;
907   int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
908   int32_t bytesThisPipe, sentOne;
909   int64_t pktLen;
910 
911   bytesThisSlot = INT_MAX;
912 
913 sendPackets:
914   sentOne       = false;
915   bytesThisPipe = bytesThisSlot;
916 
917   while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
918     p      = pipeInfo.getFirstPacket();
919     pktLen = p->getPktLength();
920 
921     if (p->conn->shouldDestroy()) {
922       goto next_pkt;
923     }
924     if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) {
925       goto next_pkt;
926     }
927 
928     SendUDPPacket(p, pktLen);
929     bytesUsed += pktLen;
930     bytesThisPipe -= pktLen;
931   next_pkt:
932     sentOne = true;
933     p->free();
934 
935     if (bytesThisPipe < 0) {
936       break;
937     }
938   }
939 
940   bytesThisSlot -= bytesUsed;
941 
942   if ((bytesThisSlot > 0) && sentOne) {
943     // redistribute the slack...
944     now = Thread::get_hrtime_updated();
945     if (pipeInfo.firstPacket(now) == nullptr) {
946       pipeInfo.advanceNow(now);
947     }
948     goto sendPackets;
949   }
950 
951   if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
952     pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots);
953     lastCleanupTime = now;
954   }
955 }
956 
957 void
SendUDPPacket(UDPPacketInternal * p,int32_t)958 UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t /* pktLen ATS_UNUSED */)
959 {
960   struct msghdr msg;
961   struct iovec iov[32];
962   int real_len = 0;
963   int n, count, iov_len = 0;
964 
965   p->conn->lastSentPktStartTime = p->delivery_time;
966   Debug("udp-send", "Sending %p", p);
967 
968 #if !defined(solaris)
969   msg.msg_control    = nullptr;
970   msg.msg_controllen = 0;
971   msg.msg_flags      = 0;
972 #endif
973   msg.msg_name    = reinterpret_cast<caddr_t>(&p->to.sa);
974   msg.msg_namelen = ats_ip_size(p->to);
975   iov_len         = 0;
976 
977   for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) {
978     iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
979     iov[iov_len].iov_len  = b->size();
980     real_len += iov[iov_len].iov_len;
981     iov_len++;
982   }
983   msg.msg_iov    = iov;
984   msg.msg_iovlen = iov_len;
985 
986   count = 0;
987   while (true) {
988     // stupid Linux problem: sendmsg can return EAGAIN
989     n = ::sendmsg(p->conn->getFd(), &msg, 0);
990     if ((n >= 0) || ((n < 0) && (errno != EAGAIN))) {
991       // send succeeded or some random error happened.
992       if (n < 0) {
993         Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
994       }
995 
996       break;
997     }
998     if (errno == EAGAIN) {
999       ++count;
1000       if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
1001         // tried too many times; give up
1002         Debug("udpnet", "Send failed: too many retries");
1003         break;
1004       }
1005     }
1006   }
1007 }
1008 
1009 void
send(UDPPacket * p)1010 UDPQueue::send(UDPPacket *p)
1011 {
1012   // XXX: maybe fastpath for immediate send?
1013   outQueue.push((UDPPacketInternal *)p);
1014 }
1015 
1016 #undef LINK
1017 
1018 static void
net_signal_hook_callback(EThread * thread)1019 net_signal_hook_callback(EThread *thread)
1020 {
1021 #if HAVE_EVENTFD
1022   uint64_t counter;
1023   ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
1024 #elif TS_USE_PORT
1025 /* Nothing to drain or do */
1026 #else
1027   char dummy[1024];
1028   ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
1029 #endif
1030 }
1031 
UDPNetHandler()1032 UDPNetHandler::UDPNetHandler()
1033 {
1034   nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1035   lastCheck = 0;
1036   SET_HANDLER((UDPNetContHandler)&UDPNetHandler::startNetEvent);
1037 }
1038 
1039 int
startNetEvent(int event,Event * e)1040 UDPNetHandler::startNetEvent(int event, Event *e)
1041 {
1042   (void)event;
1043   SET_HANDLER((UDPNetContHandler)&UDPNetHandler::mainNetEvent);
1044   trigger_event = e;
1045   e->schedule_every(-HRTIME_MSECONDS(UDP_NH_PERIOD));
1046   return EVENT_CONT;
1047 }
1048 
1049 int
mainNetEvent(int event,Event * e)1050 UDPNetHandler::mainNetEvent(int event, Event *e)
1051 {
1052   ink_assert(trigger_event == e && event == EVENT_POLL);
1053   return this->waitForActivity(net_config_poll_timeout);
1054 }
1055 
1056 int
waitForActivity(ink_hrtime timeout)1057 UDPNetHandler::waitForActivity(ink_hrtime timeout)
1058 {
1059   UnixUDPConnection *uc;
1060   PollCont *pc = get_UDPPollCont(this->thread);
1061   pc->do_poll(timeout);
1062 
1063   /* Notice: the race between traversal of newconn_list and UDPBind()
1064    *
1065    * If the UDPBind() is called after the traversal of newconn_list,
1066    * the UDPConnection, the one from the pollDescriptor->result, did not push into the open_list.
1067    *
1068    * TODO:
1069    *
1070    * Take UnixNetVConnection::acceptEvent() as reference to create UnixUDPConnection::newconnEvent().
1071    */
1072 
1073   // handle new UDP connection
1074   SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall());
1075   while ((uc = ncq.pop())) {
1076     if (uc->shouldDestroy()) {
1077       open_list.remove(uc); // due to the above race
1078       uc->Release();
1079     } else {
1080       ink_assert(uc->mutex && uc->continuation);
1081       open_list.in_or_enqueue(uc); // due to the above race
1082     }
1083   }
1084 
1085   // handle UDP outgoing engine
1086   udpOutQueue.service(this);
1087 
1088   // handle UDP read operations
1089   int i        = 0;
1090   EventIO *epd = nullptr;
1091   for (i = 0; i < pc->pollDescriptor->result; i++) {
1092     epd = static_cast<EventIO *> get_ev_data(pc->pollDescriptor, i);
1093     if (epd->type == EVENTIO_UDP_CONNECTION) {
1094       // TODO: handle EVENTIO_ERROR
1095       if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) {
1096         uc = epd->data.uc;
1097         ink_assert(uc && uc->mutex && uc->continuation);
1098         ink_assert(uc->refcount >= 1);
1099         open_list.in_or_enqueue(uc); // due to the above race
1100         if (uc->shouldDestroy()) {
1101           open_list.remove(uc);
1102           uc->Release();
1103         } else {
1104           udpNetInternal.udp_read_from_net(this, uc);
1105         }
1106       } else {
1107         Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i));
1108       }
1109     } else if (epd->type == EVENTIO_DNS_CONNECTION) {
1110       // TODO: handle DNS conn if there is ET_UDP
1111       if (epd->data.dnscon != nullptr) {
1112         epd->data.dnscon->trigger();
1113 #if defined(USE_EDGE_TRIGGER)
1114         epd->refresh(EVENTIO_READ);
1115 #endif
1116       }
1117     } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
1118       net_signal_hook_callback(this->thread);
1119     }
1120   } // end for
1121 
1122   // remove dead UDP connections
1123   ink_hrtime now = Thread::get_hrtime_updated();
1124   if (now >= nextCheck) {
1125     forl_LL(UnixUDPConnection, xuc, open_list)
1126     {
1127       ink_assert(xuc->mutex && xuc->continuation);
1128       ink_assert(xuc->refcount >= 1);
1129       if (xuc->shouldDestroy()) {
1130         open_list.remove(xuc);
1131         xuc->Release();
1132       }
1133     }
1134     nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1135   }
1136   // service UDPConnections with data ready for callback.
1137   Que(UnixUDPConnection, callback_link) q = udp_callbacks;
1138   udp_callbacks.clear();
1139   while ((uc = q.dequeue())) {
1140     ink_assert(uc->mutex && uc->continuation);
1141     if (udpNetInternal.udp_callback(this, uc, this->thread)) { // not successful
1142       // schedule on a thread of its own.
1143       ink_assert(uc->callback_link.next == nullptr);
1144       ink_assert(uc->callback_link.prev == nullptr);
1145       udp_callbacks.enqueue(uc);
1146     } else {
1147       ink_assert(uc->callback_link.next == nullptr);
1148       ink_assert(uc->callback_link.prev == nullptr);
1149       uc->onCallbackQueue = 0;
1150       uc->Release();
1151     }
1152   }
1153 
1154   return EVENT_CONT;
1155 }
1156 
1157 void
signalActivity()1158 UDPNetHandler::signalActivity()
1159 {
1160 #if HAVE_EVENTFD
1161   uint64_t counter = 1;
1162   ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
1163 #elif TS_USE_PORT
1164   PollDescriptor *pd = get_PollDescriptor(thread);
1165   ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
1166 #else
1167   char dummy = 1;
1168   ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
1169 #endif
1170 }
1171