1 /**
2   Licensed to the Apache Software Foundation (ASF) under one
3   or more contributor license agreements.  See the NOTICE file
4   distributed with this work for additional information
5   regarding copyright ownership.  The ASF licenses this file
6   to you under the Apache License, Version 2.0 (the
7   "License"); you may not use this file except in compliance
8   with the License.  You may obtain a copy of the License at
9 
10       http://www.apache.org/licenses/LICENSE-2.0
11 
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17  */
18 
19 /**
20  * @file InterceptPlugin.cc
21  */
22 
23 #include "tscpp/api/InterceptPlugin.h"
24 
25 #include "ts/ts.h"
26 #include "logging_internal.h"
27 #include "tscpp/api/noncopyable.h"
28 #include "utils_internal.h"
29 
30 #include <cstdlib>
31 #include <cerrno>
32 
33 #ifndef INT64_MAX
34 #define INT64_MAX (9223372036854775807LL)
35 #endif
36 
37 using namespace atscppapi;
38 using std::string;
39 
40 /**
41  * @private
42  */
43 struct InterceptPlugin::State {
44   TSCont cont_;
45   TSVConn net_vc_ = nullptr;
46 
47   struct IoHandle {
48     TSVIO vio_               = nullptr;
49     TSIOBuffer buffer_       = nullptr;
50     TSIOBufferReader reader_ = nullptr;
51     IoHandle()               = default;
52     ;
~IoHandleInterceptPlugin::State::IoHandle53     ~IoHandle()
54     {
55       if (reader_) {
56         TSIOBufferReaderFree(reader_);
57       }
58       if (buffer_) {
59         TSIOBufferDestroy(buffer_);
60       }
61     };
62   };
63 
64   IoHandle input_;
65   IoHandle output_;
66 
67   /** the API doesn't recognize end of input; so we have to explicitly
68    * figure out when to continue reading and when to stop */
69   TSHttpParser http_parser_;
70   int expected_body_size_  = 0;
71   int num_body_bytes_read_ = 0;
72   bool hdr_parsed_         = false;
73 
74   TSMBuffer hdr_buf_     = nullptr;
75   TSMLoc hdr_loc_        = nullptr;
76   int num_bytes_written_ = 0;
77   std::shared_ptr<Mutex> plugin_mutex_;
78   InterceptPlugin *plugin_ = nullptr;
79   Headers request_headers_;
80 
81   /** these two fields to be used by the continuation callback only */
82   TSEvent saved_event_ = TS_EVENT_NONE;
83   void *saved_edata_   = nullptr;
84 
85   TSAction timeout_action_ = nullptr;
86   bool plugin_io_done_     = false;
87 
StateInterceptPlugin::State88   State(TSCont cont, InterceptPlugin *plugin) : cont_(cont), plugin_(plugin)
89   {
90     plugin_mutex_ = plugin->getMutex();
91     http_parser_  = TSHttpParserCreate();
92   }
93 
~StateInterceptPlugin::State94   ~State()
95   {
96     TSHttpParserDestroy(http_parser_);
97     if (hdr_loc_) {
98       TSHandleMLocRelease(hdr_buf_, TS_NULL_MLOC, hdr_loc_);
99     }
100     if (hdr_buf_) {
101       TSMBufferDestroy(hdr_buf_);
102     }
103   }
104 };
105 
106 namespace
107 {
108 int handleEvents(TSCont cont, TSEvent event, void *edata);
109 void destroyCont(InterceptPlugin::State *state);
110 } // namespace
111 
InterceptPlugin(Transaction & transaction,InterceptPlugin::Type type)112 InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type) : TransactionPlugin(transaction)
113 {
114   TSCont cont = TSContCreate(handleEvents, TSMutexCreate());
115   state_      = new State(cont, this);
116   TSContDataSet(cont, state_);
117   TSHttpTxn txn = static_cast<TSHttpTxn>(transaction.getAtsHandle());
118   if (type == SERVER_INTERCEPT) {
119     TSHttpTxnServerIntercept(cont, txn);
120   } else {
121     TSHttpTxnIntercept(cont, txn);
122   }
123 }
124 
~InterceptPlugin()125 InterceptPlugin::~InterceptPlugin()
126 {
127   if (state_->cont_) {
128     LOG_DEBUG("Relying on callback for cleanup");
129     state_->plugin_ = nullptr; // prevent callback from invoking plugin
130   } else {                     // safe to cleanup
131     LOG_DEBUG("Normal cleanup");
132     delete state_;
133   }
134 }
135 
136 bool
produce(const void * data,int data_size)137 InterceptPlugin::produce(const void *data, int data_size)
138 {
139   std::lock_guard<Mutex> lock(*getMutex());
140   if (!state_->net_vc_) {
141     LOG_ERROR("Intercept not operational");
142     return false;
143   }
144   if (!state_->output_.buffer_) {
145     state_->output_.buffer_ = TSIOBufferCreate();
146     state_->output_.reader_ = TSIOBufferReaderAlloc(state_->output_.buffer_);
147     state_->output_.vio_    = TSVConnWrite(state_->net_vc_, state_->cont_, state_->output_.reader_, INT64_MAX);
148   }
149   int num_bytes_written = TSIOBufferWrite(state_->output_.buffer_, data, data_size);
150   if (num_bytes_written != data_size) {
151     LOG_ERROR("Error while writing to buffer! Attempted %d bytes but only wrote %d bytes", data_size, num_bytes_written);
152     return false;
153   }
154   TSVIOReenable(state_->output_.vio_);
155   state_->num_bytes_written_ += data_size;
156   LOG_DEBUG("Wrote %d bytes in response", data_size);
157   return true;
158 }
159 
160 bool
setOutputComplete()161 InterceptPlugin::setOutputComplete()
162 {
163   std::lock_guard<Mutex> scopedLock(*getMutex());
164   if (!state_->net_vc_) {
165     LOG_ERROR("Intercept not operational");
166     return false;
167   }
168   if (!state_->output_.buffer_) {
169     LOG_ERROR("No output produced so far");
170     return false;
171   }
172   TSVIONBytesSet(state_->output_.vio_, state_->num_bytes_written_);
173   TSVIOReenable(state_->output_.vio_);
174   state_->plugin_io_done_ = true;
175   LOG_DEBUG("Response complete");
176   return true;
177 }
178 
179 Headers &
getRequestHeaders()180 InterceptPlugin::getRequestHeaders()
181 {
182   return state_->request_headers_;
183 }
184 
185 TSSslConnection
getSslConnection()186 InterceptPlugin::getSslConnection()
187 {
188   if (!state_->net_vc_) {
189     LOG_ERROR("Intercept Plugin is not ready to provide SSL Connection");
190     return nullptr;
191   }
192 
193   return TSVConnSslConnectionGet(state_->net_vc_);
194 }
195 
196 bool
doRead()197 InterceptPlugin::doRead()
198 {
199   int avail = TSIOBufferReaderAvail(state_->input_.reader_);
200   if (avail == TS_ERROR) {
201     LOG_ERROR("Error while getting number of bytes available");
202     return false;
203   }
204 
205   int consumed = 0; // consumed is used to update the input buffers
206   if (avail > 0) {
207     int64_t num_body_bytes_in_block;
208     int64_t data_len; // size of all data (header + body) in a block
209     const char *data, *startptr;
210     TSIOBufferBlock block = TSIOBufferReaderStart(state_->input_.reader_);
211     while (block != nullptr) {
212       startptr = data         = TSIOBufferBlockReadStart(block, state_->input_.reader_, &data_len);
213       num_body_bytes_in_block = 0;
214       if (!state_->hdr_parsed_) {
215         const char *endptr = data + data_len;
216         if (TSHttpHdrParseReq(state_->http_parser_, state_->hdr_buf_, state_->hdr_loc_, &data, endptr) == TS_PARSE_DONE) {
217           LOG_DEBUG("Parsed header");
218           string content_length_str = state_->request_headers_.value("Content-Length");
219           if (!content_length_str.empty()) {
220             const char *start_ptr = content_length_str.data();
221             char *end_ptr;
222             int content_length = strtol(start_ptr, &end_ptr, 10 /* base */);
223             if ((errno != ERANGE) && (end_ptr != start_ptr) && (*end_ptr == '\0')) {
224               LOG_DEBUG("Got content length: %d", content_length);
225               state_->expected_body_size_ = content_length;
226             } else {
227               LOG_ERROR("Invalid content length header [%s]; Assuming no content", content_length_str.c_str());
228             }
229           }
230           if (state_->request_headers_.value("Transfer-Encoding") == "chunked") {
231             // implementing a "dechunker" is non-trivial and in the real
232             // world, most browsers don't send chunked requests
233             LOG_ERROR("Support for chunked request not implemented! Assuming no body");
234           }
235           LOG_DEBUG("Expecting %d bytes of request body", state_->expected_body_size_);
236           state_->hdr_parsed_ = true;
237           // remaining data in this block is body; 'data' will be pointing to first byte of the body
238           num_body_bytes_in_block = endptr - data;
239         }
240         consume(string(startptr, data - startptr), InterceptPlugin::REQUEST_HEADER);
241       } else {
242         num_body_bytes_in_block = data_len;
243       }
244       if (num_body_bytes_in_block) {
245         state_->num_body_bytes_read_ += num_body_bytes_in_block;
246         consume(string(data, num_body_bytes_in_block), InterceptPlugin::REQUEST_BODY);
247       }
248       consumed += data_len;
249       block = TSIOBufferBlockNext(block);
250     }
251   }
252   LOG_DEBUG("Consumed %d bytes from input vio", consumed);
253   TSIOBufferReaderConsume(state_->input_.reader_, consumed);
254 
255   // Modify the input VIO to reflect how much data we've completed.
256   TSVIONDoneSet(state_->input_.vio_, TSVIONDoneGet(state_->input_.vio_) + consumed);
257 
258   if (isWebsocket()) {
259     TSVIOReenable(state_->input_.vio_);
260     return true;
261   }
262 
263   if ((state_->hdr_parsed_) && (state_->num_body_bytes_read_ >= state_->expected_body_size_)) {
264     LOG_DEBUG("Completely read body");
265     if (state_->num_body_bytes_read_ > state_->expected_body_size_) {
266       LOG_ERROR("Read more data than specified in request");
267       // TODO: any further action required?
268     }
269     handleInputComplete();
270   } else {
271     LOG_DEBUG("Reenabling input vio as %d bytes still need to be read", state_->expected_body_size_ - state_->num_body_bytes_read_);
272     TSVIOReenable(state_->input_.vio_);
273   }
274   return true;
275 }
276 
277 void
handleEvent(int abstract_event,void * edata)278 InterceptPlugin::handleEvent(int abstract_event, void *edata)
279 {
280   TSEvent event = static_cast<TSEvent>(abstract_event);
281   LOG_DEBUG("Received event %d", event);
282 
283   switch (event) {
284   case TS_EVENT_NET_ACCEPT:
285     LOG_DEBUG("Handling net accept");
286     state_->net_vc_        = static_cast<TSVConn>(edata);
287     state_->input_.buffer_ = TSIOBufferCreate();
288     state_->input_.reader_ = TSIOBufferReaderAlloc(state_->input_.buffer_);
289     state_->input_.vio_    = TSVConnRead(state_->net_vc_, state_->cont_, state_->input_.buffer_,
290                                       INT64_MAX /* number of bytes to read - high value initially */);
291 
292     state_->hdr_buf_ = TSMBufferCreate();
293     state_->hdr_loc_ = TSHttpHdrCreate(state_->hdr_buf_);
294     state_->request_headers_.reset(state_->hdr_buf_, state_->hdr_loc_);
295     TSHttpHdrTypeSet(state_->hdr_buf_, state_->hdr_loc_, TS_HTTP_TYPE_REQUEST);
296     break;
297 
298   case TS_EVENT_VCONN_WRITE_READY: // nothing to do
299     LOG_DEBUG("Got write ready");
300     break;
301 
302   case TS_EVENT_VCONN_READ_READY:
303     LOG_DEBUG("Handling read ready");
304     if (doRead()) {
305       break;
306     }
307     // else fall through into the next shut down cases
308     LOG_ERROR("Error while reading request!");
309     // fallthrough
310 
311   case TS_EVENT_VCONN_READ_COMPLETE: // fall throughs intentional
312   case TS_EVENT_VCONN_WRITE_COMPLETE:
313   case TS_EVENT_VCONN_EOS:
314   case TS_EVENT_ERROR:             // erroring out, nothing more to do
315   case TS_EVENT_NET_ACCEPT_FAILED: // somebody canceled the transaction
316     if (event == TS_EVENT_ERROR) {
317       LOG_ERROR("Unknown Error!");
318     } else if (event == TS_EVENT_NET_ACCEPT_FAILED) {
319       LOG_ERROR("Got net_accept_failed!");
320     }
321     LOG_DEBUG("Shutting down intercept");
322     destroyCont(state_);
323     break;
324 
325   default:
326     LOG_ERROR("Unknown event %d", event);
327   }
328 }
329 
330 namespace
331 {
332 class TryLockGuard
333 {
334 public:
TryLockGuard(Mutex & m)335   TryLockGuard(Mutex &m) : _m(m), _isLocked(m.try_lock()) {}
336 
337   bool
isLocked() const338   isLocked() const
339   {
340     return _isLocked;
341   }
342 
~TryLockGuard()343   ~TryLockGuard()
344   {
345     if (_isLocked) {
346       _m.unlock();
347     }
348   }
349 
350 private:
351   std::recursive_mutex &_m;
352   const bool _isLocked;
353 };
354 
355 int
handleEvents(TSCont cont,TSEvent pristine_event,void * pristine_edata)356 handleEvents(TSCont cont, TSEvent pristine_event, void *pristine_edata)
357 {
358   // Separating pristine and mutable data helps debugging
359   TSEvent event = pristine_event;
360   void *edata   = pristine_edata;
361 
362   InterceptPlugin::State *state = static_cast<InterceptPlugin::State *>(TSContDataGet(cont));
363   if (!state) { // plugin is done, return.
364     return 0;
365   }
366 
367   TryLockGuard scopedTryLock(*(state->plugin_mutex_));
368   if (!scopedTryLock.isLocked()) {
369     LOG_ERROR("Couldn't get plugin lock. Will retry");
370     if (event != TS_EVENT_TIMEOUT) { // save only "non-retry" info
371       state->saved_event_ = event;
372       state->saved_edata_ = edata;
373     }
374     state->timeout_action_ = TSContScheduleOnPool(cont, 1, TS_THREAD_POOL_NET);
375     return 0;
376   }
377   if (event == TS_EVENT_TIMEOUT) { // we have a saved event to restore
378     state->timeout_action_ = nullptr;
379     if (state->plugin_io_done_) { // plugin is done, so can't send it saved event
380       event = TS_EVENT_VCONN_EOS; // fake completion
381       edata = nullptr;
382     } else {
383       event = state->saved_event_;
384       edata = state->saved_edata_;
385     }
386   }
387   if (state->plugin_) {
388     utils::internal::dispatchInterceptEvent(state->plugin_, event, edata);
389   } else { // plugin was destroyed before intercept was completed; cleaning up here
390     LOG_DEBUG("Cleaning up as intercept plugin is already destroyed");
391     destroyCont(state);
392     TSContDataSet(cont, nullptr);
393     delete state;
394   }
395   return 0;
396 }
397 
398 void
destroyCont(InterceptPlugin::State * state)399 destroyCont(InterceptPlugin::State *state)
400 {
401   if (state->net_vc_) {
402     TSVConnShutdown(state->net_vc_, 1, 1);
403     TSVConnClose(state->net_vc_);
404     state->net_vc_ = nullptr;
405   }
406 
407   if (state->cont_) {
408     if (state->timeout_action_) {
409       TSActionCancel(state->timeout_action_);
410       state->timeout_action_ = nullptr;
411     }
412     TSContDestroy(state->cont_);
413     state->cont_ = nullptr;
414   }
415 }
416 } // namespace
417