1 /** @file
2 
  Implementation of the EThread class: event dispatch and the regular event loop.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include <tscore/TSSystemState.h>
25 
26 //////////////////////////////////////////////////////////////////////
27 //
28 // The EThread Class
29 //
30 /////////////////////////////////////////////////////////////////////
31 #include "P_EventSystem.h"
32 
33 #if HAVE_EVENTFD
34 #include <sys/eventfd.h>
35 #endif
36 
37 struct AIOCallback;
38 
39 #define NO_HEARTBEAT -1
40 #define THREAD_MAX_HEARTBEAT_MSECONDS 60
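// THREAD_MAX_HEARTBEAT_MSECONDS bounds how long the regular event loop will sleep
// in waitForActivity() when no event is due sooner (see execute_regular());
// NO_HEARTBEAT is a sentinel used to disable that periodic wakeup.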
41 
// !! THESE NAMES MUST BE IN THE SAME ORDER AS THE CORRESPONDING STAT ENUM !!
43 char const *const EThread::STAT_NAME[] = {"proxy.process.eventloop.count",      "proxy.process.eventloop.events",
44                                           "proxy.process.eventloop.events.min", "proxy.process.eventloop.events.max",
45                                           "proxy.process.eventloop.wait",       "proxy.process.eventloop.time.min",
46                                           "proxy.process.eventloop.time.max"};
47 
48 int const EThread::SAMPLE_COUNT[N_EVENT_TIMESCALES] = {10, 100, 1000};
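// SAMPLE_COUNT gives the number of one-second metric buckets summarized at each
// timescale: the most recent 10, 100, and 1000 seconds. The counts nest, so each
// longer timescale includes the shorter ones (see summarize_stats()).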
49 
50 int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
51 
52 EThread::EThread()
53 {
54   memset(thread_private, 0, PER_THREAD_DATA);
55 }
56 
57 EThread::EThread(ThreadType att, int anid) : id(anid), tt(att)
58 {
59   ethreads_to_be_signalled = (EThread **)ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *));
60   memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
61   memset(thread_private, 0, PER_THREAD_DATA);
62 #if HAVE_EVENTFD
63   evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
64   if (evfd < 0) {
65     if (errno == EINVAL) { // flags invalid for kernel <= 2.6.26
66       evfd = eventfd(0, 0);
67       if (evfd < 0) {
68         Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)", evfd, errno);
69       }
70     } else {
71       Fatal("EThread::EThread: %d=eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC),errno(%d)", evfd, errno);
72     }
73   }
74 #elif TS_USE_PORT
/* Solaris ports require no crutches to do cross thread signaling.
 * We'll just port_send the event straight over the port.
 */
78 #else
79   ink_release_assert(pipe(evpipe) >= 0);
80   fcntl(evpipe[0], F_SETFD, FD_CLOEXEC);
81   fcntl(evpipe[0], F_SETFL, O_NONBLOCK);
82   fcntl(evpipe[1], F_SETFD, FD_CLOEXEC);
83   fcntl(evpipe[1], F_SETFL, O_NONBLOCK);
84 #endif
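
  // Cross thread signaling: when another thread queues an event for this thread it
  // wakes this thread by writing to evfd (or evpipe[1] on the pipe path) so the
  // poll inside waitForActivity() returns early. A minimal sketch of the wakeup
  // write on the eventfd path, for illustration only (the real write lives in the
  // signal machinery, not in this file):
  //
  //   uint64_t one = 1;
  //   (void)write(evfd, &one, sizeof(one)); // best effort; the heartbeat also wakes us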
85 }
86 
87 EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e)
88 {
89   ink_assert(att == DEDICATED);
90   memset(thread_private, 0, PER_THREAD_DATA);
91 }
92 
93 // Provide a destructor so that SDK functions which create and destroy
94 // threads won't have to deal with EThread memory deallocation.
95 EThread::~EThread()
96 {
97   if (n_ethreads_to_be_signalled > 0) {
98     flush_signals(this);
99   }
100   ats_free(ethreads_to_be_signalled);
101   // TODO: This can't be deleted ....
102   // delete[]l1_hash;
103 }
104 
105 bool
106 EThread::is_event_type(EventType et)
107 {
108   return (event_types & (1 << static_cast<int>(et))) != 0;
109 }
110 
111 void
112 EThread::set_event_type(EventType et)
113 {
114   event_types |= (1 << static_cast<int>(et));
115 }
116 
117 void
118 EThread::process_event(Event *e, int calling_code)
119 {
120   ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
121   MUTEX_TRY_LOCK(lock, e->mutex, this);
122   if (!lock.is_locked()) {
123     e->timeout_at = cur_time + DELAY_FOR_RETRY;
124     EventQueueExternal.enqueue_local(e);
125   } else {
126     if (e->cancelled) {
127       free_event(e);
128       return;
129     }
130     Continuation *c_temp = e->continuation;
    // Make sure that the continuation is locked before calling the handler
132     e->continuation->handleEvent(calling_code, e);
133     ink_assert(!e->in_the_priority_queue);
134     ink_assert(c_temp == e->continuation);
135     MUTEX_RELEASE(lock);
136     if (e->period) {
137       if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
138         if (e->period < 0) {
139           e->timeout_at = e->period;
140         } else {
141           this->get_hrtime_updated();
142           e->timeout_at = cur_time + e->period;
143           if (e->timeout_at < cur_time) {
144             e->timeout_at = cur_time;
145           }
146         }
147         EventQueueExternal.enqueue_local(e);
148       }
149     } else if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
150       free_event(e);
151     }
152   }
153 }
154 
155 void
156 EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
157 {
158   Event *e;
159 
160   // Move events from the external thread safe queues to the local queue.
161   EventQueueExternal.dequeue_external();
162 
163   // execute all the available external events that have
164   // already been dequeued
165   while ((e = EventQueueExternal.dequeue_local())) {
166     ++(*ev_count);
167     if (e->cancelled) {
168       free_event(e);
169     } else if (!e->timeout_at) { // IMMEDIATE
170       ink_assert(e->period == 0);
171       process_event(e, e->callback_event);
172     } else if (e->timeout_at > 0) { // INTERVAL
173       EventQueue.enqueue(e, cur_time);
174     } else { // NEGATIVE
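      // Keep the negative queue sorted in descending timeout_at order so that
      // events closer to zero (e.g. -1) are dequeued before more negative ones.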
175       Event *p = nullptr;
176       Event *a = NegativeQueue->head;
177       while (a && a->timeout_at > e->timeout_at) {
178         p = a;
179         a = a->link.next;
180       }
181       if (!a) {
182         NegativeQueue->enqueue(e);
183       } else {
184         NegativeQueue->insert(e, p);
185       }
186     }
187     ++(*nq_count);
188   }
189 }
190 
191 void
192 EThread::execute_regular()
193 {
194   Event *e;
195   Que(Event, link) NegativeQueue;
196   ink_hrtime next_time = 0;
197   ink_hrtime delta     = 0;    // time spent in the event loop
198   ink_hrtime loop_start_time;  // Time the loop started.
199   ink_hrtime loop_finish_time; // Time at the end of the loop.
200 
201   // Track this so we can update on boundary crossing.
202   EventMetrics *prev_metric = this->prev(metrics + (ink_get_hrtime_internal() / HRTIME_SECOND) % N_EVENT_METRICS);
203 
204   int nq_count = 0;
205   int ev_count = 0;
206 
207   // A statically initialized instance we can use as a prototype for initializing other instances.
208   static EventMetrics METRIC_INIT;
209 
210   // give priority to immediate events
211   for (;;) {
212     if (TSSystemState::is_event_system_shut_down()) {
213       return;
214     }
215 
216     loop_start_time = Thread::get_hrtime_updated();
217     nq_count        = 0; // count # of elements put on negative queue.
218     ev_count        = 0; // # of events handled.
219 
220     current_metric = metrics + (loop_start_time / HRTIME_SECOND) % N_EVENT_METRICS;
221     if (current_metric != prev_metric) {
222       // Mixed feelings - really this shouldn't be needed, but just in case more than one entry is
223       // skipped, clear them all.
224       do {
225         memcpy((prev_metric = this->next(prev_metric)), &METRIC_INIT, sizeof(METRIC_INIT));
226       } while (current_metric != prev_metric);
227       current_metric->_loop_time._start = loop_start_time;
228     }
229     ++(current_metric->_count);
230 
231     process_queue(&NegativeQueue, &ev_count, &nq_count);
232 
233     bool done_one;
234     do {
235       done_one = false;
236       // execute all the eligible internal events
237       EventQueue.check_ready(cur_time, this);
238       while ((e = EventQueue.dequeue_ready(cur_time))) {
239         ink_assert(e);
240         ink_assert(e->timeout_at > 0);
241         if (e->cancelled) {
242           free_event(e);
243         } else {
244           done_one = true;
245           process_event(e, e->callback_event);
246         }
247       }
248     } while (done_one);
249 
250     // execute any negative (poll) events
251     if (NegativeQueue.head) {
252       process_queue(&NegativeQueue, &ev_count, &nq_count);
253 
254       // execute poll events
255       while ((e = NegativeQueue.dequeue())) {
256         process_event(e, EVENT_POLL);
257       }
258     }
259 
260     next_time             = EventQueue.earliest_timeout();
261     ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
262     if (sleep_time > 0) {
263       sleep_time = std::min(sleep_time, HRTIME_MSECONDS(thread_max_heartbeat_mseconds));
264       ++(current_metric->_wait);
265     } else {
266       sleep_time = 0;
267     }
268 
269     if (n_ethreads_to_be_signalled) {
270       flush_signals(this);
271     }
272 
273     tail_cb->waitForActivity(sleep_time);
274 
275     // loop cleanup
276     loop_finish_time = this->get_hrtime_updated();
277     delta            = loop_finish_time - loop_start_time;
278 
279     // This can happen due to time of day adjustments (which apparently happen quite frequently). I
280     // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
281     // of milliseconds), far too much to be actually used.
282     if (delta > 0) {
283       if (delta > current_metric->_loop_time._max) {
284         current_metric->_loop_time._max = delta;
285       }
286       if (delta < current_metric->_loop_time._min) {
287         current_metric->_loop_time._min = delta;
288       }
289     }
290     if (ev_count < current_metric->_events._min) {
291       current_metric->_events._min = ev_count;
292     }
293     if (ev_count > current_metric->_events._max) {
294       current_metric->_events._max = ev_count;
295     }
296     current_metric->_events._total += ev_count;
297   }
298 }
299 
300 //
301 // void  EThread::execute()
302 //
303 // Execute loops forever on:
304 // Find the earliest event.
305 // Sleep until the event time or until an earlier event is inserted
306 // When its time for the event, try to get the appropriate continuation
307 // lock. If successful, call the continuation, otherwise put the event back
308 // into the queue.
309 //
310 
311 void
312 EThread::execute()
313 {
314   // Do the start event first.
315   // coverity[lock]
316   if (start_event) {
317     MUTEX_TAKE_LOCK_FOR(start_event->mutex, this, start_event->continuation);
318     start_event->continuation->handleEvent(EVENT_IMMEDIATE, start_event);
319     MUTEX_UNTAKE_LOCK(start_event->mutex, this);
320     free_event(start_event);
321     start_event = nullptr;
322   }
323 
324   switch (tt) {
325   case REGULAR: {
    /* The event thread has two states: busy and sleeping:
     *   - `EventQueueExternal.lock` is held while the event thread is busy,
     *   - `EventQueueExternal.lock` is released while the event thread sleeps.
     * When another thread tries to acquire the `EventQueueExternal.lock` of the target event thread:
     *   - Acquiring it means the target event thread is sleeping,
     *   - Failing means the target event thread is busy.
     */
333     ink_mutex_acquire(&EventQueueExternal.lock);
334     this->execute_regular();
335     ink_mutex_release(&EventQueueExternal.lock);
336     break;
337   }
338   case DEDICATED: {
339     break;
340   }
341   default:
342     ink_assert(!"bad case value (execute)");
343     break;
344   } /* End switch */
345   // coverity[missing_unlock]
346 }
347 
348 EThread::EventMetrics &
349 EThread::EventMetrics::operator+=(EventMetrics const &that)
350 {
351   this->_events._max = std::max(this->_events._max, that._events._max);
352   this->_events._min = std::min(this->_events._min, that._events._min);
353   this->_events._total += that._events._total;
354   this->_loop_time._min = std::min(this->_loop_time._min, that._loop_time._min);
355   this->_loop_time._max = std::max(this->_loop_time._max, that._loop_time._max);
356   this->_count += that._count;
357   this->_wait += that._wait;
358   return *this;
359 }
360 
361 void
362 EThread::summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES])
363 {
364   // Accumulate in local first so each sample only needs to be processed once,
365   // not N_EVENT_TIMESCALES times.
366   EventMetrics sum;
367 
368   // To avoid race conditions, we back up one from the current metric block. It's close enough
369   // and won't be updated during the time this method runs so it should be thread safe.
370   EventMetrics *m = this->prev(current_metric);
371 
372   for (int t = 0; t < N_EVENT_TIMESCALES; ++t) {
373     int count = SAMPLE_COUNT[t];
374     if (t > 0) {
375       count -= SAMPLE_COUNT[t - 1];
376     }
377     while (--count >= 0) {
378       if (0 != m->_loop_time._start) {
379         sum += *m;
380       }
381       m = this->prev(m);
382     }
383     summary[t] += sum; // push out to return vector.
384   }
385 }
386