1 /** @file
2 
3   FIFO queue
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22 
23   @section details Details
24 
25   ProtectedQueue implements a FIFO queue with the following functionality:
26     -# Multiple threads could be simultaneously trying to enqueue and
27       dequeue. Hence the queue needs to be protected with mutex.
28     -# In case the queue is empty, dequeue() sleeps for a specified amount
29       of time, or until a new element is inserted, whichever is earlier.
30 
31 */
32 
33 #include "P_EventSystem.h"
34 
35 // The protected queue is designed to delay signaling of threads
36 // until some amount of work has been completed on the current thread
37 // in order to prevent excess context switches.
38 //
39 // Defining EAGER_SIGNALLING disables this behavior and causes
40 // threads to be made runnable immediately.
41 //
42 // #define EAGER_SIGNALLING
43 
44 extern ClassAllocator<Event> eventAllocator;
45 
46 void
47 ProtectedQueue::enqueue(Event *e, bool fast_signal)
48 {
49   ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
50   EThread *e_ethread   = e->ethread;
51   e->in_the_prot_queue = 1;
52   bool was_empty       = (ink_atomiclist_push(&al, e) == nullptr);
53 
54   if (was_empty) {
55     EThread *inserting_thread = this_ethread();
56     // queue e->ethread in the list of threads to be signalled
57     // inserting_thread == 0 means it is not a regular EThread
58     if (inserting_thread != e_ethread) {
59       e_ethread->tail_cb->signalActivity();
60     }
61   }
62 }
63 
64 void
65 flush_signals(EThread *thr)
66 {
67   ink_assert(this_ethread() == thr);
68   int n = thr->n_ethreads_to_be_signalled;
69   if (n > eventProcessor.n_ethreads) {
70     n = eventProcessor.n_ethreads; // MAX
71   }
72   int i;
73 
74   for (i = 0; i < n; i++) {
75     if (thr->ethreads_to_be_signalled[i]) {
76       thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
77       thr->ethreads_to_be_signalled[i] = nullptr;
78     }
79   }
80   thr->n_ethreads_to_be_signalled = 0;
81 }
82 
83 void
84 ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
85 {
86   (void)cur_time;
87   if (sleep) {
88     this->wait(timeout);
89   }
90   this->dequeue_external();
91 }
92 
93 void
94 ProtectedQueue::dequeue_external()
95 {
96   Event *e = static_cast<Event *>(ink_atomiclist_popall(&al));
97   // invert the list, to preserve order
98   SLL<Event, Event::Link_link> l, t;
99   t.head = e;
100   while ((e = t.pop())) {
101     l.push(e);
102   }
103   // insert into localQueue
104   while ((e = l.pop())) {
105     if (!e->cancelled) {
106       localQueue.enqueue(e);
107     } else {
108       e->mutex = nullptr;
109       eventAllocator.free(e);
110     }
111   }
112 }
113 
114 void
115 ProtectedQueue::wait(ink_hrtime timeout)
116 {
117   /* If there are no external events available, will do a cond_timedwait.
118    *
119    *   - The `EThread::lock` will be released,
120    *   - And then the Event Thread goes to sleep and waits for the wakeup signal of `EThread::might_have_data`,
121    *   - The `EThread::lock` will be locked again when the Event Thread wakes up.
122    */
123   if (INK_ATOMICLIST_EMPTY(al)) {
124     timespec ts = ink_hrtime_to_timespec(timeout);
125     ink_cond_timedwait(&might_have_data, &lock, &ts);
126   }
127 }
128