1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "P_EventSystem.h"
25 #include <sched.h>
26 #if TS_USE_HWLOC
27 #if HAVE_ALLOCA_H
28 #include <alloca.h>
29 #endif
30 #include <hwloc.h>
31 #endif
32 #include "tscore/ink_defs.h"
33 #include "tscore/hugepages.h"
34 
/// Global singleton.
/// The one and only EventProcessor instance for the process; all event
/// thread groups are created and managed through this object.
class EventProcessor eventProcessor;
37 
38 class ThreadAffinityInitializer : public Continuation
39 {
40   typedef ThreadAffinityInitializer self;
41 
42 public:
43   /// Default construct.
44   ThreadAffinityInitializer() { SET_HANDLER(&self::set_affinity); }
45   /// Load up basic affinity data.
46   void init();
47   /// Set the affinity for the current thread.
48   int set_affinity(int, Event *);
49   /// Allocate a stack.
50   /// @internal This is the external entry point and is different depending on
51   /// whether HWLOC is enabled.
52   void *alloc_stack(EThread *t, size_t stacksize);
53 
54 protected:
55   /// Allocate a hugepage stack.
56   /// If huge pages are not enable, allocate a basic stack.
57   void *alloc_hugepage_stack(size_t stacksize);
58 
59 #if TS_USE_HWLOC
60 
61   /// Allocate a stack based on NUMA information, if possible.
62   void *alloc_numa_stack(EThread *t, size_t stacksize);
63 
64 private:
65   hwloc_obj_type_t obj_type = HWLOC_OBJ_MACHINE;
66   int obj_count             = 0;
67   char const *obj_name      = nullptr;
68 #endif
69 };
70 
71 ThreadAffinityInitializer Thread_Affinity_Initializer;
72 
73 namespace
74 {
75 int
76 EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
77 {
78   int id = 0;
79   EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];
80 
81   // scan the thread local values
82   for (EThread *t : eventProcessor.active_group_threads(ET_CALL)) {
83     t->summarize_stats(summary);
84   }
85 
86   ink_mutex_acquire(&(rsb->mutex));
87 
88   for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
89     EThread::EventMetrics *m = summary + ts_idx;
90     // Discarding the atomic swaps for global writes, doesn't seem to actually do anything useful.
91     rsb->global[id + EThread::STAT_LOOP_COUNT]->sum   = m->_count;
92     rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
93     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);
94 
95     rsb->global[id + EThread::STAT_LOOP_WAIT]->sum   = m->_wait;
96     rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
97     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);
98 
99     rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum   = m->_loop_time._min;
100     rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
101     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
102     rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum   = m->_loop_time._max;
103     rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
104     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);
105 
106     rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum   = m->_events._total;
107     rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
108     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
109     rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum   = m->_events._min;
110     rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
111     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
112     rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum   = m->_events._max;
113     rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
114     RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
115   }
116 
117   ink_mutex_release(&(rsb->mutex));
118   return REC_ERR_OKAY;
119 }
120 
121 /// This is a wrapper used to convert a static function into a continuation. The function pointer is
122 /// passed in the cookie. For this reason the class is used as a singleton.
123 /// @internal This is the implementation for @c schedule_spawn... overloads.
124 class ThreadInitByFunc : public Continuation
125 {
126 public:
127   ThreadInitByFunc() { SET_HANDLER(&ThreadInitByFunc::invoke); }
128   int
129   invoke(int, Event *ev)
130   {
131     void (*f)(EThread *) = reinterpret_cast<void (*)(EThread *)>(ev->cookie);
132     f(ev->ethread);
133     return 0;
134   }
135 } Thread_Init_Func;
136 } // namespace
137 
138 void *
139 ThreadAffinityInitializer::alloc_hugepage_stack(size_t stacksize)
140 {
141   return ats_hugepage_enabled() ? ats_alloc_hugepage(stacksize) : ats_memalign(ats_pagesize(), stacksize);
142 }
143 
144 #if TS_USE_HWLOC
// Read the configured affinity scope and cache the corresponding hwloc object
// type, human readable name, and object count for later thread binding.
// Affinity values: 0 = machine, 1 = NUMA node (default), 2 = socket, 3 = core, 4 = logical PU.
void
ThreadAffinityInitializer::init()
{
  int affinity = 1;
  REC_ReadConfigInteger(affinity, "proxy.config.exec_thread.affinity");

  switch (affinity) {
  case 4: // assign threads to logical processing units
// Older versions of libhwloc (eg. Ubuntu 10.04) don't have HWLOC_OBJ_PU.
#if HAVE_HWLOC_OBJ_PU
    obj_type = HWLOC_OBJ_PU;
    obj_name = "Logical Processor";
    break;
#endif
    // Without HWLOC_OBJ_PU support this falls through to per-core affinity.

  case 3: // assign threads to real cores
    obj_type = HWLOC_OBJ_CORE;
    obj_name = "Core";
    break;

  case 1: // assign threads to NUMA nodes (often 1:1 with sockets)
    obj_type = HWLOC_OBJ_NODE;
    obj_name = "NUMA Node";
    if (hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type) > 0) {
      break;
    }
    // The topology reports no NUMA nodes - fall through to socket affinity.
    // fallthrough

  case 2: // assign threads to sockets
    obj_type = HWLOC_OBJ_SOCKET;
    obj_name = "Socket";
    break;
  default: // assign threads to the machine as a whole (a level below SYSTEM)
    obj_type = HWLOC_OBJ_MACHINE;
    obj_name = "Machine";
  }

  // Number of objects at the chosen level; threads are distributed round-robin across them.
  obj_count = hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type);
  Debug("iocore_thread", "Affinity: %d %ss: %d PU: %d", affinity, obj_name, obj_count, ink_number_of_processors());
}
185 
186 int
187 ThreadAffinityInitializer::set_affinity(int, Event *)
188 {
189   EThread *t = this_ethread();
190 
191   if (obj_count > 0) {
192     // Get our `obj` instance with index based on the thread number we are on.
193     hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);
194 #if HWLOC_API_VERSION >= 0x00010100
195     int cpu_mask_len = hwloc_bitmap_snprintf(nullptr, 0, obj->cpuset) + 1;
196     char *cpu_mask   = (char *)alloca(cpu_mask_len);
197     hwloc_bitmap_snprintf(cpu_mask, cpu_mask_len, obj->cpuset);
198     Debug("iocore_thread", "EThread: %p %s: %d CPU Mask: %s\n", t, obj_name, obj->logical_index, cpu_mask);
199 #else
200     Debug("iocore_thread", "EThread: %d %s: %d", _name, obj->logical_index);
201 #endif // HWLOC_API_VERSION
202     hwloc_set_thread_cpubind(ink_get_topology(), t->tid, obj->cpuset, HWLOC_CPUBIND_STRICT);
203   } else {
204     Warning("hwloc returned an unexpected number of objects -- CPU affinity disabled");
205   }
206   return 0;
207 }
208 
// Allocate a stack for thread @a t on the NUMA node(s) backing its assigned
// topology object. The current thread's memory binding is temporarily changed
// so the allocation lands on the right node(s), then restored.
void *
ThreadAffinityInitializer::alloc_numa_stack(EThread *t, size_t stacksize)
{
  hwloc_membind_policy_t mem_policy = HWLOC_MEMBIND_DEFAULT;
  hwloc_nodeset_t nodeset           = hwloc_bitmap_alloc();
  int num_nodes                     = 0;
  void *stack                       = nullptr;
  hwloc_obj_t obj                   = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);

  // Find the NUMA node set that correlates to our next thread CPU set
  hwloc_cpuset_to_nodeset(ink_get_topology(), obj->cpuset, nodeset);
  // How many NUMA nodes will we be needing to allocate across?
  num_nodes = hwloc_get_nbobjs_inside_cpuset_by_type(ink_get_topology(), obj->cpuset, HWLOC_OBJ_NODE);

  if (num_nodes == 1) {
    // The preferred memory policy. The thread lives in one NUMA node.
    mem_policy = HWLOC_MEMBIND_BIND;
  } else if (num_nodes > 1) {
    // If we have more than one NUMA node we should interleave over them.
    mem_policy = HWLOC_MEMBIND_INTERLEAVE;
  }
  // num_nodes == 0 leaves the default policy: allocate wherever the allocator pleases.

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Let's temporarily set the memory binding to our destination NUMA node
    hwloc_set_membind_nodeset(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD);
  }

  // Alloc our stack
  stack = this->alloc_hugepage_stack(stacksize);

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Now let's set it back to default for this thread.
    hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
                              HWLOC_MEMBIND_THREAD);
  }

  hwloc_bitmap_free(nodeset);

  return stack;
}
249 
250 void *
251 ThreadAffinityInitializer::alloc_stack(EThread *t, size_t stacksize)
252 {
253   return this->obj_count > 0 ? this->alloc_numa_stack(t, stacksize) : this->alloc_hugepage_stack(stacksize);
254 }
255 
256 #else
257 
// Stub for non-HWLOC builds: there is no topology information to load.
void
ThreadAffinityInitializer::init()
{
}
262 
// Stub for non-HWLOC builds: affinity cannot be set, report success.
int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
  return 0;
}
268 
// Stack allocation for non-HWLOC builds: no NUMA placement, just a plain
// (possibly huge page backed) allocation.
void *
ThreadAffinityInitializer::alloc_stack(EThread *, size_t stacksize)
{
  return this->alloc_hugepage_stack(stacksize);
}
274 
275 #endif // TS_USE_HWLOC
276 
// Zero the thread tables and register the mandatory first event type.
EventProcessor::EventProcessor() : thread_initializer(this)
{
  ink_zero(all_ethreads);
  ink_zero(all_dthreads);
  ink_mutex_init(&dedicated_thread_spawn_mutex);
  // Because ET_NET is compile time set to 0 it *must* be the first type registered.
  this->register_event_type("ET_NET");
}
285 
// Release the mutex guarding dedicated thread spawning; threads themselves are
// not joined or destroyed here.
EventProcessor::~EventProcessor()
{
  ink_mutex_destroy(&dedicated_thread_spawn_mutex);
}
290 
291 namespace
292 {
293 Event *
294 make_event_for_scheduling(Continuation *c, int event_code, void *cookie)
295 {
296   Event *e = eventAllocator.alloc();
297 
298   e->init(c);
299   e->mutex          = c->mutex;
300   e->callback_event = event_code;
301   e->cookie         = cookie;
302 
303   return e;
304 }
305 } // namespace
306 
307 Event *
308 EventProcessor::schedule_spawn(Continuation *c, EventType ev_type, int event_code, void *cookie)
309 {
310   Event *e = make_event_for_scheduling(c, event_code, cookie);
311   ink_assert(ev_type < MAX_EVENT_TYPES);
312   thread_group[ev_type]._spawnQueue.enqueue(e);
313   return e;
314 }
315 
316 Event *
317 EventProcessor::schedule_spawn(void (*f)(EThread *), EventType ev_type)
318 {
319   Event *e = make_event_for_scheduling(&Thread_Init_Func, EVENT_IMMEDIATE, reinterpret_cast<void *>(f));
320   ink_assert(ev_type < MAX_EVENT_TYPES);
321   thread_group[ev_type]._spawnQueue.enqueue(e);
322   return e;
323 }
324 
325 EventType
326 EventProcessor::register_event_type(char const *name)
327 {
328   ThreadGroupDescriptor *tg = &(thread_group[n_thread_groups++]);
329   ink_release_assert(n_thread_groups <= MAX_EVENT_TYPES); // check for overflow
330 
331   tg->_name = name;
332   return n_thread_groups - 1;
333 }
334 
335 EventType
336 EventProcessor::spawn_event_threads(char const *name, int n_threads, size_t stacksize)
337 {
338   int ev_type = this->register_event_type(name);
339   this->spawn_event_threads(ev_type, n_threads, stacksize);
340   return ev_type;
341 }
342 
// Create and start @a n_threads threads of type @a ev_type, each with a stack
// of (at least) @a stacksize bytes aligned to the page (or huge page) size.
// @return @a ev_type unchanged.
EventType
EventProcessor::spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  ThreadGroupDescriptor *tg = &(thread_group[ev_type]);

  ink_release_assert(n_threads > 0);
  ink_release_assert((n_ethreads + n_threads) <= MAX_EVENT_THREADS);
  ink_release_assert(ev_type < MAX_EVENT_TYPES);

  // Enforce the platform minimum stack size.
  stacksize = std::max(stacksize, static_cast<decltype(stacksize)>(INK_THREAD_STACK_MIN));
  // Make sure it is a multiple of our page size
  if (ats_hugepage_enabled()) {
    stacksize = INK_ALIGN(stacksize, ats_hugepage_size());
  } else {
    stacksize = INK_ALIGN(stacksize, ats_pagesize());
  }

  Debug("iocore_thread", "Thread stack size set to %zu", stacksize);

  // First pass: create the EThread objects and fill in the thread tables.
  for (i = 0; i < n_threads; ++i) {
    EThread *t                   = new EThread(REGULAR, n_ethreads + i);
    all_ethreads[n_ethreads + i] = t;
    tg->_thread[i]               = t;
    t->id                        = i; // unfortunately needed to support affinity and NUMA logic.
    t->set_event_type(ev_type);
    t->schedule_spawn(&thread_initializer);
  }
  tg->_count = n_threads;
  n_ethreads += n_threads;

  // Separate loop to avoid race conditions between spawn events and updating the thread table for
  // the group. Some thread set up depends on knowing the total number of threads but that can't be
  // safely updated until all the EThread instances are created and stored in the table.
  for (i = 0; i < n_threads; ++i) {
    Debug("iocore_thread_start", "Created %s thread #%d", tg->_name.c_str(), i + 1);
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[%s %d]", tg->_name.c_str(), i);
    // Allocate a stack with appropriate NUMA placement for this thread.
    void *stack = Thread_Affinity_Initializer.alloc_stack(tg->_thread[i], stacksize);
    tg->_thread[i]->start(thr_name, stack, stacksize);
  }

  Debug("iocore_thread", "Created thread group '%s' id %d with %d threads", tg->_name.c_str(), ev_type, n_threads);

  return ev_type; // useless but not sure what would be better.
}
389 
// This is called from inside a thread as the @a start_event for that thread.  It chains to the
// startup events for the appropriate thread group start events.
void
EventProcessor::initThreadState(EThread *t)
{
  // Run all thread type initialization continuations that match the event types for this thread.
  for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
    if (t->is_event_type(i)) { // that event type done here, roll thread start events of that type.
      // Once the last thread of a group has started, fire its after-start callback (if any).
      if (++thread_group[i]._started == thread_group[i]._count && thread_group[i]._afterStartCallback != nullptr) {
        thread_group[i]._afterStartCallback();
      }
      // To avoid race conditions on the event in the spawn queue, create a local one to actually send.
      // Use the spawn queue event as a read only model.
      Event *nev = eventAllocator.alloc();
      for (Event *ev = thread_group[i]._spawnQueue.head; nullptr != ev; ev = ev->link.next) {
        // Re-initialize the local event from the model for each spawn event.
        nev->init(ev->continuation, 0, 0);
        nev->ethread        = t;
        nev->callback_event = ev->callback_event;
        nev->mutex          = ev->continuation->mutex;
        nev->cookie         = ev->cookie;
        // Invoke directly - we are already running on the target thread.
        ev->continuation->handleEvent(ev->callback_event, nev);
      }
      // The single local event was reused for every dispatch; release it now.
      nev->free();
    }
  }
}
416 
417 int
418 EventProcessor::start(int n_event_threads, size_t stacksize)
419 {
420   // do some sanity checking.
421   static bool started = false;
422   ink_release_assert(!started);
423   ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
424   started = true;
425 
426   Thread_Affinity_Initializer.init();
427   // Least ugly thing - this needs to be the first callback from the thread but by the time this
428   // method is called other spawn callbacks have been registered. This forces thread affinity
429   // first. The other alternative would be to require a call to an @c init method which I like even
430   // less because this cannot be done in the constructor - that depends on too much other
431   // infrastructure being in place (e.g. the proxy allocators).
432   thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));
433 
434   // Get our statistics set up
435   RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
436   char name[256];
437 
438   for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
439     for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
440       snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
441       RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), NULL);
442     }
443   }
444 
445   // Name must be that of a stat, pick one at random since we do all of them in one pass/callback.
446   RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);
447 
448   this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);
449 
450   Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
451   return 0;
452 }
453 
// Intentionally empty - event threads are not torn down on shutdown.
void
EventProcessor::shutdown()
{
}
458 
// Spawn a dedicated (non-pooled) thread running @a cont, named @a thr_name,
// with a stack of @a stacksize bytes.
// @return the event whose @a ethread member is the new thread.
Event *
EventProcessor::spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize)
{
  /* Spawning threads in a live system - There are two potential race conditions in this logic. The
     first is multiple calls to this method.  In that case @a all_dthreads can end up in a bad state
     as the same entry is overwritten while another is left uninitialized.

     The other is read/write contention where another thread (e.g. the stats collection thread) is
     iterating over the threads while the active count (@a n_dthreads) is being updated causing use
     of a not yet initialized array element.

     This logic covers both situations. For write/write the actual array update is locked. The
     potentially expensive set up is done outside the lock making the time spent locked small. For
     read/write it suffices to do the active count increment after initializing the array
     element. It's not a problem if, for one cycle, a new thread is skipped.
  */

  // Do as much as possible outside the lock. Until the array element and count is changed
  // this is thread safe.
  Event *e = eventAllocator.alloc();
  e->init(cont, 0, 0);
  // The continuation shares the new thread's mutex so it runs exclusively on that thread.
  e->ethread  = new EThread(DEDICATED, e);
  e->mutex    = e->ethread->mutex;
  cont->mutex = e->ethread->mutex;
  {
    ink_scoped_mutex_lock lock(dedicated_thread_spawn_mutex);
    ink_release_assert(n_dthreads < MAX_EVENT_THREADS);
    all_dthreads[n_dthreads] = e->ethread;
    ++n_dthreads; // Be very sure this is after the array element update.
  }

  // Start outside the lock - the thread table entry is already published.
  e->ethread->start(thr_name, nullptr, stacksize);

  return e;
}
494