1 /** @file
2 
  Declaration of the EventProcessor, the main processor for the Event System.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #pragma once
25 
#include "tscore/ink_platform.h"
#include "I_Continuation.h"
#include "I_Processor.h"
#include "I_Event.h"
#include <atomic>
#include <functional>
#include <string>
31 
/// Upper bound on the number of threads in a single thread group (event type).
/// Defaults to 3072; a build may override it by defining TS_MAX_THREADS_IN_EACH_THREAD_TYPE.
#ifdef TS_MAX_THREADS_IN_EACH_THREAD_TYPE
constexpr int MAX_THREADS_IN_EACH_TYPE = TS_MAX_THREADS_IN_EACH_THREAD_TYPE;
#else
constexpr int MAX_THREADS_IN_EACH_TYPE = 3072;
#endif

/// Upper bound on the number of event threads tracked by the EventProcessor.
/// Defaults to 4096; a build may override it by defining TS_MAX_NUMBER_EVENT_THREADS.
#ifdef TS_MAX_NUMBER_EVENT_THREADS
constexpr int MAX_EVENT_THREADS = TS_MAX_NUMBER_EVENT_THREADS;
#else
constexpr int MAX_EVENT_THREADS = 4096;
#endif

// Both limits are used as array bounds in this header, so a non-positive build-time
// override is rejected here with a readable message instead of an obscure array error.
static_assert(MAX_THREADS_IN_EACH_TYPE > 0, "MAX_THREADS_IN_EACH_TYPE must be a positive value");
static_assert(MAX_EVENT_THREADS > 0, "MAX_EVENT_THREADS must be a positive value");
43 
44 class EThread;
45 
46 /**
47   Main processor for the Event System. The EventProcessor is the core
48   component of the Event System. Once started, it is responsible for
49   creating and managing groups of threads that execute user-defined
50   tasks asynchronously at a given time or periodically.
51 
52   The EventProcessor provides a set of scheduling functions through
53   which you can specify continuations to be called back by one of its
54   threads. These function calls do not block. Instead they return an
55   Event object and schedule the callback to the continuation passed in at
56   a later or specific time, as soon as possible or at certain intervals.
57 
58   Singleton model:
59 
60   Every executable that imports and statically links against the
61   EventSystem library is provided with a global instance of the
62   EventProcessor called eventProcessor. Therefore, it is not necessary to
63   create instances of the EventProcessor class because it was designed
64   as a singleton. It is important to note that none of its functions
65   are reentrant.
66 
67   Thread Groups (Event types):
68 
  When the EventProcessor is started, the first group of threads is spawned and it is assigned the
  special id ET_CALL. Depending on the complexity of the state machine or protocol, you may be
  interested in creating additional threads and the EventProcessor gives you the ability to create a
  single thread or an entire group of threads. In the former case, you call @c spawn_thread and the
  thread is independent of the thread groups and it exists as long as your continuation handler
  executes and there are events to process. In the latter, you call @c register_event_type to get an
  event type and then @c spawn_event_threads which creates the threads in the group of that
  type. Such threads require events to be scheduled on a specific thread in the group or for the
  group in general using the event type. Note that between these two calls @c schedule_spawn
  can be used to set up per thread initialization.
79 
80   Callback event codes:
81 
82   @b UNIX: For all of the scheduling functions, the callback_event
83   parameter is not used. On a callback, the event code passed in to
84   the continuation handler is always EVENT_IMMEDIATE.
85 
86   @b NT: The value of the event code passed in to the continuation
87   handler is the value provided in the callback_event parameter.
88 
89   Event allocation policy:
90 
91   Events are allocated and deallocated by the EventProcessor. A state
92   machine may access the returned, non-recurring event until it is
93   cancelled or the callback from the event is complete. For recurring
94   events, the Event may be accessed until it is cancelled. Once the event
95   is complete or cancelled, it's the eventProcessor's responsibility to
96   deallocate it.
97 
98 */
99 class EventProcessor : public Processor
100 {
101 public:
102   /** Register an event type with @a name.
103 
104       This must be called to get an event type to pass to @c spawn_event_threads
105       @see spawn_event_threads
106    */
107   EventType register_event_type(char const *name);
108 
109   /**
110     Spawn an additional thread for calling back the continuation. Spawns
111     a dedicated thread (EThread) that calls back the continuation passed
112     in as soon as possible.
113 
114     @param cont continuation that the spawn thread will call back
115       immediately.
116     @return event object representing the start of the thread.
117 
118   */
119   Event *spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize = 0);
120 
121   /** Spawn a group of @a n_threads event dispatching threads.
122 
123       The threads run an event loop which dispatches events scheduled for a specific thread or the event type.
124 
125       @return EventType or thread id for the new group of threads (@a ev_type)
126 
127   */
128   EventType spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize = DEFAULT_STACKSIZE);
129 
130   /// Convenience overload.
131   /// This registers @a name as an event type using @c registerEventType and then calls the real @c spawn_event_threads
132   EventType spawn_event_threads(const char *name, int n_thread, size_t stacksize = DEFAULT_STACKSIZE);
133 
134   /**
135     Schedules the continuation on a specific EThread to receive an event
136     at the given timeout.  Requests the EventProcessor to schedule
137     the callback to the continuation 'c' at the time specified in
138     'atimeout_at'. The event is assigned to the specified EThread.
139 
140     @param c Continuation to be called back at the time specified in
141       'atimeout_at'.
142     @param atimeout_at time value at which to callback.
143     @param ethread EThread on which to schedule the event.
144     @param callback_event code to be passed back to the continuation's
145       handler. See the Remarks section.
146     @param cookie user-defined value or pointer to be passed back in
147       the Event's object cookie field.
148     @return reference to an Event object representing the scheduling
149       of this callback.
150 
151   */
152   Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
153                       void *cookie = nullptr);
154   /*
155     provides the same functionality as schedule_imm and also signals the thread immediately
156   */
157   Event *schedule_imm_signal(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
158                              void *cookie = nullptr);
159   /**
160     Schedules the continuation on a specific thread group to receive an
161     event at the given timeout. Requests the EventProcessor to schedule
162     the callback to the continuation 'c' at the time specified in
163     'atimeout_at'. The callback is handled by a thread in the specified
164     thread group (event_type).
165 
166     @param c Continuation to be called back at the time specified in
167       'atimeout_at'.
168     @param atimeout_at Time value at which to callback.
169     @param event_type thread group id (or event type) specifying the
170       group of threads on which to schedule the callback.
171     @param callback_event code to be passed back to the continuation's
172       handler. See the Remarks section.
173     @param cookie user-defined value or pointer to be passed back in
174       the Event's object cookie field.
175     @return reference to an Event object representing the scheduling of
176       this callback.
177 
178   */
179   Event *schedule_at(Continuation *c, ink_hrtime atimeout_at, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
180                      void *cookie = nullptr);
181 
182   /**
183     Schedules the continuation on a specific thread group to receive an
184     event after the specified timeout elapses. Requests the EventProcessor
185     to schedule the callback to the continuation 'c' after the time
186     specified in 'atimeout_in' elapses. The callback is handled by a
187     thread in the specified thread group (event_type).
188 
189     @param c Continuation to call back aftert the timeout elapses.
190     @param atimeout_in amount of time after which to callback.
191     @param event_type Thread group id (or event type) specifying the
192       group of threads on which to schedule the callback.
193     @param callback_event code to be passed back to the continuation's
194       handler. See the Remarks section.
195     @param cookie user-defined value or pointer to be passed back in
196       the Event's object cookie field.
197     @return reference to an Event object representing the scheduling of
198       this callback.
199 
200   */
201   Event *schedule_in(Continuation *c, ink_hrtime atimeout_in, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
202                      void *cookie = nullptr);
203 
204   /**
205     Schedules the continuation on a specific thread group to receive
206     an event periodically. Requests the EventProcessor to schedule the
207     callback to the continuation 'c' every time 'aperiod' elapses. The
208     callback is handled by a thread in the specified thread group
209     (event_type).
210 
211     @param c Continuation to call back every time 'aperiod' elapses.
212     @param aperiod duration of the time period between callbacks.
213     @param event_type thread group id (or event type) specifying the
214       group of threads on which to schedule the callback.
215     @param callback_event code to be passed back to the continuation's
216       handler. See the Remarks section.
217     @param cookie user-defined value or pointer to be passed back in
218       the Event's object cookie field.
219     @return reference to an Event object representing the scheduling of
220       this callback.
221 
222   */
223   Event *schedule_every(Continuation *c, ink_hrtime aperiod, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
224                         void *cookie = nullptr);
225 
226   ////////////////////////////////////////////
227   // reschedule an already scheduled event. //
228   // may be called directly or called by    //
229   // schedule_xxx Event member functions.   //
230   // The returned value may be different    //
231   // from the argument e.                   //
232   ////////////////////////////////////////////
233 
234   Event *reschedule_imm(Event *e, int callback_event = EVENT_IMMEDIATE);
235   Event *reschedule_at(Event *e, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL);
236   Event *reschedule_in(Event *e, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL);
237   Event *reschedule_every(Event *e, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL);
238 
239   /// Schedule an @a event on continuation @a c when a thread of type @a ev_type is spawned.
240   /// The @a cookie is attached to the event instance passed to the continuation.
241   /// @return The scheduled event.
242   Event *schedule_spawn(Continuation *c, EventType ev_type, int event = EVENT_IMMEDIATE, void *cookie = nullptr);
243 
244   /// Schedule the function @a f to be called in a thread of type @a ev_type when it is spawned.
245   Event *schedule_spawn(void (*f)(EThread *), EventType ev_type);
246 
247   /// Schedule an @a event on continuation @a c to be called when a thread is spawned by this processor.
248   /// The @a cookie is attached to the event instance passed to the continuation.
249   /// @return The scheduled event.
250   //  Event *schedule_spawn(Continuation *c, int event, void *cookie = NULL);
251 
252   EventProcessor();
253   ~EventProcessor() override;
254   EventProcessor(const EventProcessor &) = delete;
255   EventProcessor &operator=(const EventProcessor &) = delete;
256 
257   /**
258     Initializes the EventProcessor and its associated threads. Spawns the
259     specified number of threads, initializes their state information and
260     sets them running. It creates the initial thread group, represented
261     by the event type ET_CALL.
262 
263     @return 0 if successful, and a negative value otherwise.
264 
265   */
266   int start(int n_net_threads, size_t stacksize = DEFAULT_STACKSIZE) override;
267 
268   /**
269     Stop the EventProcessor. Attempts to stop the EventProcessor and
270     all of the threads in each of the thread groups.
271 
272   */
273   void shutdown() override;
274 
275   /**
276     Allocates size bytes on the event threads. This function is thread
277     safe.
278 
279     @param size bytes to be allocated.
280 
281   */
282   off_t allocate(int size);
283 
284   /**
285     An array of pointers to all of the EThreads handled by the
286     EventProcessor. An array of pointers to all of the EThreads created
287     throughout the existence of the EventProcessor instance.
288 
289   */
290   EThread *all_ethreads[MAX_EVENT_THREADS];
291 
292   /**
293     An array of pointers, organized by thread group, to all of the
294     EThreads handled by the EventProcessor. An array of pointers to all of
295     the EThreads created throughout the existence of the EventProcessor
296     instance. It is a two-dimensional array whose first dimension is the
297     thread group id and the second the EThread pointers for that group.
298 
299   */
300   //  EThread *eventthread[MAX_EVENT_TYPES][MAX_THREADS_IN_EACH_TYPE];
301 
302   /// Data kept for each thread group.
303   /// The thread group ID is the index into an array of these and so is not stored explicitly.
304   struct ThreadGroupDescriptor {
305     std::string _name;                               ///< Name for the thread group.
306     int _count                 = 0;                  ///< # of threads of this type.
307     std::atomic<int> _started  = 0;                  ///< # of started threads of this type.
308     uint64_t _next_round_robin = 0;                  ///< Index of thread to use for events assigned to this group.
309     Que(Event, link) _spawnQueue;                    ///< Events to dispatch when thread is spawned.
310     EThread *_thread[MAX_THREADS_IN_EACH_TYPE] = {}; ///< The actual threads in this group.
311     std::function<void()> _afterStartCallback  = nullptr;
312   };
313 
314   /// Storage for per group data.
315   ThreadGroupDescriptor thread_group[MAX_EVENT_TYPES];
316 
317   /// Number of defined thread groups.
318   int n_thread_groups = 0;
319 
320   /**
321     Total number of threads controlled by this EventProcessor.  This is
322     the count of all the EThreads spawn by this EventProcessor, excluding
323     those created by spawn_thread
324 
325   */
326   int n_ethreads = 0;
327 
328   /*------------------------------------------------------*\
329   | Unix & non NT Interface                                |
330   \*------------------------------------------------------*/
331 
332   Event *schedule(Event *e, EventType etype, bool fast_signal = false);
333   EThread *assign_thread(EventType etype);
334   EThread *assign_affinity_by_type(Continuation *cont, EventType etype);
335 
336   EThread *all_dthreads[MAX_EVENT_THREADS];
337   int n_dthreads       = 0; // No. of dedicated threads
338   int thread_data_used = 0;
339 
340   /// Provide container style access to just the active threads, not the entire array.
341   class active_threads_type
342   {
343     using iterator = EThread *const *; ///< Internal iterator type, pointer to array element.
344   public:
345     iterator
346     begin() const
347     {
348       return _begin;
349     }
350 
351     iterator
352     end() const
353     {
354       return _end;
355     }
356 
357   private:
358     iterator _begin; ///< Start of threads.
359     iterator _end;   ///< End of threads.
360     /// Construct from base of the array (@a start) and the current valid count (@a n).
361     active_threads_type(iterator start, int n) : _begin(start), _end(start + n) {}
362     friend class EventProcessor;
363   };
364 
365   // These can be used in container for loops and other range operations.
366   active_threads_type
367   active_ethreads() const
368   {
369     return {all_ethreads, n_ethreads};
370   }
371 
372   active_threads_type
373   active_dthreads() const
374   {
375     return {all_dthreads, n_dthreads};
376   }
377 
378   active_threads_type
379   active_group_threads(int type) const
380   {
381     ThreadGroupDescriptor const &group{thread_group[type]};
382     return {group._thread, group._count};
383   }
384 
385 private:
386   void initThreadState(EThread *);
387 
388   /// Used to generate a callback at the start of thread execution.
389   class ThreadInit : public Continuation
390   {
391     typedef ThreadInit self;
392     EventProcessor *_evp;
393 
394   public:
395     explicit ThreadInit(EventProcessor *evp) : _evp(evp) { SET_HANDLER(&self::init); }
396 
397     int
398     init(int /* event ATS_UNUSED */, Event *ev)
399     {
400       _evp->initThreadState(ev->ethread);
401       return 0;
402     }
403   };
404   friend class ThreadInit;
405   ThreadInit thread_initializer;
406 
407   // Lock write access to the dedicated thread vector.
408   // @internal Not a @c ProxyMutex - that's a whole can of problems due to initialization ordering.
409   ink_mutex dedicated_thread_spawn_mutex;
410 };
411 
412 extern inkcoreapi class EventProcessor eventProcessor;
413