1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22 
23  */
24 
25 #pragma once
26 
27 #include "tscore/ink_platform.h"
28 #include "tscore/ink_rand.h"
29 #include "tscore/I_Version.h"
30 #include "I_Thread.h"
31 #include "I_PriorityEventQueue.h"
32 #include "I_ProtectedQueue.h"
33 
34 // TODO: This would be much nicer to have "run-time" configurable (or something),
35 // perhaps based on proxy.config.stat_api.max_stats_allowed or other configs. XXX
36 #define PER_THREAD_DATA (1024 * 1024)
37 
38 // This is not used by the cache anymore, it uses proxy.config.cache.mutex_retry_delay
39 // instead.
40 #define MUTEX_RETRY_DELAY HRTIME_MSECONDS(20)
41 
42 struct DiskHandler;
43 struct EventIO;
44 
45 class ServerSessionPool;
46 class Event;
47 class Continuation;
48 
49 enum ThreadType {
50   REGULAR = 0,
51   DEDICATED,
52 };
53 
54 /**
55   Event System specific type of thread.
56 
57   The EThread class is the type of thread created and managed by
58   the Event System. It is one of the available interfaces for
59   scheduling events in the event system (another two are the Event
60   and EventProcessor classes).
61 
62   In order to handle events, each EThread object has two event
63   queues, one external and one internal. The external queue is
64   provided for users of the EThread (clients) to append events to
65   that particular thread. Since it can be accessed by other threads
66   at the same time, operations using it must proceed in an atomic
67   fashion.
68 
69   The internal queue, in the other hand, is used exclusively by the
70   EThread to process timed events within a certain time frame. These
71   events are queued internally and they may come from the external
72   queue as well.
73 
74   Scheduling Interface:
75 
76   There are eight scheduling functions provided by EThread and
77   they are a wrapper around their counterparts in EventProcessor.
78 
79   @see EventProcessor
80   @see Event
81 
82 */
83 class EThread : public Thread
84 {
85 public:
86   /** Handler for tail of event loop.
87 
88       The event loop should not spin. To avoid that a tail handler is called to block for a limited time.
89       This is a protocol class that defines the interface to the handler.
90   */
91   class LoopTailHandler
92   {
93   public:
94     /** Called at the end of the event loop to block.
95         @a timeout is the maximum length of time (in ns) to block.
96     */
97     virtual int waitForActivity(ink_hrtime timeout) = 0;
98     /** Unblock.
99 
100         This is required to unblock (wake up) the block created by calling @a cb.
101     */
102     virtual void signalActivity() = 0;
103 
104     virtual ~LoopTailHandler() {}
105   };
106 
107   /*-------------------------------------------------------*\
108   |  Common Interface                                       |
109   \*-------------------------------------------------------*/
110 
111   /**
112     Schedules the continuation on this EThread to receive an event
113     as soon as possible.
114 
115     Forwards to the EventProcessor the schedule of the callback to
116     the continuation 'c' as soon as possible. The event is assigned
117     to EThread.
118 
119     @param c Continuation to be called back as soon as possible.
120     @param callback_event Event code to be passed back to the
121       continuation's handler. See the the EventProcessor class.
122     @param cookie User-defined value or pointer to be passed back
123       in the Event's object cookie field.
124     @return Reference to an Event object representing the scheduling
125       of this callback.
126 
127   */
128   Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
129   Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
130 
131   /**
132     Schedules the continuation on this EThread to receive an event
133     at the given timeout.
134 
135     Forwards the request to the EventProcessor to schedule the
136     callback to the continuation 'c' at the time specified in
137     'atimeout_at'. The event is assigned to this EThread.
138 
139     @param c Continuation to be called back at the time specified
140       in 'atimeout_at'.
141     @param atimeout_at Time value at which to callback.
142     @param callback_event Event code to be passed back to the
143       continuation's handler. See the EventProcessor class.
144     @param cookie User-defined value or pointer to be passed back
145       in the Event's object cookie field.
146     @return A reference to an Event object representing the scheduling
147       of this callback.
148 
149   */
150   Event *schedule_at(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
151 
152   /**
153     Schedules the continuation on this EThread to receive an event
154     after the timeout elapses.
155 
156     Instructs the EventProcessor to schedule the callback to the
157     continuation 'c' after the time specified in atimeout_in elapses.
158     The event is assigned to this EThread.
159 
160     @param c Continuation to be called back after the timeout elapses.
161     @param atimeout_in Amount of time after which to callback.
162     @param callback_event Event code to be passed back to the
163       continuation's handler. See the EventProcessor class.
164     @param cookie User-defined value or pointer to be passed back
165       in the Event's object cookie field.
166     @return A reference to an Event object representing the scheduling
167       of this callback.
168 
169   */
170   Event *schedule_in(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
171 
172   /**
173     Schedules the continuation on this EThread to receive an event
174     periodically.
175 
176     Schedules the callback to the continuation 'c' in the EventProcessor
177     to occur every time 'aperiod' elapses. It is scheduled on this
178     EThread.
179 
180     @param c Continuation to call back every time 'aperiod' elapses.
181     @param aperiod Duration of the time period between callbacks.
182     @param callback_event Event code to be passed back to the
183       continuation's handler. See the Remarks section in the
184       EventProcessor class.
185     @param cookie User-defined value or pointer to be passed back
186       in the Event's object cookie field.
187     @return A reference to an Event object representing the scheduling
188       of this callback.
189 
190   */
191   Event *schedule_every(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
192 
193   /**
194     Schedules the continuation on this EThread to receive an event
195     as soon as possible.
196 
197     Schedules the callback to the continuation 'c' as soon as
198     possible. The event is assigned to this EThread.
199 
200     @param c Continuation to be called back as soon as possible.
201     @param callback_event Event code to be passed back to the
202       continuation's handler. See the EventProcessor class.
203     @param cookie User-defined value or pointer to be passed back
204       in the Event's object cookie field.
205     @return A reference to an Event object representing the scheduling
206       of this callback.
207 
208   */
209   Event *schedule_imm_local(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
210 
211   /**
212     Schedules the continuation on this EThread to receive an event
213     at the given timeout.
214 
215     Schedules the callback to the continuation 'c' at the time
216     specified in 'atimeout_at'. The event is assigned to this
217     EThread.
218 
219     @param c Continuation to be called back at the time specified
220       in 'atimeout_at'.
221     @param atimeout_at Time value at which to callback.
222     @param callback_event Event code to be passed back to the
223       continuation's handler. See the EventProcessor class.
224     @param cookie User-defined value or pointer to be passed back
225       in the Event's object cookie field.
226     @return A reference to an Event object representing the scheduling
227       of this callback.
228 
229   */
230   Event *schedule_at_local(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
231 
232   /**
233     Schedules the continuation on this EThread to receive an event
234     after the timeout elapses.
235 
236     Schedules the callback to the continuation 'c' after the time
237     specified in atimeout_in elapses. The event is assigned to this
238     EThread.
239 
240     @param c Continuation to be called back after the timeout elapses.
241     @param atimeout_in Amount of time after which to callback.
242     @param callback_event Event code to be passed back to the
243       continuation's handler. See the Remarks section in the
244       EventProcessor class.
245     @param cookie User-defined value or pointer to be passed back
246       in the Event's object cookie field.
247     @return A reference to an Event object representing the scheduling
248       of this callback.
249 
250   */
251   Event *schedule_in_local(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
252 
253   /**
254     Schedules the continuation on this EThread to receive an event
255     periodically.
256 
257     Schedules the callback to the continuation 'c' to occur every
258     time 'aperiod' elapses. It is scheduled on this EThread.
259 
260     @param c Continuation to call back every time 'aperiod' elapses.
261     @param aperiod Duration of the time period between callbacks.
262     @param callback_event Event code to be passed back to the
263       continuation's handler. See the Remarks section in the
264       EventProcessor class.
265     @param cookie User-defined value or pointer to be passed back
266       in the Event's object cookie field.
267     @return A reference to an Event object representing the scheduling
268       of this callback.
269 
270   */
271   Event *schedule_every_local(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
272 
273   /** Schedule an event called once when the thread is spawned.
274 
275       This is useful only for regular threads and if called before @c Thread::start. The event will be
276       called first before the event loop.
277 
278       @Note This will override the event for a dedicate thread so that this is called instead of the
279       event passed to the constructor.
280   */
281   Event *schedule_spawn(Continuation *c, int ev = EVENT_IMMEDIATE, void *cookie = nullptr);
282 
283   // Set the tail handler.
284   void set_tail_handler(LoopTailHandler *handler);
285 
286   /* private */
287 
288   Event *schedule_local(Event *e);
289 
290   InkRand generator = static_cast<uint64_t>(Thread::get_hrtime_updated() ^ reinterpret_cast<uintptr_t>(this));
291 
292   /*-------------------------------------------------------*\
293   |  UNIX Interface                                         |
294   \*-------------------------------------------------------*/
295 
296   EThread();
297   EThread(ThreadType att, int anid);
298   EThread(ThreadType att, Event *e);
299   EThread(const EThread &) = delete;
300   EThread &operator=(const EThread &) = delete;
301   ~EThread() override;
302 
303   Event *schedule(Event *e, bool fast_signal = false);
304 
305   /** Block of memory to allocate thread specific data e.g. stat system arrays. */
306   char thread_private[PER_THREAD_DATA];
307 
308   /** Private Data for the Disk Processor. */
309   DiskHandler *diskHandler = nullptr;
310 
311   /** Private Data for AIO. */
312   Que(Continuation, link) aio_ops;
313 
314   ProtectedQueue EventQueueExternal;
315   PriorityEventQueue EventQueue;
316 
317   EThread **ethreads_to_be_signalled = nullptr;
318   int n_ethreads_to_be_signalled     = 0;
319 
320   static constexpr int NO_ETHREAD_ID = -1;
321   int id                             = NO_ETHREAD_ID;
322   unsigned int event_types           = 0;
323   bool is_event_type(EventType et);
324   void set_event_type(EventType et);
325 
326   // Private Interface
327 
328   void execute() override;
329   void execute_regular();
330   void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
331   void process_event(Event *e, int calling_code);
332   void free_event(Event *e);
333   LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
334 
335 #if HAVE_EVENTFD
336   int evfd = ts::NO_FD;
337 #else
338   int evpipe[2];
339 #endif
340   EventIO *ep = nullptr;
341 
342   ThreadType tt = REGULAR;
343   /** Initial event to call, before any scheduling.
344 
345       For dedicated threads this is the only event called.
346       For regular threads this is called first before the event loop starts.
347       @internal For regular threads this is used by the EventProcessor to get called back after
348       the thread starts but before any other events can be dispatched to provide initializations
349       needed for the thread.
350   */
351   Event *start_event = nullptr;
352 
353   ServerSessionPool *server_session_pool = nullptr;
354 
355   /** Default handler used until it is overridden.
356 
357       This uses the cond var wait in @a ExternalQueue.
358   */
359   class DefaultTailHandler : public LoopTailHandler
360   {
361     DefaultTailHandler(ProtectedQueue &q) : _q(q) {}
362 
363     int
364     waitForActivity(ink_hrtime timeout) override
365     {
366       _q.wait(Thread::get_hrtime() + timeout);
367       return 0;
368     }
369     void
370     signalActivity() override
371     {
372       /* Try to acquire the `EThread::lock` of the Event Thread:
373        *   - Acquired, indicating that the Event Thread is sleep,
374        *               must send a wakeup signal to the Event Thread.
375        *   - Failed, indicating that the Event Thread is busy, do nothing.
376        */
377       (void)_q.try_signal();
378     }
379 
380     ProtectedQueue &_q;
381 
382     friend class EThread;
383   } DEFAULT_TAIL_HANDLER = EventQueueExternal;
384 
385   /// Statistics data for event dispatching.
386   struct EventMetrics {
387     /// Time the loop was active, not including wait time but including event dispatch time.
388     struct LoopTimes {
389       ink_hrtime _start = 0;         ///< The time of the first loop for this sample. Used to mark valid entries.
390       ink_hrtime _min   = INT64_MAX; ///< Shortest loop time.
391       ink_hrtime _max   = 0;         ///< Longest loop time.
392       LoopTimes() {}
393     } _loop_time;
394 
395     struct Events {
396       int _min   = INT_MAX;
397       int _max   = 0;
398       int _total = 0;
399       Events() {}
400     } _events;
401 
402     int _count = 0; ///< # of times the loop executed.
403     int _wait  = 0; ///< # of timed wait for events
404 
405     /// Add @a that to @a this data.
406     /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
407     EventMetrics &operator+=(EventMetrics const &that);
408 
409     EventMetrics() {}
410   };
411 
412   /** The number of metric blocks kept.
413       This is a circular buffer, with one block per second. We have a bit more than the required 1000
414       to provide sufficient slop for cross thread reading of the data (as only the current metric block
415       is being updated).
416   */
417   static int const N_EVENT_METRICS = 1024;
418 
419   volatile EventMetrics *current_metric = nullptr; ///< The current element of @a metrics
420   EventMetrics metrics[N_EVENT_METRICS];
421 
422   /** The various stats provided to the administrator.
423       THE ORDER IS VERY SENSITIVE.
424       More than one part of the code depends on this exact order. Be careful and thorough when changing.
425   */
426   enum STAT_ID {
427     STAT_LOOP_COUNT,      ///< # of event loops executed.
428     STAT_LOOP_EVENTS,     ///< # of events
429     STAT_LOOP_EVENTS_MIN, ///< min # of events dispatched in a loop
430     STAT_LOOP_EVENTS_MAX, ///< max # of events dispatched in a loop
431     STAT_LOOP_WAIT,       ///< # of loops that did a conditional wait.
432     STAT_LOOP_TIME_MIN,   ///< Shortest time spent in loop.
433     STAT_LOOP_TIME_MAX,   ///< Longest time spent in loop.
434     N_EVENT_STATS         ///< NOT A VALID STAT INDEX - # of different stat types.
435   };
436 
437   static char const *const STAT_NAME[N_EVENT_STATS];
438 
439   /** The number of time scales used in the event statistics.
440       Currently these are 10s, 100s, 1000s.
441   */
442   static int const N_EVENT_TIMESCALES = 3;
443   /// # of samples for each time scale.
444   static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
445 
446   /// Process the last 1000s of data and write out the summaries to @a summary.
447   void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
448   /// Back up the metric pointer, wrapping as needed.
449   EventMetrics *
450   prev(EventMetrics volatile *current)
451   {
452     return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
453   }
454   /// Advance the metric pointer, wrapping as needed.
455   EventMetrics *
456   next(EventMetrics volatile *current)
457   {
458     return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
459   }
460 };
461 
462 /**
463   This is used so that we dont use up operator new(size_t, void *)
464   which users might want to define for themselves.
465 
466 */
467 class ink_dummy_for_new
468 {
469 };
470 
471 inline void *
472 operator new(size_t, ink_dummy_for_new *p)
473 {
474   return (void *)p;
475 }
476 #define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))
477 
478 extern EThread *this_ethread();
479 
480 extern int thread_max_heartbeat_mseconds;
481