1 /** @file
2 
3   Inlines base64 images from the ATS cache
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #pragma once
25 
26 #include <cassert>
27 #include <limits>
28 #include <list>
29 #include <memory>
30 #include <ts/ts.h>
31 
32 #ifdef NDEBUG
33 #define CHECK(X) X
34 #else
35 #define CHECK(X)                                         \
36   {                                                      \
37     const TSReturnCode r = static_cast<TSReturnCode>(X); \
38     assert(r == TS_SUCCESS);                             \
39   }
40 #endif
41 
42 namespace ats
43 {
44 namespace io
45 {
46   // TODO(dmorilha): dislike this
47   struct IO {
48     TSIOBuffer buffer;
49     TSIOBufferReader reader;
50     TSVIO vio = nullptr;
51 
~IOats::io::IO52     ~IO()
53     {
54       consume();
55       assert(reader != nullptr);
56       TSIOBufferReaderFree(reader);
57       assert(buffer != nullptr);
58       TSIOBufferDestroy(buffer);
59     }
60 
IOats::io::IO61     IO() : buffer(TSIOBufferCreate()), reader(TSIOBufferReaderAlloc(buffer)) {}
IOats::io::IO62     IO(const TSIOBuffer &b) : buffer(b), reader(TSIOBufferReaderAlloc(buffer)) { assert(buffer != nullptr); }
63     static IO *read(TSVConn, TSCont, const int64_t);
64 
65     static IO *
readats::io::IO66     read(TSVConn v, TSCont c)
67     {
68       return IO::read(v, c, std::numeric_limits<int64_t>::max());
69     }
70 
71     static IO *write(TSVConn, TSCont, const int64_t);
72 
73     static IO *
writeats::io::IO74     write(TSVConn v, TSCont c)
75     {
76       return IO::write(v, c, std::numeric_limits<int64_t>::max());
77     }
78 
79     uint64_t copy(const std::string &) const;
80 
81     int64_t consume() const;
82 
83     int64_t done() const;
84   };
85 
86   struct ReaderSize {
87     const TSIOBufferReader reader;
88     const size_t offset;
89     const size_t size;
90 
ReaderSizeats::io::ReaderSize91     ReaderSize(const TSIOBufferReader r, const size_t s, const size_t o = 0) : reader(r), offset(o), size(s)
92     {
93       assert(reader != nullptr);
94     }
95 
96     ReaderSize(const ReaderSize &) = delete;
97     ReaderSize &operator=(const ReaderSize &) = delete;
98     void *operator new(const std::size_t)     = delete;
99   };
100 
101   struct ReaderOffset {
102     const TSIOBufferReader reader;
103     const size_t offset;
104 
ReaderOffsetats::io::ReaderOffset105     ReaderOffset(const TSIOBufferReader r, const size_t o) : reader(r), offset(o) { assert(reader != nullptr); }
106     ReaderOffset(const ReaderOffset &) = delete;
107     ReaderOffset &operator=(const ReaderOffset &) = delete;
108     void *operator new(const std::size_t)         = delete;
109   };
110 
111   struct WriteOperation;
112 
113   typedef std::shared_ptr<WriteOperation> WriteOperationPointer;
114   typedef std::weak_ptr<WriteOperation> WriteOperationWeakPointer;
115 
116   struct Lock {
117     const TSMutex mutex_ = nullptr;
118 
~Lockats::io::Lock119     ~Lock()
120     {
121       if (mutex_ != nullptr) {
122         TSMutexUnlock(mutex_);
123       }
124     }
125 
Lockats::io::Lock126     Lock(const TSMutex m) : mutex_(m)
127     {
128       if (mutex_ != nullptr) {
129         TSMutexLock(mutex_);
130       }
131     }
132 
133     // noncopyable
Lockats::io::Lock134     Lock() {}
135     Lock(const Lock &) = delete;
136 
Lockats::io::Lock137     Lock(Lock &&l) : mutex_(l.mutex_) { const_cast<TSMutex &>(l.mutex_) = nullptr; }
138     Lock &operator=(const Lock &) = delete;
139   };
140 
141   struct WriteOperation : std::enable_shared_from_this<WriteOperation> {
142     TSVConn vconnection_;
143     TSIOBuffer buffer_;
144     TSIOBufferReader reader_;
145     TSMutex mutex_;
146     TSCont continuation_;
147     TSVIO vio_;
148     TSAction action_;
149     const size_t timeout_;
150     size_t bytes_;
151     bool reenable_;
152 
153     static int Handle(TSCont, TSEvent, void *);
154     static WriteOperationWeakPointer Create(const TSVConn, const TSMutex mutex = nullptr, const size_t timeout = 0);
155 
156     ~WriteOperation();
157 
158     // noncopyable
159     WriteOperation(const WriteOperation &) = delete;
160     WriteOperation &operator=(const WriteOperation &) = delete;
161 
162     WriteOperation &operator<<(const TSIOBufferReader);
163     WriteOperation &operator<<(const ReaderSize &);
164     WriteOperation &operator<<(const ReaderOffset &);
165     WriteOperation &operator<<(const char *const);
166     WriteOperation &operator<<(const std::string &);
167 
168     void process(const size_t b = 0);
169     void close();
170     void abort();
171 
172   private:
173     WriteOperation(const TSVConn, const TSMutex, const size_t);
174   };
175 
176   struct Node;
177   typedef std::shared_ptr<Node> NodePointer;
178   typedef std::list<NodePointer> Nodes;
179 
180   struct IOSink;
181   typedef std::shared_ptr<IOSink> IOSinkPointer;
182 
183   struct Sink;
184   typedef std::shared_ptr<Sink> SinkPointer;
185 
186   struct Data;
187   typedef std::shared_ptr<Data> DataPointer;
188 
189   struct IOSink : std::enable_shared_from_this<IOSink> {
190     WriteOperationWeakPointer operation_;
191     DataPointer data_;
192 
193     ~IOSink();
194 
195     // noncopyable
196     IOSink(const IOSink &) = delete;
197     IOSink &operator=(const IOSink &) = delete;
198 
199     template <class T>
200     IOSink &
operator <<ats::io::IOSink201     operator<<(T &&t)
202     {
203       const WriteOperationPointer operation = operation_.lock();
204       if (operation) {
205         const Lock lock(operation->mutex_);
206         *operation << std::forward<T>(t);
207       }
208       return *this;
209     }
210 
211     template <class... A>
212     static IOSinkPointer
Createats::io::IOSink213     Create(A &&... a)
214     {
215       return IOSinkPointer(new IOSink(WriteOperation::Create(std::forward<A>(a)...)));
216     }
217 
218     void process();
219     SinkPointer branch();
220     Lock lock();
221     void abort();
222 
223   private:
IOSinkats::io::IOSink224     IOSink(WriteOperationWeakPointer &&p) : operation_(std::move(p)) {}
225   };
226 
227   struct Node {
228     typedef std::pair<size_t, bool> Result;
229     IOSinkPointer ioSink_;
~Nodeats::io::Node230     virtual ~Node() {}
231     virtual Node::Result process(const TSIOBuffer) = 0;
232   };
233 
234   struct StringNode : Node {
235     std::string string_;
StringNodeats::io::StringNode236     explicit StringNode(std::string &&s) : string_(std::move(s)) {}
237     Node::Result process(const TSIOBuffer) override;
238   };
239 
240   struct BufferNode : Node {
241     const TSIOBuffer buffer_;
242     const TSIOBufferReader reader_;
243 
~BufferNodeats::io::BufferNode244     ~BufferNode() override
245     {
246       assert(reader_ != nullptr);
247       TSIOBufferReaderFree(reader_);
248       assert(buffer_ != nullptr);
249       TSIOBufferDestroy(buffer_);
250     }
251 
BufferNodeats::io::BufferNode252     BufferNode() : buffer_(TSIOBufferCreate()), reader_(TSIOBufferReaderAlloc(buffer_))
253     {
254       assert(buffer_ != nullptr);
255       assert(reader_ != nullptr);
256     }
257 
258     // noncopyable
259     BufferNode(const BufferNode &) = delete;
260     BufferNode &operator=(const BufferNode &) = delete;
261     BufferNode &operator<<(const TSIOBufferReader);
262     BufferNode &operator<<(const ReaderSize &);
263     BufferNode &operator<<(const ReaderOffset &);
264     BufferNode &operator<<(const char *const);
265     BufferNode &operator<<(const std::string &);
266     Node::Result process(const TSIOBuffer) override;
267   };
268 
269   struct Data : Node {
270     Nodes nodes_;
271     IOSinkPointer root_;
272     bool first_;
273 
Dataats::io::Data274     template <class T> Data(T &&t) : root_(std::forward<T>(t)), first_(false) {}
275     // noncopyable
276     Data(const Data &) = delete;
277     Data &operator=(const Data &) = delete;
278 
279     Node::Result process(const TSIOBuffer) override;
280   };
281 
282   struct Sink {
283     DataPointer data_;
284 
285     ~Sink();
286 
Sinkats::io::Sink287     template <class... A> Sink(A &&... a) : data_(std::forward<A>(a)...) {}
288     // noncopyable
289     Sink(const Sink &) = delete;
290     Sink &operator=(const Sink &) = delete;
291 
292     SinkPointer branch();
293 
294     Sink &operator<<(std::string &&);
295 
296     template <class T>
297     Sink &
operator <<ats::io::Sink298     operator<<(T &&t)
299     {
300       if (data_) {
301         const Lock lock = data_->root_->lock();
302         assert(data_->root_ != nullptr);
303         const bool empty = data_->nodes_.empty();
304         if (data_->first_ && empty) {
305           // TSDebug(PLUGIN_TAG, "flushing");
306           assert(data_->root_);
307           *data_->root_ << std::forward<T>(t);
308         } else {
309           // TSDebug(PLUGIN_TAG, "buffering");
310           BufferNode *buffer = nullptr;
311           if (!empty) {
312             buffer = dynamic_cast<BufferNode *>(data_->nodes_.back().get());
313           }
314           if (buffer == nullptr) {
315             data_->nodes_.emplace_back(new BufferNode());
316             buffer = reinterpret_cast<BufferNode *>(data_->nodes_.back().get());
317           }
318           assert(buffer != nullptr);
319           *buffer << std::forward<T>(t);
320         }
321       }
322       return *this;
323     }
324   };
325 
326 } // namespace io
327 } // namespace ats
328