1 /*
2   Licensed to the Apache Software Foundation (ASF) under one
3   or more contributor license agreements.  See the NOTICE file
4   distributed with this work for additional information
5   regarding copyright ownership.  The ASF licenses this file
6   to you under the Apache License, Version 2.0 (the
7   "License"); you may not use this file except in compliance
8   with the License.  You may obtain a copy of the License at
9 
10   http://www.apache.org/licenses/LICENSE-2.0
11 
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17 */
18 
19 #include "ts_lua_util.h"
20 #include "ts_lua_http_intercept.h"
21 
22 static int ts_lua_http_intercept(lua_State *L);
23 static int ts_lua_http_server_intercept(lua_State *L);
24 static int ts_lua_http_intercept_entry(TSCont contp, TSEvent event, void *edata);
25 static void ts_lua_http_intercept_process(ts_lua_http_intercept_ctx *ictx, TSVConn conn);
26 static void ts_lua_http_intercept_setup_read(ts_lua_http_intercept_ctx *ictx);
27 static void ts_lua_http_intercept_setup_write(ts_lua_http_intercept_ctx *ictx);
28 static int ts_lua_http_intercept_handler(TSCont contp, TSEvent event, void *edata);
29 static int ts_lua_http_intercept_run_coroutine(ts_lua_http_intercept_ctx *ictx, int n);
30 static int ts_lua_http_intercept_process_read(TSEvent event, ts_lua_http_intercept_ctx *ictx);
31 static int ts_lua_http_intercept_process_write(TSEvent event, ts_lua_http_intercept_ctx *ictx);
32 
33 static int ts_lua_say(lua_State *L);
34 static int ts_lua_flush(lua_State *L);
35 static int ts_lua_flush_wakeup(ts_lua_http_intercept_ctx *ictx);
36 static int ts_lua_flush_wakeup_handler(TSCont contp, TSEvent event, void *edata);
37 static int ts_lua_flush_cleanup(ts_lua_async_item *ai);
38 
39 void
ts_lua_inject_http_intercept_api(lua_State * L)40 ts_lua_inject_http_intercept_api(lua_State *L)
41 {
42   /* ts.intercept */
43   lua_pushcfunction(L, ts_lua_http_intercept);
44   lua_setfield(L, -2, "intercept");
45 
46   /* ts.server_intercept */
47   lua_pushcfunction(L, ts_lua_http_server_intercept);
48   lua_setfield(L, -2, "server_intercept");
49 }
50 
51 void
ts_lua_inject_intercept_api(lua_State * L)52 ts_lua_inject_intercept_api(lua_State *L)
53 {
54   /*  ts.say(...) */
55   lua_pushcfunction(L, ts_lua_say);
56   lua_setfield(L, -2, "say");
57 
58   /*  ts.flush(...) */
59   lua_pushcfunction(L, ts_lua_flush);
60   lua_setfield(L, -2, "flush");
61 }
62 
63 static int
ts_lua_http_intercept(lua_State * L)64 ts_lua_http_intercept(lua_State *L)
65 {
66   TSCont contp;
67   int type, n;
68   ts_lua_http_ctx *http_ctx;
69   ts_lua_http_intercept_ctx *ictx;
70 
71   GET_HTTP_CONTEXT(http_ctx, L);
72 
73   n = lua_gettop(L);
74 
75   if (n < 1) {
76     TSError("[ts_lua][%s] ts.http.intercept need at least one param", __FUNCTION__);
77     return 0;
78   }
79 
80   type = lua_type(L, 1);
81   if (type != LUA_TFUNCTION) {
82     TSError("[ts_lua][%s] ts.http.intercept should use function as param, but there is %s", __FUNCTION__, lua_typename(L, type));
83     return 0;
84   }
85 
86   ictx  = ts_lua_create_http_intercept_ctx(L, http_ctx, n);
87   contp = TSContCreate(ts_lua_http_intercept_entry, TSMutexCreate());
88   TSContDataSet(contp, ictx);
89 
90   TSHttpTxnIntercept(contp, http_ctx->txnp);
91   http_ctx->has_hook = 1;
92 
93   return 0;
94 }
95 
96 static int
ts_lua_http_server_intercept(lua_State * L)97 ts_lua_http_server_intercept(lua_State *L)
98 {
99   TSCont contp;
100   int type, n;
101   ts_lua_http_ctx *http_ctx;
102   ts_lua_http_intercept_ctx *ictx;
103 
104   GET_HTTP_CONTEXT(http_ctx, L);
105 
106   n = lua_gettop(L);
107 
108   if (n < 1) {
109     TSError("[ts_lua][%s] ts.http.server_intercept need at least one param", __FUNCTION__);
110     return 0;
111   }
112 
113   type = lua_type(L, 1);
114   if (type != LUA_TFUNCTION) {
115     TSError("[ts_lua][%s] ts.http.server_intercept should use function as param, but there is %s", __FUNCTION__,
116             lua_typename(L, type));
117     return 0;
118   }
119 
120   ictx  = ts_lua_create_http_intercept_ctx(L, http_ctx, n);
121   contp = TSContCreate(ts_lua_http_intercept_entry, TSMutexCreate());
122   TSContDataSet(contp, ictx);
123 
124   TSHttpTxnServerIntercept(contp, http_ctx->txnp);
125   http_ctx->has_hook = 1;
126 
127   return 0;
128 }
129 
130 static int
ts_lua_http_intercept_entry(TSCont contp,TSEvent event,void * edata)131 ts_lua_http_intercept_entry(TSCont contp, TSEvent event, void *edata)
132 {
133   ts_lua_http_intercept_ctx *ictx;
134 
135   ictx = (ts_lua_http_intercept_ctx *)TSContDataGet(contp);
136 
137   switch (event) {
138   case TS_EVENT_NET_ACCEPT_FAILED:
139     if (edata) {
140       TSVConnClose((TSVConn)edata);
141     }
142 
143     ts_lua_destroy_http_intercept_ctx(ictx);
144     break;
145 
146   case TS_EVENT_NET_ACCEPT:
147     ts_lua_http_intercept_process(ictx, (TSVConn)edata);
148     break;
149 
150   default:
151     break;
152   }
153 
154   TSContDestroy(contp);
155   return 0;
156 }
157 
158 static void
ts_lua_http_intercept_process(ts_lua_http_intercept_ctx * ictx,TSVConn conn)159 ts_lua_http_intercept_process(ts_lua_http_intercept_ctx *ictx, TSVConn conn)
160 {
161   int n;
162   TSCont contp;
163   lua_State *L;
164   TSMutex mtxp;
165   ts_lua_cont_info *ci;
166 
167   ci   = &ictx->cinfo;
168   mtxp = ictx->cinfo.routine.mctx->mutexp;
169 
170   contp = TSContCreate(ts_lua_http_intercept_handler, TSMutexCreate());
171   TSContDataSet(contp, ictx);
172 
173   ci->contp = contp;
174   ci->mutex = TSContMutexGet(contp);
175 
176   ictx->net_vc = conn;
177 
178   // set up read.
179   ts_lua_http_intercept_setup_read(ictx);
180 
181   // set up write.
182   ts_lua_http_intercept_setup_write(ictx);
183 
184   // invoke function here
185   L = ci->routine.lua;
186 
187   TSMutexLock(mtxp);
188 
189   n = lua_gettop(L);
190 
191   ts_lua_http_intercept_run_coroutine(ictx, n - 1);
192 
193   TSMutexUnlock(mtxp);
194 }
195 
196 static void
ts_lua_http_intercept_setup_read(ts_lua_http_intercept_ctx * ictx)197 ts_lua_http_intercept_setup_read(ts_lua_http_intercept_ctx *ictx)
198 {
199   ictx->input.buffer = TSIOBufferCreate();
200   ictx->input.reader = TSIOBufferReaderAlloc(ictx->input.buffer);
201   ictx->input.vio    = TSVConnRead(ictx->net_vc, ictx->cinfo.contp, ictx->input.buffer, INT64_MAX);
202 }
203 
204 static void
ts_lua_http_intercept_setup_write(ts_lua_http_intercept_ctx * ictx)205 ts_lua_http_intercept_setup_write(ts_lua_http_intercept_ctx *ictx)
206 {
207   ictx->output.buffer = TSIOBufferCreate();
208   ictx->output.reader = TSIOBufferReaderAlloc(ictx->output.buffer);
209   ictx->output.vio    = TSVConnWrite(ictx->net_vc, ictx->cinfo.contp, ictx->output.reader, INT64_MAX);
210 }
211 
212 static int
ts_lua_http_intercept_handler(TSCont contp,TSEvent event,void * edata)213 ts_lua_http_intercept_handler(TSCont contp, TSEvent event, void *edata)
214 {
215   int ret, n;
216   TSMutex mtxp;
217   ts_lua_http_intercept_ctx *ictx;
218 
219   ictx = (ts_lua_http_intercept_ctx *)TSContDataGet(contp);
220   mtxp = NULL;
221 
222   if (edata == ictx->input.vio) {
223     ret = ts_lua_http_intercept_process_read(event, ictx);
224 
225   } else if (edata == ictx->output.vio) {
226     ret = ts_lua_http_intercept_process_write(event, ictx);
227 
228   } else {
229     mtxp = ictx->cinfo.routine.mctx->mutexp;
230     n    = (intptr_t)edata;
231 
232     TSMutexLock(mtxp);
233     ret = ts_lua_http_intercept_run_coroutine(ictx, n);
234     TSMutexUnlock(mtxp);
235   }
236 
237   if (ret || (ictx->send_complete && ictx->recv_complete)) {
238     ts_lua_destroy_http_intercept_ctx(ictx);
239   }
240 
241   return 0;
242 }
243 
244 static int
ts_lua_http_intercept_run_coroutine(ts_lua_http_intercept_ctx * ictx,int n)245 ts_lua_http_intercept_run_coroutine(ts_lua_http_intercept_ctx *ictx, int n)
246 {
247   int ret;
248   int64_t avail;
249   int64_t done;
250   ts_lua_cont_info *ci;
251   lua_State *L;
252 
253   ci = &ictx->cinfo;
254   L  = ci->routine.lua;
255 
256   ts_lua_set_cont_info(L, ci);
257   ret = lua_resume(L, n);
258 
259   switch (ret) {
260   case 0: // finished
261     avail = TSIOBufferReaderAvail(ictx->output.reader);
262     done  = TSVIONDoneGet(ictx->output.vio);
263     TSVIONBytesSet(ictx->output.vio, avail + done);
264     ictx->all_ready = 1;
265 
266     if (avail) {
267       TSVIOReenable(ictx->output.vio);
268     } else {
269       ictx->send_complete = 1;
270     }
271 
272     break;
273 
274   case 1: // yield
275     break;
276 
277   default: // error
278     TSError("[ts_lua][%s] lua_resume failed: %s", __FUNCTION__, lua_tostring(L, -1));
279     lua_pop(L, 1);
280     return -1;
281   }
282 
283   return 0;
284 }
285 
286 static int
ts_lua_http_intercept_process_read(TSEvent event,ts_lua_http_intercept_ctx * ictx)287 ts_lua_http_intercept_process_read(TSEvent event, ts_lua_http_intercept_ctx *ictx)
288 {
289   int64_t avail = TSIOBufferReaderAvail(ictx->input.reader);
290   TSIOBufferReaderConsume(ictx->input.reader, avail);
291 
292   switch (event) {
293   case TS_EVENT_VCONN_READ_READY:
294     TSVConnShutdown(ictx->net_vc, 1, 0);
295     ictx->recv_complete = 1;
296     break; // READ_READY_READY is not equal to EOS break statement is probably missing?
297   case TS_EVENT_VCONN_READ_COMPLETE:
298   case TS_EVENT_VCONN_EOS:
299     ictx->recv_complete = 1;
300     break;
301 
302   default:
303     return -1;
304   }
305 
306   return 0;
307 }
308 
309 static int
ts_lua_http_intercept_process_write(TSEvent event,ts_lua_http_intercept_ctx * ictx)310 ts_lua_http_intercept_process_write(TSEvent event, ts_lua_http_intercept_ctx *ictx)
311 {
312   int64_t done, avail;
313 
314   switch (event) {
315   case TS_EVENT_VCONN_WRITE_READY:
316 
317     avail = TSIOBufferReaderAvail(ictx->output.reader);
318 
319     if (ictx->all_ready) {
320       TSVIOReenable(ictx->output.vio);
321 
322     } else if (ictx->to_flush > 0) { // ts.flush()
323 
324       done = TSVIONDoneGet(ictx->output.vio);
325 
326       if (ictx->to_flush > done) {
327         TSVIOReenable(ictx->output.vio);
328 
329       } else { // we had flush all the data we want
330         ictx->to_flush = 0;
331         ts_lua_flush_wakeup(ictx); // wake up
332       }
333 
334     } else if (avail > 0) {
335       TSVIOReenable(ictx->output.vio);
336     }
337 
338     break;
339 
340   case TS_EVENT_VCONN_WRITE_COMPLETE:
341     ictx->send_complete = 1;
342     break;
343 
344   case TS_EVENT_ERROR:
345   default:
346     return -1;
347   }
348 
349   return 0;
350 }
351 
352 static int
ts_lua_say(lua_State * L)353 ts_lua_say(lua_State *L)
354 {
355   const char *data;
356   size_t len;
357   ts_lua_http_intercept_ctx *ictx;
358 
359   ictx = ts_lua_get_http_intercept_ctx(L);
360   if (ictx == NULL) {
361     TSError("[ts_lua][%s] missing ictx", __FUNCTION__);
362     return 0;
363   }
364 
365   data = luaL_checklstring(L, 1, &len);
366 
367   if (len > 0) {
368     TSIOBufferWrite(ictx->output.buffer, data, len);
369     TSVIOReenable(ictx->output.vio);
370   }
371 
372   return 0;
373 }
374 
375 static int
ts_lua_flush(lua_State * L)376 ts_lua_flush(lua_State *L)
377 {
378   int64_t avail;
379   ts_lua_http_intercept_ctx *ictx;
380 
381   ictx = ts_lua_get_http_intercept_ctx(L);
382   if (ictx == NULL) {
383     TSError("[ts_lua][%s] missing ictx", __FUNCTION__);
384     return 0;
385   }
386 
387   avail = TSIOBufferReaderAvail(ictx->output.reader);
388 
389   if (avail > 0) {
390     ictx->to_flush = TSVIONDoneGet(ictx->output.vio) + TSIOBufferReaderAvail(ictx->output.reader);
391     TSVIOReenable(ictx->output.vio);
392 
393     return lua_yield(L, 0);
394   }
395 
396   return 0;
397 }
398 
399 static int
ts_lua_flush_wakeup(ts_lua_http_intercept_ctx * ictx)400 ts_lua_flush_wakeup(ts_lua_http_intercept_ctx *ictx)
401 {
402   ts_lua_async_item *ai;
403   ts_lua_cont_info *ci;
404   TSAction action;
405   TSCont contp;
406 
407   ci = &ictx->cinfo;
408 
409   contp  = TSContCreate(ts_lua_flush_wakeup_handler, ci->mutex);
410   action = TSContScheduleOnPool(contp, 0, TS_THREAD_POOL_NET);
411 
412   ai = ts_lua_async_create_item(contp, ts_lua_flush_cleanup, (void *)action, ci);
413   TSContDataSet(contp, ai);
414 
415   return 0;
416 }
417 
418 static int
ts_lua_flush_wakeup_handler(TSCont contp,TSEvent event ATS_UNUSED,void * edata ATS_UNUSED)419 ts_lua_flush_wakeup_handler(TSCont contp, TSEvent event ATS_UNUSED, void *edata ATS_UNUSED)
420 {
421   ts_lua_async_item *ai;
422   ts_lua_cont_info *ci;
423 
424   ai = TSContDataGet(contp);
425   ci = ai->cinfo;
426 
427   ai->data = NULL;
428 
429   ts_lua_flush_cleanup(ai);
430 
431   TSContCall(ci->contp, TS_LUA_EVENT_COROUTINE_CONT, 0);
432 
433   return 0;
434 }
435 
436 static int
ts_lua_flush_cleanup(ts_lua_async_item * ai)437 ts_lua_flush_cleanup(ts_lua_async_item *ai)
438 {
439   if (ai->deleted) {
440     return 0;
441   }
442 
443   if (ai->data) {
444     TSActionCancel((TSAction)ai->data);
445     ai->data = NULL;
446   }
447 
448   TSContDestroy(ai->contp);
449   ai->deleted = 1;
450 
451   return 0;
452 }
453