/** @file

  Implementation of EThread, the per-thread event loop of the event system.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include <tscore/TSSystemState.h>

//////////////////////////////////////////////////////////////////////
//
// The EThread Class
//
/////////////////////////////////////////////////////////////////////
#include "P_EventSystem.h"

#if HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

struct AIOCallback;

#define NO_HEARTBEAT -1
#define THREAD_MAX_HEARTBEAT_MSECONDS 60

// !! THIS MUST BE IN THE SAME ORDER AS THE STAT ID ENUM !!
char const *const EThread::STAT_NAME[] = {"proxy.process.eventloop.count",      "proxy.process.eventloop.events",
                                          "proxy.process.eventloop.events.min", "proxy.process.eventloop.events.max",
                                          "proxy.process.eventloop.wait",       "proxy.process.eventloop.time.min",
                                          "proxy.process.eventloop.time.max"};

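// Number of one second samples aggregated into each metrics reporting timescale.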
int const EThread::SAMPLE_COUNT[N_EVENT_TIMESCALES] = {10, 100, 1000};

int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;

EThread::EThread()
{
  memset(thread_private, 0, PER_THREAD_DATA);
}

EThread::EThread(ThreadType att, int anid) : id(anid), tt(att)
{
  ethreads_to_be_signalled = (EThread **)ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *));
  memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
  memset(thread_private, 0, PER_THREAD_DATA);
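  // Set up the mechanism other threads use to wake this thread out of its sleep:
  // an eventfd where available, a Solaris event port, or a pipe as the portable fallback.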
#if HAVE_EVENTFD
  evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (evfd < 0) {
    if (errno == EINVAL) { // flags invalid for kernel <= 2.6.26
      evfd = eventfd(0, 0);
      if (evfd < 0) {
        Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)", evfd, errno);
      }
    } else {
      Fatal("EThread::EThread: %d=eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC),errno(%d)", evfd, errno);
    }
  }
#elif TS_USE_PORT
/* Solaris event ports require no extra machinery for cross-thread signaling.
 * We just port_send the event straight over the port.
 */
#else
  ink_release_assert(pipe(evpipe) >= 0);
  fcntl(evpipe[0], F_SETFD, FD_CLOEXEC);
  fcntl(evpipe[0], F_SETFL, O_NONBLOCK);
  fcntl(evpipe[1], F_SETFD, FD_CLOEXEC);
  fcntl(evpipe[1], F_SETFL, O_NONBLOCK);
#endif
}

EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e)
{
  ink_assert(att == DEDICATED);
  memset(thread_private, 0, PER_THREAD_DATA);
}

// Provide a destructor so that SDK functions which create and destroy
// threads won't have to deal with EThread memory deallocation.
EThread::~EThread()
{
  if (n_ethreads_to_be_signalled > 0) {
    flush_signals(this);
  }
  ats_free(ethreads_to_be_signalled);
  // TODO: This can't be deleted ....
  // delete[]l1_hash;
}

bool
EThread::is_event_type(EventType et)
{
  return (event_types & (1 << static_cast<int>(et))) != 0;
}

void
EThread::set_event_type(EventType et)
{
  event_types |= (1 << static_cast<int>(et));
}

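// Process an event: take the continuation's lock and invoke its handler. If the
// lock is contended, re-queue the event with a short retry delay rather than
// blocking the event loop.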
void
EThread::process_event(Event *e, int calling_code)
{
  ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
  MUTEX_TRY_LOCK(lock, e->mutex, this);
  if (!lock.is_locked()) {
    e->timeout_at = cur_time + DELAY_FOR_RETRY;
    EventQueueExternal.enqueue_local(e);
  } else {
    if (e->cancelled) {
      free_event(e);
      return;
    }
    Continuation *c_temp = e->continuation;
    // The continuation's lock (the event's mutex) is held while the handler runs.

    // Restore the client IP debugging flags.
    set_cont_flags(e->continuation->control_flags);

    e->continuation->handleEvent(calling_code, e);
    ink_assert(!e->in_the_priority_queue);
    ink_assert(c_temp == e->continuation);
    MUTEX_RELEASE(lock);
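    // Reschedule periodic events unless the handler re-queued the event itself.
    // A negative period sends the event back through the negative (poll) queue.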
    if (e->period) {
      if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
        if (e->period < 0) {
          e->timeout_at = e->period;
        } else {
          this->get_hrtime_updated();
          e->timeout_at = cur_time + e->period;
          if (e->timeout_at < cur_time) {
            e->timeout_at = cur_time;
          }
        }
        EventQueueExternal.enqueue_local(e);
      }
    } else if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
      free_event(e);
    }
  }
}

void
EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
{
  Event *e;

  // Move events from the external thread safe queues to the local queue.
  EventQueueExternal.dequeue_external();

  // execute all the available external events that have
  // already been dequeued
  while ((e = EventQueueExternal.dequeue_local())) {
    ++(*ev_count);
    if (e->cancelled) {
      free_event(e);
    } else if (!e->timeout_at) { // IMMEDIATE
      ink_assert(e->period == 0);
      process_event(e, e->callback_event);
    } else if (e->timeout_at > 0) { // INTERVAL
      EventQueue.enqueue(e, cur_time);
    } else { // NEGATIVE
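      // Insert so the queue stays sorted in descending timeout_at order; events
      // closest to zero (e.g. -1) sit at the head and are dequeued first.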
      Event *p = nullptr;
      Event *a = NegativeQueue->head;
      while (a && a->timeout_at > e->timeout_at) {
        p = a;
        a = a->link.next;
      }
      if (!a) {
        NegativeQueue->enqueue(e);
      } else {
        NegativeQueue->insert(e, p);
      }
      ++(*nq_count);
    }
  }
}

void
EThread::execute_regular()
{
  Event *e;
  Que(Event, link) NegativeQueue;
  ink_hrtime next_time;
  ink_hrtime delta;            // time spent in the event loop
  ink_hrtime loop_start_time;  // Time the loop started.
  ink_hrtime loop_finish_time; // Time at the end of the loop.

  // Track this so we can update on boundary crossing.
  EventMetrics *prev_metric = this->prev(metrics + (ink_get_hrtime_internal() / HRTIME_SECOND) % N_EVENT_METRICS);

  int nq_count;
  int ev_count;

  // A statically initialized instance we can use as a prototype for initializing other instances.
  static EventMetrics METRIC_INIT;

  // give priority to immediate events
  for (;;) {
    if (TSSystemState::is_event_system_shut_down()) {
      return;
    }

    loop_start_time = Thread::get_hrtime_updated();
    nq_count        = 0; // count # of elements put on negative queue.
    ev_count        = 0; // # of events handled.

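    // Pick this second's slot in the metrics ring buffer, indexed by wall clock
    // seconds modulo N_EVENT_METRICS.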
    current_metric = metrics + (loop_start_time / HRTIME_SECOND) % N_EVENT_METRICS;
    if (current_metric != prev_metric) {
      // Mixed feelings - really this shouldn't be needed, but just in case more than one entry is
      // skipped, clear them all.
      do {
        memcpy((prev_metric = this->next(prev_metric)), &METRIC_INIT, sizeof(METRIC_INIT));
      } while (current_metric != prev_metric);
      current_metric->_loop_time._start = loop_start_time;
    }
    ++(current_metric->_count);

    process_queue(&NegativeQueue, &ev_count, &nq_count);

    bool done_one;
    do {
      done_one = false;
      // execute all the eligible internal events
      EventQueue.check_ready(cur_time, this);
      while ((e = EventQueue.dequeue_ready(cur_time))) {
        ink_assert(e);
        ink_assert(e->timeout_at > 0);
        if (e->cancelled) {
          free_event(e);
        } else {
          done_one = true;
          process_event(e, e->callback_event);
        }
      }
    } while (done_one);

    // execute any negative (poll) events
    if (NegativeQueue.head) {
      process_queue(&NegativeQueue, &ev_count, &nq_count);

      // execute poll events
      while ((e = NegativeQueue.dequeue())) {
        process_event(e, EVENT_POLL);
      }
    }

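    // Sleep until the earliest scheduled event, but never longer than the
    // heartbeat cap, so the loop wakes at a bounded interval.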
    next_time             = EventQueue.earliest_timeout();
    ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
    if (sleep_time > 0) {
      sleep_time = std::min(sleep_time, HRTIME_MSECONDS(thread_max_heartbeat_mseconds));
      ++(current_metric->_wait);
    } else {
      sleep_time = 0;
    }

    if (n_ethreads_to_be_signalled) {
      flush_signals(this);
    }

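    // Block in the tail handler (typically the network poller) until there is
    // activity or the sleep time expires.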
    tail_cb->waitForActivity(sleep_time);

    // loop cleanup
    loop_finish_time = this->get_hrtime_updated();
    delta            = loop_finish_time - loop_start_time;

    // delta can be negative due to time of day adjustments (which apparently happen quite
    // frequently). I tried using the monotonic clock to get around this but it was *very* stuttery
    // (up to hundreds of milliseconds), far too much to actually be usable.
    if (delta > 0) {
      if (delta > current_metric->_loop_time._max) {
        current_metric->_loop_time._max = delta;
      }
      if (delta < current_metric->_loop_time._min) {
        current_metric->_loop_time._min = delta;
      }
    }
    if (ev_count < current_metric->_events._min) {
      current_metric->_events._min = ev_count;
    }
    if (ev_count > current_metric->_events._max) {
      current_metric->_events._max = ev_count;
    }
    current_metric->_events._total += ev_count;
  }
}

//
// void  EThread::execute()
//
// Loops forever doing the following:
// Find the earliest event.
// Sleep until the event time or until an earlier event is inserted.
// When it is time for the event, try to take the appropriate continuation
// lock. If successful, call the continuation; otherwise put the event back
// into the queue.
//

void
EThread::execute()
{
  // Do the start event first.
  // coverity[lock]
  if (start_event) {
    MUTEX_TAKE_LOCK_FOR(start_event->mutex, this, start_event->continuation);
    start_event->continuation->handleEvent(EVENT_IMMEDIATE, start_event);
    MUTEX_UNTAKE_LOCK(start_event->mutex, this);
    free_event(start_event);
    start_event = nullptr;
  }

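  // REGULAR threads run the main event loop; DEDICATED threads do all of their
  // work in the start event handled above.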
  switch (tt) {
  case REGULAR: {
    /* The Event Thread has two states: busy and sleeping:
     *   - `EThread::lock` is held while the Event Thread is busy,
     *   - `EThread::lock` is released while the Event Thread is sleeping.
     * When another thread tries to acquire the `EThread::lock` of the target Event Thread:
     *   - Success indicates that the target Event Thread is sleeping,
     *   - Failure indicates that the target Event Thread is busy.
     */
    ink_mutex_acquire(&EventQueueExternal.lock);
    this->execute_regular();
    ink_mutex_release(&EventQueueExternal.lock);
    break;
  }
  case DEDICATED: {
    break;
  }
  default:
    ink_assert(!"bad case value (execute)");
    break;
  } /* End switch */
  // coverity[missing_unlock]
}

EThread::EventMetrics &
EThread::EventMetrics::operator+=(EventMetrics const &that)
{
  this->_events._max = std::max(this->_events._max, that._events._max);
  this->_events._min = std::min(this->_events._min, that._events._min);
  this->_events._total += that._events._total;
  this->_loop_time._min = std::min(this->_loop_time._min, that._loop_time._min);
  this->_loop_time._max = std::max(this->_loop_time._max, that._loop_time._max);
  this->_count += that._count;
  this->_wait += that._wait;
  return *this;
}

void
EThread::summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES])
{
  // Accumulate in local first so each sample only needs to be processed once,
  // not N_EVENT_TIMESCALES times.
  EventMetrics sum;

  // To avoid race conditions, we back up one from the current metric block. It's close enough
  // and won't be updated during the time this method runs so it should be thread safe.
  EventMetrics *m = this->prev(current_metric);

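  // Walk backwards through the metrics ring buffer. Each timescale extends the
  // previous one, so only the additional samples are added to the running sum.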
  for (int t = 0; t < N_EVENT_TIMESCALES; ++t) {
    int count = SAMPLE_COUNT[t];
    if (t > 0) {
      count -= SAMPLE_COUNT[t - 1];
    }
    while (--count >= 0) {
      if (0 != m->_loop_time._start) {
        sum += *m;
      }
      m = this->prev(m);
    }
    summary[t] += sum; // push out to return vector.
  }
}