1 /** @file
2 
  Multiplexes requests to other origins.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 #include <cinttypes>
24 #include <sys/time.h>
25 
26 #include "dispatch.h"
27 #include "fetcher.h"
28 #include "original-request.h"
29 
30 #ifndef PLUGIN_TAG
31 #error Please define a PLUGIN_TAG before including this file.
32 #endif
33 
// Plugin-wide statistic handles, defined in another translation unit. The code
// in this file increments its failures, timeouts, hits, time and size counters.
extern Statistics statistics;

// NOTE(review): not referenced anywhere in this file; presumably assigned during
// plugin configuration and consumed elsewhere -- confirm usage and units.
size_t timeout;
37 
Request(const std::string & h,const TSMBuffer b,const TSMLoc l)38 Request::Request(const std::string &h, const TSMBuffer b, const TSMLoc l) : host(h), length(0), io(new ats::io::IO())
39 {
40   assert(!host.empty());
41   assert(b != nullptr);
42   assert(l != nullptr);
43   assert(io.get() != nullptr);
44   TSHttpHdrPrint(b, l, io->buffer);
45   length = TSIOBufferReaderAvail(io->reader);
46   assert(length > 0);
47   /*
48    * TSHttpHdrLengthGet returns the size with possible "internal" headers
49    * which are not printed by TSHttpHdrPrint.
50    * Therefore the greater than or equal comparison
51    */
52   assert(TSHttpHdrLengthGet(b, l) >= length);
53 }
54 
Request(Request && that)55 Request::Request(Request &&that) : host(std::move(that.host)), length(that.length), io(std::move(that.io))
56 {
57   assert(!host.empty());
58   assert(length > 0);
59   assert(io.get() != nullptr);
60 }
61 
62 Request &
operator =(const Request & r)63 Request::operator=(const Request &r)
64 {
65   host   = r.host;
66   length = r.length;
67   io.reset(const_cast<Request &>(r).io.release());
68   assert(!host.empty());
69   assert(length > 0);
70   assert(io.get() != nullptr);
71   assert(r.io.get() == nullptr);
72   return *this;
73 }
74 
75 uint64_t
copy(const TSIOBufferReader & r,const TSIOBuffer b)76 copy(const TSIOBufferReader &r, const TSIOBuffer b)
77 {
78   assert(r != nullptr);
79   assert(b != nullptr);
80   TSIOBufferBlock block = TSIOBufferReaderStart(r);
81 
82   uint64_t length = 0;
83 
84   for (; block; block = TSIOBufferBlockNext(block)) {
85     int64_t size              = 0;
86     const void *const pointer = TSIOBufferBlockReadStart(block, r, &size);
87 
88     if (pointer != nullptr && size > 0) {
89       const int64_t size2 = TSIOBufferWrite(b, pointer, size);
90       assert(size == size2);
91       length += size;
92     }
93   }
94 
95   return length;
96 }
97 
98 uint64_t
read(const TSIOBufferReader & r,std::string & o,int64_t l=0)99 read(const TSIOBufferReader &r, std::string &o, int64_t l = 0)
100 {
101   assert(r != nullptr);
102   TSIOBufferBlock block = TSIOBufferReaderStart(r);
103 
104   assert(l >= 0);
105   if (l == 0) {
106     l = TSIOBufferReaderAvail(r);
107     assert(l >= 0);
108   }
109 
110   uint64_t length = 0;
111 
112   for (; block && l > 0; block = TSIOBufferBlockNext(block)) {
113     int64_t size              = 0;
114     const char *const pointer = TSIOBufferBlockReadStart(block, r, &size);
115     if (pointer != nullptr && size > 0) {
116       size = std::min(size, l);
117       o.append(pointer, size);
118       length += size;
119       l -= size;
120     }
121   }
122 
123   return length;
124 }
125 
126 uint64_t
read(const TSIOBuffer & b,std::string & o,const int64_t l=0)127 read(const TSIOBuffer &b, std::string &o, const int64_t l = 0)
128 {
129   TSIOBufferReader reader = TSIOBufferReaderAlloc(b);
130   const uint64_t length   = read(reader, o);
131   TSIOBufferReaderFree(reader);
132   return length;
133 }
134 
135 class Handler
136 {
137   int64_t length;
138   struct timeval start;
139   std::string response;
140 
141 public:
142   const std::string url;
143 
Handler(std::string u)144   Handler(std::string u) : length(0)
145   {
146     assert(!u.empty());
147     const_cast<std::string &>(url).swap(u);
148     gettimeofday(&start, nullptr);
149   }
150 
151   void
error()152   error()
153   {
154     TSError("[" PLUGIN_TAG "] error when communicating with \"%s\"\n", url.c_str());
155     TSStatIntIncrement(statistics.failures, 1);
156   }
157 
158   void
timeout()159   timeout()
160   {
161     TSError("[" PLUGIN_TAG "] timeout when communicating with \"%s\"\n", url.c_str());
162     TSStatIntIncrement(statistics.timeouts, 1);
163   }
164 
165   void
header(const TSMBuffer b,const TSMLoc l)166   header(const TSMBuffer b, const TSMLoc l)
167   {
168     if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
169       const TSIOBuffer buffer = TSIOBufferCreate();
170       TSHttpHdrPrint(b, l, buffer);
171       std::string buf;
172       read(buffer, buf);
173       TSDebug(PLUGIN_TAG, "Response header for \"%s\" was:\n%s", url.c_str(), buf.c_str());
174       TSIOBufferDestroy(buffer);
175     }
176   }
177 
178   void
data(const TSIOBufferReader r,const int64_t l)179   data(const TSIOBufferReader r, const int64_t l)
180   {
181     length += l;
182     if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
183       std::string buffer;
184       const uint64_t length = read(r, buffer, l);
185       response += buffer;
186       TSDebug(PLUGIN_TAG, "Receiving response chunk \"%s\" of %" PRIu64 " bytes", buffer.c_str(), length);
187     }
188   }
189 
190   void
done()191   done()
192   {
193     struct timeval end;
194 
195     gettimeofday(&end, nullptr);
196 
197     if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
198       TSDebug(PLUGIN_TAG, "Response for \"%s\" was:\n%s", url.c_str(), response.c_str());
199     }
200 
201     const long diff = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
202 
203     TSStatIntIncrement(statistics.hits, 1);
204     TSStatIntIncrement(statistics.time, diff);
205     TSStatIntIncrement(statistics.size, length);
206   }
207 };
208 
209 void
generateRequests(const Origins & o,const TSMBuffer buffer,const TSMLoc location,Requests & r)210 generateRequests(const Origins &o, const TSMBuffer buffer, const TSMLoc location, Requests &r)
211 {
212   assert(!o.empty());
213   assert(buffer != nullptr);
214   assert(location != nullptr);
215 
216   Origins::const_iterator iterator  = o.begin();
217   const Origins::const_iterator end = o.end();
218 
219   OriginalRequest request(buffer, location);
220   request.urlScheme("");
221   request.urlHost("");
222   request.xMultiplexerHeader("copy");
223 
224   for (; iterator != end; ++iterator) {
225     const std::string &host = *iterator;
226     assert(!host.empty());
227     request.hostHeader(host);
228     r.push_back(Request(host, buffer, location));
229   }
230 }
231 
232 void
addBody(Requests & r,const TSIOBufferReader re)233 addBody(Requests &r, const TSIOBufferReader re)
234 {
235   assert(re != nullptr);
236   Requests::iterator iterator  = r.begin();
237   const Requests::iterator end = r.end();
238   const int64_t length         = TSIOBufferReaderAvail(re);
239   if (length == 0) {
240     return;
241   }
242   assert(length > 0);
243   for (; iterator != end; ++iterator) {
244     assert(iterator->io.get() != nullptr);
245     const int64_t size = copy(re, iterator->io->buffer);
246     assert(size == length);
247     iterator->length += size;
248   }
249 }
250 
251 void
dispatch(Requests & r,const int t)252 dispatch(Requests &r, const int t)
253 {
254   Requests::iterator iterator  = r.begin();
255   const Requests::iterator end = r.end();
256   for (; iterator != end; ++iterator) {
257     assert(iterator->io.get() != nullptr);
258     if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
259       TSDebug(PLUGIN_TAG, "Dispatching %i bytes to \"%s\"", iterator->length, iterator->host.c_str());
260       std::string b;
261       read(iterator->io->reader, b);
262       assert(b.size() == static_cast<uint64_t>(iterator->length));
263       TSDebug(PLUGIN_TAG, "%s", b.c_str());
264     }
265     // forwarding iterator->io pointer ownership
266     ats::get(iterator->io.release(), iterator->length, Handler(iterator->host), t);
267   }
268 }
269