1 /** @file
2 
  Multiplexes requests to other origins.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #pragma once
25 
26 #include <arpa/inet.h>
27 #include <iostream>
28 #include <limits>
29 #include <netinet/in.h>
30 
31 #include <cinttypes>
32 
33 #include "chunk-decoder.h"
34 #include "ts.h"
35 
36 #ifndef PLUGIN_TAG
37 #error Please define a PLUGIN_TAG before including this file.
38 #endif
39 
40 #define unlikely(x) __builtin_expect((x), 0)
41 
42 namespace ats
43 {
44 struct HttpParser {
45   bool parsed_ = false;
46   TSHttpParser parser_;
47   TSMBuffer buffer_;
48   TSMLoc location_;
49 
50   void destroyParser();
51 
~HttpParserats::HttpParser52   ~HttpParser()
53   {
54     TSHandleMLocRelease(buffer_, TS_NULL_MLOC, location_);
55     TSMBufferDestroy(buffer_);
56     destroyParser();
57   }
58 
HttpParserats::HttpParser59   HttpParser() : parser_(TSHttpParserCreate()), buffer_(TSMBufferCreate()), location_(TSHttpHdrCreate(buffer_))
60   {
61     TSHttpHdrTypeSet(buffer_, location_, TS_HTTP_TYPE_RESPONSE);
62   }
63 
64   bool parse(io::IO &);
65 
66   int
statusCodeats::HttpParser67   statusCode() const
68   {
69     return static_cast<int>(TSHttpHdrStatusGet(buffer_, location_));
70   }
71 };
72 
73 template <class T> struct HttpTransaction {
74   typedef HttpTransaction<T> Self;
75 
76   bool parsingHeaders_;
77   bool abort_;
78   bool timeout_;
79   io::IO *in_;
80   io::IO *out_;
81   TSVConn vconnection_;
82   TSCont continuation_;
83   T t_;
84   HttpParser parser_;
85   ChunkDecoder *chunkDecoder_;
86 
~HttpTransactionats::HttpTransaction87   ~HttpTransaction()
88   {
89     if (in_ != NULL) {
90       delete in_;
91       in_ = NULL;
92     }
93     if (out_ != NULL) {
94       delete out_;
95       out_ = NULL;
96     }
97     timeout(0);
98     assert(vconnection_ != NULL);
99     if (abort_) {
100       TSVConnAbort(vconnection_, TS_VC_CLOSE_ABORT);
101     } else {
102       TSVConnClose(vconnection_);
103     }
104     assert(continuation_ != NULL);
105     TSContDestroy(continuation_);
106     if (chunkDecoder_ != NULL) {
107       delete chunkDecoder_;
108     }
109   }
110 
HttpTransactionats::HttpTransaction111   HttpTransaction(TSVConn v, TSCont c, io::IO *const i, const uint64_t l, const T &t)
112     : parsingHeaders_(false),
113       abort_(false),
114       timeout_(false),
115       in_(nullptr),
116       out_(i),
117       vconnection_(v),
118       continuation_(c),
119       t_(t),
120       chunkDecoder_(nullptr)
121   {
122     assert(vconnection_ != NULL);
123     assert(continuation_ != NULL);
124     assert(out_ != NULL);
125     assert(l > 0);
126     out_->vio = TSVConnWrite(vconnection_, continuation_, out_->reader, l);
127   }
128 
129   inline void
abortats::HttpTransaction130   abort(const bool b = true)
131   {
132     abort_ = b;
133   }
134 
135   void
timeoutats::HttpTransaction136   timeout(const int64_t t)
137   {
138     assert(t >= 0);
139     assert(vconnection_ != NULL);
140     if (timeout_) {
141       TSVConnActiveTimeoutCancel(vconnection_);
142       timeout_ = false;
143     } else {
144       TSVConnActiveTimeoutSet(vconnection_, t);
145       timeout_ = true;
146     }
147   }
148 
149   static void
closeats::HttpTransaction150   close(Self *const s)
151   {
152     assert(s != NULL);
153     TSVConnShutdown(s->vconnection_, 1, 0);
154     delete s;
155   }
156 
157   static bool
isChunkEncodingats::HttpTransaction158   isChunkEncoding(const TSMBuffer b, const TSMLoc l)
159   {
160     assert(b != nullptr);
161     assert(l != nullptr);
162     bool result        = false;
163     const TSMLoc field = TSMimeHdrFieldFind(b, l, TS_MIME_FIELD_TRANSFER_ENCODING, TS_MIME_LEN_TRANSFER_ENCODING);
164     if (field != nullptr) {
165       int length;
166       const char *const value = TSMimeHdrFieldValueStringGet(b, l, field, -1, &length);
167       if (value != nullptr && length == TS_HTTP_LEN_CHUNKED) {
168         result = strncasecmp(value, TS_HTTP_VALUE_CHUNKED, TS_HTTP_LEN_CHUNKED) == 0;
169       }
170       TSHandleMLocRelease(b, l, field);
171     }
172     return result;
173   }
174 
175   static int
handleats::HttpTransaction176   handle(TSCont c, TSEvent e, void *data)
177   {
178     Self *const self = static_cast<Self *const>(TSContDataGet(c));
179     assert(self != NULL);
180     switch (e) {
181     case TS_EVENT_ERROR:
182       TSDebug(PLUGIN_TAG, "HttpTransaction: ERROR");
183       self->t_.error();
184       self->abort();
185       close(self);
186       TSContDataSet(c, nullptr);
187       break;
188     case TS_EVENT_VCONN_EOS:
189       TSDebug(PLUGIN_TAG, "HttpTransaction: EOS");
190       goto here;
191 
192     case TS_EVENT_VCONN_READ_COMPLETE:
193       TSDebug(PLUGIN_TAG, "HttpTransaction: Read Complete");
194       goto here;
195 
196     case TS_EVENT_VCONN_READ_READY:
197       TSDebug(PLUGIN_TAG, "HttpTransaction: Read");
198     here : {
199       assert(self->in_ != NULL);
200       assert(self->in_->reader != NULL);
201       assert(self->in_->vio != NULL);
202       int64_t available = TSIOBufferReaderAvail(self->in_->reader);
203       if (available > 0) {
204         TSVIONDoneSet(self->in_->vio, available + TSVIONDoneGet(self->in_->vio) + 2);
205         if (self->parsingHeaders_) {
206           if (self->parser_.parse(*self->in_)) {
207             if (isChunkEncoding(self->parser_.buffer_, self->parser_.location_)) {
208               assert(self->chunkDecoder_ == NULL);
209               self->chunkDecoder_ = new ChunkDecoder();
210             }
211             self->t_.header(self->parser_.buffer_, self->parser_.location_);
212             self->parsingHeaders_ = false;
213           }
214         }
215         if (!self->parsingHeaders_) {
216           if (self->chunkDecoder_ != NULL) {
217             available = self->chunkDecoder_->decode(self->in_->reader);
218             if (available == 0) {
219               self->t_.data(self->in_->reader, available);
220             }
221             while (available > 0) {
222               self->t_.data(self->in_->reader, available);
223               TSIOBufferReaderConsume(self->in_->reader, available);
224               available = self->chunkDecoder_->decode(self->in_->reader);
225             }
226           } else {
227             self->t_.data(self->in_->reader, available);
228             TSIOBufferReaderConsume(self->in_->reader, available);
229           }
230         }
231       }
232       if (e == TS_EVENT_VCONN_READ_COMPLETE || e == TS_EVENT_VCONN_EOS) {
233         self->t_.done();
234         close(self);
235         TSContDataSet(c, nullptr);
236       } else if (self->chunkDecoder_ != NULL && self->chunkDecoder_->isEnd()) {
237         assert(self->parsingHeaders_ == false);
238         assert(isChunkEncoding(self->parser_.buffer_, self->parser_.location_));
239         self->abort();
240         self->t_.done();
241         close(self);
242         TSContDataSet(c, nullptr);
243       } else {
244         TSVIOReenable(self->in_->vio);
245       }
246     } break;
247     case TS_EVENT_VCONN_WRITE_COMPLETE:
248       TSDebug(PLUGIN_TAG, "HttpTransaction: Write Complete");
249       self->parsingHeaders_ = true;
250       assert(self->in_ == NULL);
251       self->in_ = io::IO::read(self->vconnection_, c);
252       assert(self->vconnection_);
253       TSVConnShutdown(self->vconnection_, 0, 1);
254       assert(self->out_ != NULL);
255       delete self->out_;
256       self->out_ = NULL;
257       break;
258     case TS_EVENT_VCONN_WRITE_READY:
259       TSDebug(PLUGIN_TAG, "HttpTransaction: Write Ready (Done: %" PRId64 " Todo: %" PRId64 ")", TSVIONDoneGet(self->out_->vio),
260               TSVIONTodoGet(self->out_->vio));
261       assert(self->out_ != NULL);
262       TSVIOReenable(self->out_->vio);
263       break;
264     case 106:
265     case TS_EVENT_TIMEOUT:
266     case TS_EVENT_VCONN_INACTIVITY_TIMEOUT:
267       TSDebug(PLUGIN_TAG, "HttpTransaction: Timeout");
268       self->t_.timeout();
269       self->abort();
270       close(self);
271       TSContDataSet(c, nullptr);
272       break;
273 
274     default:
275       assert(false); // UNREACHABLE.
276     }
277     return 0;
278   }
279 };
280 
281 template <class T>
282 bool
get(const std::string & a,io::IO * const i,const int64_t l,const T & t,const int64_t ti=0)283 get(const std::string &a, io::IO *const i, const int64_t l, const T &t, const int64_t ti = 0)
284 {
285   typedef HttpTransaction<T> Transaction;
286   struct sockaddr_in socket;
287   socket.sin_family = AF_INET;
288   socket.sin_port   = 80;
289   if (!inet_pton(AF_INET, a.c_str(), &socket.sin_addr)) {
290     TSDebug(PLUGIN_TAG, "ats::get Invalid address provided \"%s\".", a.c_str());
291     return false;
292   }
293   TSVConn vconn = TSHttpConnect(reinterpret_cast<sockaddr *>(&socket));
294   assert(vconn != nullptr);
295   TSCont contp = TSContCreate(Transaction::handle, TSMutexCreate());
296   assert(contp != nullptr);
297   Transaction *transaction = new Transaction(vconn, contp, i, l, t);
298   TSContDataSet(contp, transaction);
299   if (ti > 0) {
300     TSDebug(PLUGIN_TAG, "ats::get Setting active timeout to: %" PRId64, ti);
301     transaction->timeout(ti);
302   }
303   return true;
304 }
305 
306 template <class T>
307 bool
get(io::IO * const i,const int64_t l,const T & t,const int64_t ti=0)308 get(io::IO *const i, const int64_t l, const T &t, const int64_t ti = 0)
309 {
310   return get("127.0.0.1", i, l, t, ti);
311 }
312 } // namespace ats
313