1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22 
23  */
24 
25 #pragma once
26 
27 #include "tscore/ink_platform.h"
28 #include "tscore/ink_rand.h"
29 #include "tscore/I_Version.h"
30 #include "I_Thread.h"
31 #include "I_PriorityEventQueue.h"
32 #include "I_ProtectedQueue.h"
33 
34 // TODO: This would be much nicer to have "run-time" configurable (or something),
35 // perhaps based on proxy.config.stat_api.max_stats_allowed or other configs. XXX
36 #define PER_THREAD_DATA (1024 * 1024)
37 
38 // This is not used by the cache anymore, it uses proxy.config.cache.mutex_retry_delay
39 // instead.
40 #define MUTEX_RETRY_DELAY HRTIME_MSECONDS(20)
41 
42 struct DiskHandler;
43 struct EventIO;
44 
45 class ServerSessionPool;
46 class Event;
47 class Continuation;
48 
49 enum ThreadType {
50   REGULAR = 0,
51   DEDICATED,
52 };
53 
54 extern bool shutdown_event_system;
55 
56 /**
57   Event System specific type of thread.
58 
59   The EThread class is the type of thread created and managed by
60   the Event System. It is one of the available interfaces for
61   schedulling events in the event system (another two are the Event
62   and EventProcessor classes).
63 
64   In order to handle events, each EThread object has two event
65   queues, one external and one internal. The external queue is
66   provided for users of the EThread (clients) to append events to
67   that particular thread. Since it can be accessed by other threads
68   at the same time, operations using it must proceed in an atomic
69   fashion.
70 
71   The internal queue, in the other hand, is used exclusively by the
72   EThread to process timed events within a certain time frame. These
73   events are queued internally and they may come from the external
74   queue as well.
75 
76   Scheduling Interface:
77 
78   There are eight schedulling functions provided by EThread and
79   they are a wrapper around their counterparts in EventProcessor.
80 
81   @see EventProcessor
82   @see Event
83 
84 */
85 class EThread : public Thread
86 {
87 public:
88   /** Handler for tail of event loop.
89 
90       The event loop should not spin. To avoid that a tail handler is called to block for a limited time.
91       This is a protocol class that defines the interface to the handler.
92   */
93   class LoopTailHandler
94   {
95   public:
96     /** Called at the end of the event loop to block.
97         @a timeout is the maximum length of time (in ns) to block.
98     */
99     virtual int waitForActivity(ink_hrtime timeout) = 0;
100     /** Unblock.
101 
102         This is required to unblock (wake up) the block created by calling @a cb.
103     */
104     virtual void signalActivity() = 0;
105 
106     virtual ~LoopTailHandler() {}
107   };
108 
109   /*-------------------------------------------------------*\
110   |  Common Interface                                       |
111   \*-------------------------------------------------------*/
112 
113   /**
114     Schedules the continuation on this EThread to receive an event
115     as soon as possible.
116 
117     Forwards to the EventProcessor the schedule of the callback to
118     the continuation 'c' as soon as possible. The event is assigned
119     to EThread.
120 
121     @param c Continuation to be called back as soon as possible.
122     @param callback_event Event code to be passed back to the
123       continuation's handler. See the the EventProcessor class.
124     @param cookie User-defined value or pointer to be passed back
125       in the Event's object cookie field.
126     @return Reference to an Event object representing the schedulling
127       of this callback.
128 
129   */
130   Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
131   Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
132 
133   /**
134     Schedules the continuation on this EThread to receive an event
135     at the given timeout.
136 
137     Forwards the request to the EventProcessor to schedule the
138     callback to the continuation 'c' at the time specified in
139     'atimeout_at'. The event is assigned to this EThread.
140 
141     @param c Continuation to be called back at the time specified
142       in 'atimeout_at'.
143     @param atimeout_at Time value at which to callback.
144     @param callback_event Event code to be passed back to the
145       continuation's handler. See the EventProcessor class.
146     @param cookie User-defined value or pointer to be passed back
147       in the Event's object cookie field.
148     @return A reference to an Event object representing the schedulling
149       of this callback.
150 
151   */
152   Event *schedule_at(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
153 
154   /**
155     Schedules the continuation on this EThread to receive an event
156     after the timeout elapses.
157 
158     Instructs the EventProcessor to schedule the callback to the
159     continuation 'c' after the time specified in atimeout_in elapses.
160     The event is assigned to this EThread.
161 
162     @param c Continuation to be called back after the timeout elapses.
163     @param atimeout_in Amount of time after which to callback.
164     @param callback_event Event code to be passed back to the
165       continuation's handler. See the EventProcessor class.
166     @param cookie User-defined value or pointer to be passed back
167       in the Event's object cookie field.
168     @return A reference to an Event object representing the schedulling
169       of this callback.
170 
171   */
172   Event *schedule_in(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
173 
174   /**
175     Schedules the continuation on this EThread to receive an event
176     periodically.
177 
178     Schedules the callback to the continuation 'c' in the EventProcessor
179     to occur every time 'aperiod' elapses. It is scheduled on this
180     EThread.
181 
182     @param c Continuation to call back everytime 'aperiod' elapses.
183     @param aperiod Duration of the time period between callbacks.
184     @param callback_event Event code to be passed back to the
185       continuation's handler. See the Remarks section in the
186       EventProcessor class.
187     @param cookie User-defined value or pointer to be passed back
188       in the Event's object cookie field.
189     @return A reference to an Event object representing the schedulling
190       of this callback.
191 
192   */
193   Event *schedule_every(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
194 
195   /**
196     Schedules the continuation on this EThread to receive an event
197     as soon as possible.
198 
199     Schedules the callback to the continuation 'c' as soon as
200     possible. The event is assigned to this EThread.
201 
202     @param c Continuation to be called back as soon as possible.
203     @param callback_event Event code to be passed back to the
204       continuation's handler. See the EventProcessor class.
205     @param cookie User-defined value or pointer to be passed back
206       in the Event's object cookie field.
207     @return A reference to an Event object representing the schedulling
208       of this callback.
209 
210   */
211   Event *schedule_imm_local(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
212 
213   /**
214     Schedules the continuation on this EThread to receive an event
215     at the given timeout.
216 
217     Schedules the callback to the continuation 'c' at the time
218     specified in 'atimeout_at'. The event is assigned to this
219     EThread.
220 
221     @param c Continuation to be called back at the time specified
222       in 'atimeout_at'.
223     @param atimeout_at Time value at which to callback.
224     @param callback_event Event code to be passed back to the
225       continuation's handler. See the EventProcessor class.
226     @param cookie User-defined value or pointer to be passed back
227       in the Event's object cookie field.
228     @return A reference to an Event object representing the schedulling
229       of this callback.
230 
231   */
232   Event *schedule_at_local(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
233 
234   /**
235     Schedules the continuation on this EThread to receive an event
236     after the timeout elapses.
237 
238     Schedules the callback to the continuation 'c' after the time
239     specified in atimeout_in elapses. The event is assigned to this
240     EThread.
241 
242     @param c Continuation to be called back after the timeout elapses.
243     @param atimeout_in Amount of time after which to callback.
244     @param callback_event Event code to be passed back to the
245       continuation's handler. See the Remarks section in the
246       EventProcessor class.
247     @param cookie User-defined value or pointer to be passed back
248       in the Event's object cookie field.
249     @return A reference to an Event object representing the schedulling
250       of this callback.
251 
252   */
253   Event *schedule_in_local(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
254 
255   /**
256     Schedules the continuation on this EThread to receive an event
257     periodically.
258 
259     Schedules the callback to the continuation 'c' to occur every
260     time 'aperiod' elapses. It is scheduled on this EThread.
261 
262     @param c Continuation to call back everytime 'aperiod' elapses.
263     @param aperiod Duration of the time period between callbacks.
264     @param callback_event Event code to be passed back to the
265       continuation's handler. See the Remarks section in the
266       EventProcessor class.
267     @param cookie User-defined value or pointer to be passed back
268       in the Event's object cookie field.
269     @return A reference to an Event object representing the schedulling
270       of this callback.
271 
272   */
273   Event *schedule_every_local(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
274 
275   /** Schedule an event called once when the thread is spawned.
276 
277       This is useful only for regular threads and if called before @c Thread::start. The event will be
278       called first before the event loop.
279 
280       @Note This will override the event for a dedicate thread so that this is called instead of the
281       event passed to the constructor.
282   */
283   Event *schedule_spawn(Continuation *c, int ev = EVENT_IMMEDIATE, void *cookie = nullptr);
284 
285   // Set the tail handler.
286   void set_tail_handler(LoopTailHandler *handler);
287 
288   /* private */
289 
290   Event *schedule_local(Event *e);
291 
292   InkRand generator = static_cast<uint64_t>(Thread::get_hrtime_updated() ^ reinterpret_cast<uintptr_t>(this));
293 
294   /*-------------------------------------------------------*\
295   |  UNIX Interface                                         |
296   \*-------------------------------------------------------*/
297 
298   EThread();
299   EThread(ThreadType att, int anid);
300   EThread(ThreadType att, Event *e);
301   EThread(const EThread &) = delete;
302   EThread &operator=(const EThread &) = delete;
303   ~EThread() override;
304 
305   Event *schedule(Event *e, bool fast_signal = false);
306 
307   /** Block of memory to allocate thread specific data e.g. stat system arrays. */
308   char thread_private[PER_THREAD_DATA];
309 
310   /** Private Data for the Disk Processor. */
311   DiskHandler *diskHandler = nullptr;
312 
313   /** Private Data for AIO. */
314   Que(Continuation, link) aio_ops;
315 
316   ProtectedQueue EventQueueExternal;
317   PriorityEventQueue EventQueue;
318 
319   EThread **ethreads_to_be_signalled = nullptr;
320   int n_ethreads_to_be_signalled     = 0;
321 
322   static constexpr int NO_ETHREAD_ID = -1;
323   int id                             = NO_ETHREAD_ID;
324   unsigned int event_types           = 0;
325   bool is_event_type(EventType et);
326   void set_event_type(EventType et);
327 
328   // Private Interface
329 
330   void execute() override;
331   void execute_regular();
332   void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
333   void process_event(Event *e, int calling_code);
334   void free_event(Event *e);
335   LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
336 
337 #if HAVE_EVENTFD
338   int evfd = ts::NO_FD;
339 #else
340   int evpipe[2];
341 #endif
342   EventIO *ep = nullptr;
343 
344   ThreadType tt = REGULAR;
345   /** Initial event to call, before any scheduling.
346 
347       For dedicated threads this is the only event called.
348       For regular threads this is called first before the event loop starts.
349       @internal For regular threads this is used by the EventProcessor to get called back after
350       the thread starts but before any other events can be dispatched to provide initializations
351       needed for the thread.
352   */
353   Event *start_event = nullptr;
354 
355   ServerSessionPool *server_session_pool = nullptr;
356 
357   /** Default handler used until it is overridden.
358 
359       This uses the cond var wait in @a ExternalQueue.
360   */
361   class DefaultTailHandler : public LoopTailHandler
362   {
363     DefaultTailHandler(ProtectedQueue &q) : _q(q) {}
364 
365     int
366     waitForActivity(ink_hrtime timeout) override
367     {
368       _q.wait(Thread::get_hrtime() + timeout);
369       return 0;
370     }
371     void
372     signalActivity() override
373     {
374       /* Try to acquire the `EThread::lock` of the Event Thread:
375        *   - Acquired, indicating that the Event Thread is sleep,
376        *               must send a wakeup signal to the Event Thread.
377        *   - Failed, indicating that the Event Thread is busy, do nothing.
378        */
379       (void)_q.try_signal();
380     }
381 
382     ProtectedQueue &_q;
383 
384     friend class EThread;
385   } DEFAULT_TAIL_HANDLER = EventQueueExternal;
386 
387   /// Statistics data for event dispatching.
388   struct EventMetrics {
389     /// Time the loop was active, not including wait time but including event dispatch time.
390     struct LoopTimes {
391       ink_hrtime _start = 0;         ///< The time of the first loop for this sample. Used to mark valid entries.
392       ink_hrtime _min   = INT64_MAX; ///< Shortest loop time.
393       ink_hrtime _max   = 0;         ///< Longest loop time.
394       LoopTimes() {}
395     } _loop_time;
396 
397     struct Events {
398       int _min   = INT_MAX;
399       int _max   = 0;
400       int _total = 0;
401       Events() {}
402     } _events;
403 
404     int _count = 0; ///< # of times the loop executed.
405     int _wait  = 0; ///< # of timed wait for events
406 
407     /// Add @a that to @a this data.
408     /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
409     EventMetrics &operator+=(EventMetrics const &that);
410 
411     EventMetrics() {}
412   };
413 
414   /** The number of metric blocks kept.
415       This is a circular buffer, with one block per second. We have a bit more than the required 1000
416       to provide sufficient slop for cross thread reading of the data (as only the current metric block
417       is being updated).
418   */
419   static int const N_EVENT_METRICS = 1024;
420 
421   volatile EventMetrics *current_metric = nullptr; ///< The current element of @a metrics
422   EventMetrics metrics[N_EVENT_METRICS];
423 
424   /** The various stats provided to the administrator.
425       THE ORDER IS VERY SENSITIVE.
426       More than one part of the code depends on this exact order. Be careful and thorough when changing.
427   */
428   enum STAT_ID {
429     STAT_LOOP_COUNT,      ///< # of event loops executed.
430     STAT_LOOP_EVENTS,     ///< # of events
431     STAT_LOOP_EVENTS_MIN, ///< min # of events dispatched in a loop
432     STAT_LOOP_EVENTS_MAX, ///< max # of events dispatched in a loop
433     STAT_LOOP_WAIT,       ///< # of loops that did a conditional wait.
434     STAT_LOOP_TIME_MIN,   ///< Shortest time spent in loop.
435     STAT_LOOP_TIME_MAX,   ///< Longest time spent in loop.
436     N_EVENT_STATS         ///< NOT A VALID STAT INDEX - # of different stat types.
437   };
438 
439   static char const *const STAT_NAME[N_EVENT_STATS];
440 
441   /** The number of time scales used in the event statistics.
442       Currently these are 10s, 100s, 1000s.
443   */
444   static int const N_EVENT_TIMESCALES = 3;
445   /// # of samples for each time scale.
446   static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
447 
448   /// Process the last 1000s of data and write out the summaries to @a summary.
449   void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
450   /// Back up the metric pointer, wrapping as needed.
451   EventMetrics *
452   prev(EventMetrics volatile *current)
453   {
454     return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
455   }
456   /// Advance the metric pointer, wrapping as needed.
457   EventMetrics *
458   next(EventMetrics volatile *current)
459   {
460     return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
461   }
462 };
463 
464 /**
465   This is used so that we dont use up operator new(size_t, void *)
466   which users might want to define for themselves.
467 
468 */
469 class ink_dummy_for_new
470 {
471 };
472 
473 inline void *
474 operator new(size_t, ink_dummy_for_new *p)
475 {
476   return (void *)p;
477 }
478 #define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))
479 
480 extern EThread *this_ethread();
481 
482 extern int thread_max_heartbeat_mseconds;
483