1 /** @file
2 
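  Client-side network utilities for the remote management API: connecting to
  the management and event Unix-domain sockets, sending requests, parsing
  replies, and dispatching event notifications to registered callbacks.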
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tscore/ink_config.h"
25 #include "tscore/ink_defs.h"
26 #include "tscore/ink_sock.h"
27 #include "tscore/ink_string.h"
28 #include "tscore/ink_memory.h"
29 #include "tscore/I_Layout.h"
30 #include "NetworkUtilsRemote.h"
31 #include "CoreAPI.h"
32 #include "CoreAPIShared.h"
33 #include "MgmtSocket.h"
34 #include "MgmtMarshall.h"
35 
36 CallbackTable *remote_event_callbacks;
37 
38 int main_socket_fd  = -1;
39 int event_socket_fd = -1;
40 
41 // need to store for reconnecting scenario
42 char *main_socket_path  = nullptr; // "<path>/mgmtapi.sock"
43 char *event_socket_path = nullptr; // "<path>/eventapi.sock"
44 
45 static void *event_callback_thread(void *arg);
46 
47 /**********************************************************************
48  * Socket Helper Functions
49  **********************************************************************/
50 void
set_socket_paths(const char * path)51 set_socket_paths(const char *path)
52 {
53   // free previously set paths if needed
54   ats_free(main_socket_path);
55   ats_free(event_socket_path);
56 
57   // construct paths based on user input
58   // form by replacing "mgmtapi.sock" with "eventapi.sock"
59   if (path) {
60     main_socket_path  = ats_stringdup(Layout::relative_to(path, MGMTAPI_MGMT_SOCKET_NAME));
61     event_socket_path = ats_stringdup(Layout::relative_to(path, MGMTAPI_EVENT_SOCKET_NAME));
62   } else {
63     main_socket_path  = nullptr;
64     event_socket_path = nullptr;
65   }
66 
67   return;
68 }
69 
70 /**********************************************************************
71  * socket_test
72  *
73  * purpose: performs socket write to check status of other end of connection
74  * input: None
75  * output: return false if socket write failed due to some other error
76  *         return true if socket write successful
77  * notes: send the API_PING test msg
78  **********************************************************************/
79 static bool
socket_test(int fd)80 socket_test(int fd)
81 {
82   OpType optype       = OpType::API_PING;
83   MgmtMarshallInt now = time(nullptr);
84 
85   if (MGMTAPI_SEND_MESSAGE(fd, OpType::API_PING, &optype, &now) == TS_ERR_OKAY) {
86     return true; // write was successful; connection still open
87   }
88 
89   return false;
90 }
91 
92 /***************************************************************************
93  * connect
94  *
95  * purpose: connects to the port on traffic server that listens to mgmt
96  *          requests & issues out responses and alerts
97  * 1) create and set the client socket_fd; connect to TM
98  * 2) create and set the client's event_socket_fd; connect to TM
99  * output: TS_ERR_OKAY          - if both sockets successfully connect to TM
100  *         TS_ERR_NET_ESTABLISH - at least one unsuccessful connection
101  * notes: If connection breaks it is responsibility of client to reconnect
102  *        otherwise traffic server will assume mgmt stopped request and
103  *        goes back to just sitting and listening for connection.
104  ***************************************************************************/
105 TSMgmtError
ts_connect()106 ts_connect()
107 {
108   struct sockaddr_un client_sock;
109   struct sockaddr_un client_event_sock;
110 
111   int sockaddr_len;
112 
113   // make sure a socket path is set up
114   if (!main_socket_path || !event_socket_path) {
115     goto ERROR;
116   }
117   // make sure the length of main_socket_path do not exceed the sizeof(sun_path)
118   if (strlen(main_socket_path) > sizeof(client_sock.sun_path) - 1) {
119     goto ERROR;
120   }
121   // make sure the length of event_socket_path do not exceed the sizeof(sun_path)
122   if (strlen(event_socket_path) > sizeof(client_event_sock.sun_path) - 1) {
123     goto ERROR;
124   }
125   // create a socket
126   main_socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
127   if (main_socket_fd < 0) {
128     goto ERROR; // ERROR - can't open socket
129   }
130   // setup Unix domain socket
131   memset(&client_sock, 0, sizeof(sockaddr_un));
132   client_sock.sun_family = AF_UNIX;
133   ink_strlcpy(client_sock.sun_path, main_socket_path, sizeof(client_sock.sun_path));
134 #if defined(darwin) || defined(freebsd)
135   sockaddr_len = sizeof(sockaddr_un);
136 #else
137   sockaddr_len = sizeof(client_sock.sun_family) + strlen(client_sock.sun_path);
138 #endif
139   // connect call
140   if (connect(main_socket_fd, reinterpret_cast<struct sockaddr *>(&client_sock), sockaddr_len) < 0) {
141     close(main_socket_fd);
142     main_socket_fd = -1;
143     goto ERROR; // connection is down
144   }
145   // -------- set up the event socket ------------------
146   // create a socket
147   event_socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
148   if (event_socket_fd < 0) {
149     close(main_socket_fd); // close the other socket too!
150     main_socket_fd = -1;
151     goto ERROR; // ERROR - can't open socket
152   }
153   // setup Unix domain socket
154   memset(&client_event_sock, 0, sizeof(sockaddr_un));
155   client_event_sock.sun_family = AF_UNIX;
156   ink_strlcpy(client_event_sock.sun_path, event_socket_path, sizeof(client_event_sock.sun_path));
157 #if defined(darwin) || defined(freebsd)
158   sockaddr_len = sizeof(sockaddr_un);
159 #else
160   sockaddr_len = sizeof(client_event_sock.sun_family) + strlen(client_event_sock.sun_path);
161 #endif
162   // connect call
163   if (connect(event_socket_fd, reinterpret_cast<struct sockaddr *>(&client_event_sock), sockaddr_len) < 0) {
164     close(event_socket_fd);
165     close(main_socket_fd);
166     event_socket_fd = -1;
167     main_socket_fd  = -1;
168     goto ERROR; // connection is down
169   }
170 
171   return TS_ERR_OKAY;
172 
173 ERROR:
174   return TS_ERR_NET_ESTABLISH;
175 }
176 
177 /***************************************************************************
178  * disconnect
179  *
180  * purpose: disconnect from traffic server; closes sockets and resets their values
181  * input: None
182  * output: TS_ERR_FAIL, TS_ERR_OKAY
183  * notes: doesn't do clean up - all cleanup should be done before here
184  ***************************************************************************/
185 TSMgmtError
disconnect()186 disconnect()
187 {
188   int ret;
189 
190   if (main_socket_fd > 0) {
191     ret            = close(main_socket_fd);
192     main_socket_fd = -1;
193     if (ret < 0) {
194       return TS_ERR_FAIL;
195     }
196   }
197 
198   if (event_socket_fd > 0) {
199     ret             = close(event_socket_fd);
200     event_socket_fd = -1;
201     if (ret < 0) {
202       return TS_ERR_FAIL;
203     }
204   }
205 
206   return TS_ERR_OKAY;
207 }
208 
209 /***************************************************************************
210  * reconnect
211  *
212  * purpose: reconnects to TM (eg. when TM restarts); does all the necessary
213  *          set up for reconnection
214  * input: None
215  * output: TS_ERR_FAIL, TS_ERR_OKAY
216  * notes: necessary events for a new client-TM connection:
217  * 1) get new socket_fd using old socket_path by calling connect()
218  * 2) relaunch event_poll_thread_main with new socket_fd
219  * 3) re-notify TM of all the client's registered callbacks by send msg
220  ***************************************************************************/
221 TSMgmtError
reconnect()222 reconnect()
223 {
224   TSMgmtError err;
225 
226   err = disconnect();
227   if (err != TS_ERR_OKAY) { // problem disconnecting
228     return err;
229   }
230 
231   // use the socket_path that was called by remote client on first init
232   // use connect instead of TSInit() b/c if TM restarted, client-side tables
233   // would be recreated; just want to reconnect to same socket_path
234   err = ts_connect();
235   if (err != TS_ERR_OKAY) { // problem establishing connection
236     return err;
237   }
238 
239   // relaunch a new event thread since socket_fd changed
240   if (0 == (ts_init_options & TS_MGMT_OPT_NO_EVENTS)) {
241     ink_thread_create(&ts_event_thread, event_poll_thread_main, &event_socket_fd, 0, 0, nullptr);
242     // re-register the callbacks on the TM side for this new client connection
243     if (remote_event_callbacks) {
244       err = send_register_all_callbacks(event_socket_fd, remote_event_callbacks);
245       if (err != TS_ERR_OKAY) { // problem establishing connection
246         return err;
247       }
248     }
249   } else {
250     ts_event_thread = ink_thread_null();
251   }
252 
253   return TS_ERR_OKAY;
254 }
255 
256 /***************************************************************************
257  * reconnect_loop
258  *
259  * purpose: attempts to reconnect to TM (eg. when TM restarts) for the
260  *          specified number of times
261  * input:  num_attempts - number of reconnection attempts to try before quit
262  * output: TS_ERR_OKAY - if successfully reconnected within num_attempts
263  *         TS_ERR_xx - the reason the reconnection failed
264  * notes:
265  ***************************************************************************/
266 TSMgmtError
reconnect_loop(int num_attempts)267 reconnect_loop(int num_attempts)
268 {
269   int numTries    = 0;
270   TSMgmtError err = TS_ERR_FAIL;
271 
272   while (numTries < num_attempts) {
273     numTries++;
274     err = reconnect();
275     if (err == TS_ERR_OKAY) {
276       return TS_ERR_OKAY; // successful connection
277     }
278     sleep(1); // to make it slower
279   }
280 
281   return err; // unsuccessful connection after num_attempts
282 }
283 
284 /*************************************************************************
285  * connect_and_send
286  *
287  * purpose:
288  * When sending a request, it's possible that the user had restarted
289  * Traffic Manager. This means that the connection between TM and
290  * the remote client has been broken, so the client needs to re-"connect"
291  * to Traffic Manager. So, after "writing" to the socket in each
292  * "send_xx_request" function, need to check if the TM socket has
293  * been closed or not; the "write" function's errno will indicate if
294  * the other end of the socket has been closed or not. If it is closed,
295  * then need to try to re"connect", then resend the message request if
296  * the "connect" was successful.
297  * 1) try connect()
298  * 2) if connect() success, then resend the request.
299  * output: TS_ERR_NET_xx - connection problem or TS_ERR_OKAY
300  * notes:
301  * This function is basically called by the special "socket_write_conn" fn
302  * which will call this fn if it tries to write to the socket and discovers
303  * the local end of the socket is closed
304  * Warning: system also sends a SIGPIPE error when try to write to socket
305  * which is not open; which will by default terminate the process;
306  * client needs to "ignore" the SIGPIPE signal
307  **************************************************************************/
308 static TSMgmtError
main_socket_reconnect()309 main_socket_reconnect()
310 {
311   TSMgmtError err;
312 
313   // connects to TM and does all necessary event updates required
314   err = reconnect();
315   if (err != TS_ERR_OKAY) {
316     return err;
317   }
318 
319   // makes sure the descriptor is writable
320   if (mgmt_write_timeout(main_socket_fd, MAX_TIME_WAIT, 0) <= 0) {
321     return TS_ERR_NET_TIMEOUT;
322   }
323 
324   return TS_ERR_OKAY;
325 }
326 
327 static TSMgmtError
socket_write_conn(int fd,const void * msg_buf,size_t bytes)328 socket_write_conn(int fd, const void *msg_buf, size_t bytes)
329 {
330   size_t byte_wrote = 0;
331 
332   // makes sure the descriptor is writable
333   if (mgmt_write_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
334     return TS_ERR_NET_TIMEOUT;
335   }
336 
337   // write until we fulfill the number
338   while (byte_wrote < bytes) {
339     ssize_t ret = write(fd, static_cast<const char *>(msg_buf) + byte_wrote, bytes - byte_wrote);
340 
341     if (ret == 0) {
342       return TS_ERR_NET_EOF;
343     }
344 
345     if (ret < 0) {
346       if (mgmt_transient_error()) {
347         continue;
348       } else {
349         return TS_ERR_NET_WRITE;
350       }
351     }
352 
353     // we are all good here
354     byte_wrote += ret;
355   }
356 
357   return TS_ERR_OKAY;
358 }
359 
360 TSMgmtError
send(void * msg,size_t msglen) const361 mgmtapi_sender::send(void *msg, size_t msglen) const
362 {
363   const unsigned tries = 5;
364   TSMgmtError err;
365 
366   for (unsigned i = 0; i < tries; ++i) {
367     err = socket_write_conn(this->fd, msg, msglen);
368     if (err == TS_ERR_OKAY) {
369       return err;
370     }
371 
372     // clean-up sockets
373     close(main_socket_fd);
374     close(event_socket_fd);
375     main_socket_fd  = -1;
376     event_socket_fd = -1;
377 
378     err = main_socket_reconnect();
379     if (err != TS_ERR_OKAY) {
380       return err;
381     }
382   }
383 
384   return TS_ERR_NET_ESTABLISH; // can't establish connection
385 }
386 
387 /**********************************************************************
388  * socket_test_thread
389  *
390  * purpose: continually polls to check if local end of socket connection
391  *          is still open; this thread is created when the client calls
392  *          Init() to initialize the API; and will not
393  *          die until the client process dies
394  * input: none
395  * output: if other end is closed, it reconnects to TM
396  * notes: uses the current main_socket_fd because the main_socket_fd could be
397  *        in flux; basically it is possible that the client will reconnect
398  *        from some other call, thus making the main_socket_fd actually
399  *        valid when socket_test is called
400  * reason: decided to create this "watcher" thread for the socket
401  *         connection because if TM is restarted or the client process
402  *         is started before the TM process, then the client will not
403  *         be able to receive any event notifications until a "request"
404  *         is issued. In order to prevent losing an event notifications
405  *         that are called in between the time TM is restarted and
406  *         client issues a first request, we just run this thread which
407  *         will try to reconnect to TM if it is not already connected
408  **********************************************************************/
409 void *
socket_test_thread(void *)410 socket_test_thread(void *)
411 {
412   // loop until client process dies
413   while (true) {
414     if (main_socket_fd == -1 || !socket_test(main_socket_fd)) {
415       // ASSUMES that in between the time the socket_test is made
416       // and this reconnect call is made, the main_socket_fd remains
417       // the same (eg. no one else called reconnect to TM successfully!!
418       // WHAT IF in between this time, the client had issued a request
419       // calling socket_write_conn which then calls reconnect(); then
420       // reconnect will return an "ALREADY CONNECTED" error when it
421       // tries to connect, and on the next loop iteration, the socket_test
422       // will actually pass because main_socket_fd is valid!!
423       reconnect();
424     }
425 
426     sleep(5);
427   }
428 
429   ink_thread_exit(nullptr);
430   return nullptr;
431 }
432 
433 /**********************************************************************
434  * MARSHALL REQUESTS
435  **********************************************************************/
436 
437 /*------ events -------------------------------------------------------*/
438 
439 /**********************************************************************
440  * send_register_all_callbacks
441  *
442  * purpose: determines all events which have at least one callback registered
443  *          and sends message to notify TM that this client has a callback
444  *          registered for each event
445  * input: None
446  * output: return TS_ERR_OKAY only if ALL events sent okay
447  * 1) get list of all events with callbacks
448  * 2) for each event, send a EVENT_REG_CALLBACK message
449  **********************************************************************/
450 TSMgmtError
send_register_all_callbacks(int fd,CallbackTable * cb_table)451 send_register_all_callbacks(int fd, CallbackTable *cb_table)
452 {
453   LLQ *events_with_cb;
454   TSMgmtError err, send_err = TS_ERR_FAIL;
455   bool no_errors = true; // set to false if one send is not okay
456 
457   events_with_cb = get_events_with_callbacks(cb_table);
458   // need to check that the list has all the events registered
459   if (!events_with_cb) { // all events have registered callback
460     OpType optype                 = OpType::EVENT_REG_CALLBACK;
461     MgmtMarshallString event_name = nullptr;
462 
463     err = MGMTAPI_SEND_MESSAGE(fd, OpType::EVENT_REG_CALLBACK, &optype, &event_name);
464     if (err != TS_ERR_OKAY) {
465       return err;
466     }
467   } else {
468     int num_events = queue_len(events_with_cb);
469     // iterate through the LLQ and send request for each event
470     for (int i = 0; i < num_events; i++) {
471       OpType optype                 = OpType::EVENT_REG_CALLBACK;
472       MgmtMarshallInt event_id      = *static_cast<int *>(dequeue(events_with_cb));
473       MgmtMarshallString event_name = get_event_name(event_id);
474 
475       if (event_name) {
476         err = MGMTAPI_SEND_MESSAGE(fd, OpType::EVENT_REG_CALLBACK, &optype, &event_name);
477         ats_free(event_name); // free memory
478         if (err != TS_ERR_OKAY) {
479           send_err  = err; // save the type of send error
480           no_errors = false;
481         }
482       }
483       // REMEMBER: WON"T GET A REPLY from TM side!
484     }
485   }
486 
487   if (events_with_cb) {
488     delete_queue(events_with_cb);
489   }
490 
491   if (no_errors) {
492     return TS_ERR_OKAY;
493   } else {
494     return send_err;
495   }
496 }
497 
498 /**********************************************************************
499  * send_unregister_all_callbacks
500  *
501  * purpose: determines all events which have no callback registered
502  *          and sends message to notify TM that this client has no
503  *          callbacks registered for that event
504  * input: None
505  * output: TS_ERR_OKAY only if all send requests are okay
506  **********************************************************************/
507 TSMgmtError
send_unregister_all_callbacks(int fd,CallbackTable * cb_table)508 send_unregister_all_callbacks(int fd, CallbackTable *cb_table)
509 {
510   LLQ *events_with_cb; // list of events with at least one callback
511   int reg_callback[NUM_EVENTS];
512   TSMgmtError err, send_err = TS_ERR_FAIL;
513   bool no_errors = true; // set to false if at least one send fails
514 
515   // init array so that all events don't have any callbacks
516   for (int &i : reg_callback) {
517     i = 0;
518   }
519 
520   events_with_cb = get_events_with_callbacks(cb_table);
521   if (!events_with_cb) { // all events have a registered callback
522     return TS_ERR_OKAY;
523   } else {
524     int num_events = queue_len(events_with_cb);
525     // iterate through the LLQ and mark events that have a callback
526     for (int i = 0; i < num_events; i++) {
527       int event_id           = *static_cast<int *>(dequeue(events_with_cb));
528       reg_callback[event_id] = 1; // mark the event as having a callback
529     }
530     delete_queue(events_with_cb);
531   }
532 
533   // send message to TM to mark unregister
534   for (int k = 0; k < NUM_EVENTS; k++) {
535     if (reg_callback[k] == 0) { // event has no registered callbacks
536       OpType optype                 = OpType::EVENT_UNREG_CALLBACK;
537       MgmtMarshallString event_name = get_event_name(k);
538 
539       err = MGMTAPI_SEND_MESSAGE(fd, OpType::EVENT_UNREG_CALLBACK, &optype, &event_name);
540       ats_free(event_name);
541       if (err != TS_ERR_OKAY) {
542         send_err  = err; // save the type of the sending error
543         no_errors = false;
544       }
545       // REMEMBER: WON"T GET A REPLY!
546       // only the event_poll_thread_main does any reading of the event_socket;
547       // so DO NOT parse reply b/c a reply won't be sent
548     }
549   }
550 
551   if (no_errors) {
552     return TS_ERR_OKAY;
553   } else {
554     return send_err;
555   }
556 }
557 
558 /**********************************************************************
559  * UNMARSHAL REPLIES
560  **********************************************************************/
561 
562 TSMgmtError
parse_generic_response(OpType optype,int fd)563 parse_generic_response(OpType optype, int fd)
564 {
565   TSMgmtError err;
566   MgmtMarshallInt ival;
567   MgmtMarshallData data = {nullptr, 0};
568 
569   err = recv_mgmt_message(fd, data);
570   if (err != TS_ERR_OKAY) {
571     return err;
572   }
573 
574   err = recv_mgmt_response(data.ptr, data.len, optype, &ival);
575   ats_free(data.ptr);
576 
577   if (err != TS_ERR_OKAY) {
578     return err;
579   }
580 
581   return static_cast<TSMgmtError>(ival);
582 }
583 
584 /**********************************************************************
585  * event_poll_thread_main
586  *
587  * purpose: thread listens on the client's event socket connection;
588  *          only reads from the event_socket connection and
589  *          processes EVENT_NOTIFY messages; each time client
590  *          makes new event-socket connection to TM, must launch
591  *          a new event_poll_thread_main thread
592  * input:   arg - contains the socket_fd to listen on
593  * output:  NULL - if error
594  * notes:   each time the client's socket connection to TM is reset
595  *          a new thread will be launched as old one dies; there are
596  *          only two places where a new thread is created:
597  *          1) when client first connects (TSInit call)
598  *          2) client reconnects() due to a TM restart
599  * Uses blocking socket; so blocks until receives an event notification.
600  * Shouldn't need to use select since only waiting for a notification
601  * message from event_callback_main thread!
602  **********************************************************************/
603 void *
event_poll_thread_main(void * arg)604 event_poll_thread_main(void *arg)
605 {
606   int sock_fd;
607 
608   sock_fd = *(static_cast<int *>(arg)); // should be same as event_socket_fd
609 
610   // the sock_fd is going to be the one we listen for events on
611   while (true) {
612     TSMgmtError ret;
613     TSMgmtEvent *event = nullptr;
614 
615     MgmtMarshallData reply = {nullptr, 0};
616     OpType optype;
617     MgmtMarshallString name = nullptr;
618     MgmtMarshallString desc = nullptr;
619 
620     // possible sock_fd is invalid if TM restarts and client reconnects
621     if (sock_fd < 0) {
622       break;
623     }
624 
625     // Just wait until we get an event or error. The 0 return from select(2)
626     // means we timed out ...
627     if (mgmt_read_timeout(main_socket_fd, MAX_TIME_WAIT, 0) == 0) {
628       continue;
629     }
630 
631     ret = recv_mgmt_message(main_socket_fd, reply);
632     if (ret != TS_ERR_OKAY) {
633       break;
634     }
635 
636     ret = recv_mgmt_request(reply.ptr, reply.len, OpType::EVENT_NOTIFY, &optype, &name, &desc);
637     ats_free(reply.ptr);
638 
639     if (ret != TS_ERR_OKAY) {
640       ats_free(name);
641       ats_free(desc);
642       break;
643     }
644 
645     ink_assert(optype == OpType::EVENT_NOTIFY);
646 
647     // The new event takes ownership of the message strings.
648     event              = TSEventCreate();
649     event->name        = name;
650     event->id          = get_event_id(name);
651     event->description = desc;
652 
653     // got event notice; spawn new thread to handle the event's callback functions
654     ink_thread_create(nullptr, event_callback_thread, (void *)event, 0, 0, nullptr);
655   }
656 
657   ink_thread_exit(nullptr);
658   return nullptr;
659 }
660 
661 /**********************************************************************
662  * event_callback_thread
663  *
664  * purpose: Given an event, determines and calls the registered cb functions
665  *          in the CallbackTable for remote events
666  * input: arg - should be an TSMgmtEvent with the event info sent from TM msg
667  * output: returns when done calling all the callbacks
668  * notes: None
669  **********************************************************************/
670 static void *
event_callback_thread(void * arg)671 event_callback_thread(void *arg)
672 {
673   TSMgmtEvent *event_notice;
674   EventCallbackT *event_cb;
675   int index;
676 
677   event_notice = static_cast<TSMgmtEvent *>(arg);
678   index        = event_notice->id;
679   LLQ *func_q; // list of callback functions need to call
680 
681   func_q = create_queue();
682   if (!func_q) {
683     TSEventDestroy(event_notice);
684     return nullptr;
685   }
686 
687   // obtain lock
688   ink_mutex_acquire(&remote_event_callbacks->event_callback_lock);
689 
690   TSEventSignalFunc cb;
691 
692   // check if we have functions to call
693   if (remote_event_callbacks->event_callback_l[index] && (!queue_is_empty(remote_event_callbacks->event_callback_l[index]))) {
694     int queue_depth = queue_len(remote_event_callbacks->event_callback_l[index]);
695 
696     for (int i = 0; i < queue_depth; i++) {
697       event_cb = static_cast<EventCallbackT *>(dequeue(remote_event_callbacks->event_callback_l[index]));
698       cb       = event_cb->func;
699       enqueue(remote_event_callbacks->event_callback_l[index], event_cb);
700       enqueue(func_q, reinterpret_cast<void *>(cb)); // add callback function only to list
701     }
702   }
703   // release lock
704   ink_mutex_release(&remote_event_callbacks->event_callback_lock);
705 
706   // execute the callback function
707   while (!queue_is_empty(func_q)) {
708     cb = reinterpret_cast<TSEventSignalFunc>(dequeue(func_q));
709     (*cb)(event_notice->name, event_notice->description, event_notice->priority, nullptr);
710   }
711 
712   // clean up event notice
713   TSEventDestroy(event_notice);
714   delete_queue(func_q);
715 
716   // all done!
717   ink_thread_exit(nullptr);
718   return nullptr;
719 }
720