1 /** @file
2 
3   Inlines base64 images from the ATS cache
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 #include <cstring>
24 #include <iostream>
25 #include <limits>
26 
27 #include "ts.h"
28 
29 namespace ats
30 {
31 namespace io
32 {
33   IO *
read(TSVConn v,TSCont c,const int64_t s)34   IO::read(TSVConn v, TSCont c, const int64_t s)
35   {
36     assert(s > 0);
37     IO *io  = new IO();
38     io->vio = TSVConnRead(v, c, io->buffer, s);
39     return io;
40   }
41 
42   IO *
write(TSVConn v,TSCont c,const int64_t s)43   IO::write(TSVConn v, TSCont c, const int64_t s)
44   {
45     assert(s > 0);
46     IO *io  = new IO();
47     io->vio = TSVConnWrite(v, c, io->reader, s);
48     return io;
49   }
50 
51   uint64_t
copy(const std::string & s) const52   IO::copy(const std::string &s) const
53   {
54     assert(buffer != nullptr);
55     const uint64_t size = TSIOBufferWrite(buffer, s.data(), s.size());
56     assert(size == s.size());
57     return size;
58   }
59 
60   int64_t
consume() const61   IO::consume() const
62   {
63     assert(reader != nullptr);
64     const int64_t available = TSIOBufferReaderAvail(reader);
65     if (available > 0) {
66       TSIOBufferReaderConsume(reader, available);
67     }
68     return available;
69   }
70 
71   int64_t
done() const72   IO::done() const
73   {
74     assert(vio != nullptr);
75     assert(reader != nullptr);
76     const int64_t d = TSIOBufferReaderAvail(reader) + TSVIONDoneGet(vio);
77     TSVIONDoneSet(vio, d);
78     return d;
79   }
80 
~WriteOperation()81   WriteOperation::~WriteOperation()
82   {
83     assert(mutex_ != nullptr);
84     const Lock lock(mutex_);
85     TSDebug(PLUGIN_TAG, "~WriteOperation");
86 
87     vio_ = nullptr;
88 
89     if (action_ != nullptr) {
90       TSActionCancel(action_);
91     }
92 
93     assert(reader_ != nullptr);
94     TSIOBufferReaderFree(reader_);
95 
96     assert(buffer_ != nullptr);
97     TSIOBufferDestroy(buffer_);
98 
99     assert(continuation_ != nullptr);
100     TSContDestroy(continuation_);
101 
102     assert(vconnection_ != nullptr);
103     TSVConnShutdown(vconnection_, 0, 1);
104   }
105 
WriteOperation(const TSVConn v,const TSMutex m,const size_t t)106   WriteOperation::WriteOperation(const TSVConn v, const TSMutex m, const size_t t)
107     : vconnection_(v),
108       buffer_(TSIOBufferCreate()),
109       reader_(TSIOBufferReaderAlloc(buffer_)),
110       mutex_(m != nullptr ? m : TSMutexCreate()),
111       continuation_(TSContCreate(WriteOperation::Handle, mutex_)),
112       vio_(TSVConnWrite(v, continuation_, reader_, std::numeric_limits<int64_t>::max())),
113       action_(nullptr),
114       timeout_(t),
115       bytes_(0),
116       reenable_(true)
117   {
118     assert(vconnection_ != nullptr);
119     assert(buffer_ != nullptr);
120     assert(reader_ != nullptr);
121     assert(mutex_ != nullptr);
122     assert(continuation_ != nullptr);
123     assert(vio_ != nullptr);
124 
125     if (timeout_ > 0) {
126       action_ = TSContScheduleOnPool(continuation_, timeout_, TS_THREAD_POOL_NET);
127       assert(action_ != nullptr);
128     }
129   }
130 
131   void
process(const size_t b)132   WriteOperation::process(const size_t b)
133   {
134     assert(mutex_);
135     const Lock lock(mutex_);
136     bytes_ += b;
137     if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) {
138       if (reenable_) {
139         TSVIOReenable(vio_);
140         reenable_ = false;
141       }
142     } else {
143       vio_ = nullptr;
144     }
145   }
146 
147   int
Handle(const TSCont c,const TSEvent e,void * d)148   WriteOperation::Handle(const TSCont c, const TSEvent e, void *d)
149   {
150     assert(c != nullptr);
151     WriteOperationPointer *const p = static_cast<WriteOperationPointer *>(TSContDataGet(c));
152 
153     if (TS_EVENT_VCONN_WRITE_COMPLETE == e) {
154       TSDebug(PLUGIN_TAG, "TS_EVENT_VCONN_WRITE_COMPLETE");
155       if (p != nullptr) {
156         TSContDataSet(c, nullptr);
157         delete p;
158       }
159       return TS_SUCCESS;
160     }
161 
162     assert(p != nullptr);
163     assert(*p);
164     WriteOperation &operation = **p;
165     assert(operation.continuation_ == c);
166     assert(operation.vconnection_ != nullptr);
167     assert(d != nullptr);
168     assert(TS_EVENT_ERROR == e || TS_EVENT_TIMEOUT == e || TS_EVENT_VCONN_WRITE_READY == e);
169 
170     switch (e) {
171     case TS_EVENT_ERROR:
172       TSError("[" PLUGIN_TAG "] TS_EVENT_ERROR from producer");
173       goto handle_error; // handle errors as timeouts
174 
175     case TS_EVENT_TIMEOUT:
176       TSError("[" PLUGIN_TAG "] TS_EVENT_TIMEOUT from producer");
177 
178     handle_error:
179       operation.close();
180       assert(operation.action_ != nullptr);
181       operation.action_ = nullptr;
182       /*
183       TSContDataSet(c, NULL);
184       delete p;
185       */
186       break;
187     case TS_EVENT_VCONN_WRITE_READY:
188       operation.reenable_ = true;
189       break;
190 
191     default:
192       TSError("[" PLUGIN_TAG "] Unknown event: %i", e);
193       assert(false); // UNREACHABLE
194       break;
195     }
196 
197     return TS_SUCCESS;
198   }
199 
200   WriteOperationWeakPointer
Create(const TSVConn v,const TSMutex m,const size_t t)201   WriteOperation::Create(const TSVConn v, const TSMutex m, const size_t t)
202   {
203     WriteOperation *const operation = new WriteOperation(v, m, t);
204     assert(operation != nullptr);
205     WriteOperationPointer *const pointer = new WriteOperationPointer(operation);
206     assert(pointer != nullptr);
207     TSContDataSet(operation->continuation_, pointer);
208 
209 #ifndef NDEBUG
210     {
211       WriteOperationPointer *const p = static_cast<WriteOperationPointer *>(TSContDataGet(operation->continuation_));
212       assert(pointer == p);
213       assert((*p).get() == operation);
214     }
215 #endif
216 
217     return WriteOperationWeakPointer(*pointer);
218   }
219 
220   WriteOperation &
operator <<(const TSIOBufferReader r)221   WriteOperation::operator<<(const TSIOBufferReader r)
222   {
223     assert(r != nullptr);
224     process(TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0));
225     return *this;
226   }
227 
228   WriteOperation &
operator <<(const ReaderSize & r)229   WriteOperation::operator<<(const ReaderSize &r)
230   {
231     assert(r.reader != nullptr);
232     process(TSIOBufferCopy(buffer_, r.reader, r.size, r.offset));
233     return *this;
234   }
235 
236   WriteOperation &
operator <<(const ReaderOffset & r)237   WriteOperation::operator<<(const ReaderOffset &r)
238   {
239     assert(r.reader != nullptr);
240     process(TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), r.offset));
241     return *this;
242   }
243 
244   WriteOperation &
operator <<(const char * const s)245   WriteOperation::operator<<(const char *const s)
246   {
247     assert(s != nullptr);
248     process(TSIOBufferWrite(buffer_, s, strlen(s)));
249     return *this;
250   }
251 
252   WriteOperation &
operator <<(const std::string & s)253   WriteOperation::operator<<(const std::string &s)
254   {
255     process(TSIOBufferWrite(buffer_, s.data(), s.size()));
256     return *this;
257   }
258 
259   void
close()260   WriteOperation::close()
261   {
262     assert(mutex_ != nullptr);
263     const Lock lock(mutex_);
264     if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) {
265       TSVIONBytesSet(vio_, bytes_);
266       TSVIOReenable(vio_);
267     }
268     vio_ = nullptr;
269   }
270 
271   void
abort()272   WriteOperation::abort()
273   {
274     assert(mutex_ != nullptr);
275     const Lock lock(mutex_);
276     vio_ = nullptr;
277   }
278 
~IOSink()279   IOSink::~IOSink()
280   {
281     // TSDebug(PLUGIN_TAG, "~IOSink %p", this);
282     const WriteOperationPointer operation = operation_.lock();
283     if (operation) {
284       operation_.reset();
285       operation->close();
286     }
287   }
288 
289   void
process()290   IOSink::process()
291   {
292     const WriteOperationPointer operation = operation_.lock();
293 
294     if (!data_ || !operation) {
295       return;
296     }
297 
298     assert(operation->mutex_ != nullptr);
299     const Lock lock(operation->mutex_);
300 
301     assert(operation->buffer_ != nullptr);
302     const Node::Result result = data_->process(operation->buffer_);
303     operation->bytes_ += result.first;
304     operation->process();
305     if (result.second && data_.unique()) {
306       data_.reset();
307     }
308   }
309 
310   Lock
lock()311   IOSink::lock()
312   {
313     const WriteOperationPointer operation = operation_.lock();
314 
315     if (!operation) {
316       return Lock();
317     }
318 
319     assert(operation != nullptr);
320     assert(operation->mutex_ != nullptr);
321 
322     return Lock(operation->mutex_);
323   }
324 
325   void
abort()326   IOSink::abort()
327   {
328     const WriteOperationPointer operation = operation_.lock();
329     if (operation) {
330       operation->abort();
331     }
332   }
333 
334   BufferNode &
operator <<(const TSIOBufferReader r)335   BufferNode::operator<<(const TSIOBufferReader r)
336   {
337     assert(r != nullptr);
338     TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0);
339     return *this;
340   }
341 
342   BufferNode &
operator <<(const ReaderSize & r)343   BufferNode::operator<<(const ReaderSize &r)
344   {
345     assert(r.reader != nullptr);
346     TSIOBufferCopy(buffer_, r.reader, r.size, r.offset);
347     return *this;
348   }
349 
350   BufferNode &
operator <<(const ReaderOffset & r)351   BufferNode::operator<<(const ReaderOffset &r)
352   {
353     assert(r.reader != nullptr);
354     TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), r.offset);
355     return *this;
356   }
357 
358   BufferNode &
operator <<(const char * const s)359   BufferNode::operator<<(const char *const s)
360   {
361     assert(s != nullptr);
362     TSIOBufferWrite(buffer_, s, strlen(s));
363     return *this;
364   }
365 
366   BufferNode &
operator <<(const std::string & s)367   BufferNode::operator<<(const std::string &s)
368   {
369     TSIOBufferWrite(buffer_, s.data(), s.size());
370     return *this;
371   }
372 
373   Node::Result
process(const TSIOBuffer b)374   BufferNode::process(const TSIOBuffer b)
375   {
376     assert(b != nullptr);
377     assert(buffer_ != nullptr);
378     assert(reader_ != nullptr);
379     const size_t available = TSIOBufferReaderAvail(reader_);
380     const size_t copied    = TSIOBufferCopy(b, reader_, available, 0);
381     assert(copied == available);
382     TSIOBufferReaderConsume(reader_, copied);
383     // TSDebug(PLUGIN_TAG, "BufferNode::process %lu", copied);
384     return Node::Result(copied, TSIOBufferReaderAvail(reader_) == 0);
385   }
386 
387   Node::Result
process(const TSIOBuffer b)388   StringNode::process(const TSIOBuffer b)
389   {
390     assert(b != nullptr);
391     const size_t copied = TSIOBufferWrite(b, string_.data(), string_.size());
392     assert(copied == string_.size());
393     return Node::Result(copied, true);
394   }
395 
396   SinkPointer
branch()397   IOSink::branch()
398   {
399     if (!data_) {
400       data_.reset(new Data(shared_from_this()));
401       data_->first_ = true;
402     }
403     SinkPointer pointer(new Sink(data_));
404     // TSDebug(PLUGIN_TAG, "IOSink branch %p", pointer.get());
405     return pointer;
406   }
407 
408   SinkPointer
branch()409   Sink::branch()
410   {
411     DataPointer data;
412     if (data_) {
413       const bool first = data_->nodes_.empty();
414       data.reset(new Data(data_->root_));
415       assert(data);
416       data_->nodes_.push_back(data);
417       assert(!data_->nodes_.empty());
418       data->first_ = first;
419     }
420     SinkPointer pointer(new Sink(data));
421     // TSDebug(PLUGIN_TAG, "Sink branch %p", pointer.get());
422     return pointer;
423   }
424 
~Sink()425   Sink::~Sink()
426   {
427     // TSDebug(PLUGIN_TAG, "~Sink %p", this);
428     assert(data_);
429     assert(data_.use_count() >= 1);
430     assert(data_->root_);
431     const IOSinkPointer root(std::move(data_->root_));
432     data_.reset();
433     root->process();
434   }
435 
436   Node::Result
process(const TSIOBuffer b)437   Data::process(const TSIOBuffer b)
438   {
439     assert(b != nullptr);
440     int64_t length = 0;
441 
442     const Nodes::iterator begin = nodes_.begin(), end = nodes_.end();
443 
444     Nodes::iterator it = begin;
445 
446     for (; it != end; ++it) {
447       assert(*it != nullptr);
448       const Node::Result result = (*it)->process(b);
449       length += result.first;
450       if (!result.second || !it->unique()) {
451         break;
452       }
453     }
454 
455     // TSDebug(PLUGIN_TAG, "Data::process %li", length);
456 
457     if (begin != it) {
458       // TSDebug(PLUGIN_TAG, "Data::process::erase");
459       nodes_.erase(begin, it);
460       if (it != end) {
461         Data *data = dynamic_cast<Data *>(it->get());
462         while (data != nullptr) {
463           // TSDebug(PLUGIN_TAG, "new first");
464           data->first_ = true;
465           if (data->nodes_.empty()) {
466             break;
467           }
468           assert(data->nodes_.front());
469           data = dynamic_cast<Data *>(data->nodes_.front().get());
470         }
471       }
472     }
473 
474     return Node::Result(length, nodes_.empty());
475   }
476 
477   Sink &
operator <<(std::string && s)478   Sink::operator<<(std::string &&s)
479   {
480     if (data_) {
481       data_->nodes_.emplace_back(new StringNode(std::move(s)));
482     }
483     return *this;
484   }
485 
486 } // namespace io
487 } // namespace ats
488