1 /** @file
2 
  Inline definitions of the EThread event scheduling methods, plus the
  this_ethread() / this_event_thread() lookup helpers.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26   P_UnixEThread.h
27 
28 
29 
30 *****************************************************************************/
31 #pragma once
32 
33 #include "I_EThread.h"
34 #include "I_EventProcessor.h"
35 
36 const int DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
37 
38 TS_INLINE Event *
39 EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
40 {
41   Event *e          = ::eventAllocator.alloc();
42   e->callback_event = callback_event;
43   e->cookie         = cookie;
44   return schedule(e->init(cont, 0, 0));
45 }
46 
47 TS_INLINE Event *
48 EThread::schedule_imm_signal(Continuation *cont, int callback_event, void *cookie)
49 {
50   Event *e          = ::eventAllocator.alloc();
51   e->callback_event = callback_event;
52   e->cookie         = cookie;
53   return schedule(e->init(cont, 0, 0), true);
54 }
55 
56 TS_INLINE Event *
57 EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
58 {
59   Event *e          = ::eventAllocator.alloc();
60   e->callback_event = callback_event;
61   e->cookie         = cookie;
62   return schedule(e->init(cont, t, 0));
63 }
64 
65 TS_INLINE Event *
66 EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
67 {
68   Event *e          = ::eventAllocator.alloc();
69   e->callback_event = callback_event;
70   e->cookie         = cookie;
71   return schedule(e->init(cont, get_hrtime() + t, 0));
72 }
73 
74 TS_INLINE Event *
75 EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
76 {
77   Event *e          = ::eventAllocator.alloc();
78   e->callback_event = callback_event;
79   e->cookie         = cookie;
80   if (t < 0) {
81     return schedule(e->init(cont, t, t));
82   } else {
83     return schedule(e->init(cont, get_hrtime() + t, t));
84   }
85 }
86 
87 TS_INLINE Event *
88 EThread::schedule(Event *e, bool fast_signal)
89 {
90   e->ethread = this;
91   ink_assert(tt == REGULAR);
92   if (e->continuation->mutex) {
93     e->mutex = e->continuation->mutex;
94   } else {
95     e->mutex = e->continuation->mutex = e->ethread->mutex;
96   }
97   ink_assert(e->mutex.get());
98 
99   // Make sure client IP debugging works consistently
100   // The continuation that gets scheduled later is not always the
101   // client VC, it can be HttpCacheSM etc. so save the flags
102   e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
103   EventQueueExternal.enqueue(e, fast_signal);
104   return e;
105 }
106 
107 TS_INLINE Event *
108 EThread::schedule_imm_local(Continuation *cont, int callback_event, void *cookie)
109 {
110   Event *e          = EVENT_ALLOC(eventAllocator, this);
111   e->callback_event = callback_event;
112   e->cookie         = cookie;
113   return schedule_local(e->init(cont, 0, 0));
114 }
115 
116 TS_INLINE Event *
117 EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
118 {
119   Event *e          = EVENT_ALLOC(eventAllocator, this);
120   e->callback_event = callback_event;
121   e->cookie         = cookie;
122   return schedule_local(e->init(cont, t, 0));
123 }
124 
125 TS_INLINE Event *
126 EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
127 {
128   Event *e          = EVENT_ALLOC(eventAllocator, this);
129   e->callback_event = callback_event;
130   e->cookie         = cookie;
131   return schedule_local(e->init(cont, get_hrtime() + t, 0));
132 }
133 
134 TS_INLINE Event *
135 EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
136 {
137   Event *e          = EVENT_ALLOC(eventAllocator, this);
138   e->callback_event = callback_event;
139   e->cookie         = cookie;
140   if (t < 0) {
141     return schedule(e->init(cont, t, t));
142   } else {
143     return schedule(e->init(cont, get_hrtime() + t, t));
144   }
145 }
146 
147 TS_INLINE Event *
148 EThread::schedule_local(Event *e)
149 {
150   if (tt != REGULAR) {
151     ink_assert(tt == DEDICATED);
152     return eventProcessor.schedule(e, ET_CALL);
153   }
154   if (!e->mutex) {
155     e->ethread = this;
156     e->mutex   = e->continuation->mutex;
157   } else {
158     ink_assert(e->ethread == this);
159   }
160   e->globally_allocated = false;
161 
162   // Make sure client IP debugging works consistently
163   // The continuation that gets scheduled later is not always the
164   // client VC, it can be HttpCacheSM etc. so save the flags
165   e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
166   EventQueueExternal.enqueue_local(e);
167   return e;
168 }
169 
170 TS_INLINE Event *
171 EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
172 {
173   ink_assert(this != this_ethread()); // really broken to call this from the same thread.
174   if (start_event) {
175     free_event(start_event);
176   }
177   start_event          = EVENT_ALLOC(eventAllocator, this);
178   start_event->ethread = this;
179   start_event->mutex   = this->mutex;
180   start_event->init(c);
181   start_event->callback_event = ev;
182   start_event->cookie         = cookie;
183   return start_event;
184 }
185 
186 TS_INLINE EThread *
187 this_ethread()
188 {
189   return dynamic_cast<EThread *>(this_thread());
190 }
191 
192 TS_INLINE EThread *
193 this_event_thread()
194 {
195   EThread *ethread = this_ethread();
196   if (ethread != nullptr && ethread->tt == REGULAR) {
197     return ethread;
198   } else {
199     return nullptr;
200   }
201 }
202 
203 TS_INLINE void
204 EThread::free_event(Event *e)
205 {
206   ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
207   e->mutex = nullptr;
208   EVENT_FREE(e, eventAllocator, this);
209 }
210 
211 TS_INLINE void
212 EThread::set_tail_handler(LoopTailHandler *handler)
213 {
214   ink_atomic_swap(&tail_cb, handler);
215 }
216