1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26    OneWayTunnel.cc
27 
28    A OneWayTunnel is a module that connects two virtual connections, a
29    source vc and a target vc, and copies the data between source and target.
30 
31    This class used to be called HttpTunnelVC, but it doesn't seem to have
32    anything to do with HTTP, so it has been renamed to OneWayTunnel.
33  ****************************************************************************/
34 
35 #include "P_EventSystem.h"
36 #include "I_OneWayTunnel.h"
37 
38 // #define TEST
39 
40 //////////////////////////////////////////////////////////////////////////////
41 //
42 //      OneWayTunnel::OneWayTunnel()
43 //
44 //////////////////////////////////////////////////////////////////////////////
45 
46 ClassAllocator<OneWayTunnel> OneWayTunnelAllocator("OneWayTunnelAllocator");
47 
48 inline void
transfer_data(MIOBufferAccessor & in_buf,MIOBufferAccessor & out_buf)49 transfer_data(MIOBufferAccessor &in_buf, MIOBufferAccessor &out_buf)
50 {
51   ink_release_assert(!"Not Implemented.");
52 
53   int64_t n = in_buf.reader()->read_avail();
54   int64_t o = out_buf.writer()->write_avail();
55 
56   if (n > o) {
57     n = o;
58   }
59   if (!n) {
60     return;
61   }
62   memcpy(in_buf.reader()->start(), out_buf.writer()->end(), n);
63   in_buf.reader()->consume(n);
64   out_buf.writer()->fill(n);
65 }
66 
OneWayTunnel()67 OneWayTunnel::OneWayTunnel() : Continuation(nullptr) {}
68 
69 OneWayTunnel *
OneWayTunnel_alloc()70 OneWayTunnel::OneWayTunnel_alloc()
71 {
72   return OneWayTunnelAllocator.alloc();
73 }
74 
75 void
OneWayTunnel_free(OneWayTunnel * pOWT)76 OneWayTunnel::OneWayTunnel_free(OneWayTunnel *pOWT)
77 {
78   pOWT->mutex = nullptr;
79   OneWayTunnelAllocator.free(pOWT);
80 }
81 
82 void
SetupTwoWayTunnel(OneWayTunnel * east,OneWayTunnel * west)83 OneWayTunnel::SetupTwoWayTunnel(OneWayTunnel *east, OneWayTunnel *west)
84 {
85   // make sure the both use the same mutex
86   ink_assert(east->mutex == west->mutex);
87 
88   east->tunnel_peer = west;
89   west->tunnel_peer = east;
90 }
91 
~OneWayTunnel()92 OneWayTunnel::~OneWayTunnel() {}
93 
OneWayTunnel(Continuation * aCont,Transform_fn aManipulate_fn,bool aclose_source,bool aclose_target)94 OneWayTunnel::OneWayTunnel(Continuation *aCont, Transform_fn aManipulate_fn, bool aclose_source, bool aclose_target)
95   : Continuation(aCont ? aCont->mutex.get() : new_ProxyMutex()),
96     cont(aCont),
97     manipulate_fn(aManipulate_fn),
98     n_connections(2),
99     lerrno(0),
100     single_buffer(true),
101     close_source(aclose_source),
102     close_target(aclose_target),
103     tunnel_till_done(false),
104     free_vcs(false)
105 {
106   ink_assert(!"This form of OneWayTunnel() constructor not supported");
107 }
108 
109 void
init(VConnection * vcSource,VConnection * vcTarget,Continuation * aCont,int size_estimate,ProxyMutex * aMutex,int64_t nbytes,bool asingle_buffer,bool aclose_source,bool aclose_target,Transform_fn aManipulate_fn,int water_mark)110 OneWayTunnel::init(VConnection *vcSource, VConnection *vcTarget, Continuation *aCont, int size_estimate, ProxyMutex *aMutex,
111                    int64_t nbytes, bool asingle_buffer, bool aclose_source, bool aclose_target, Transform_fn aManipulate_fn,
112                    int water_mark)
113 {
114   mutex            = aCont ? aCont->mutex.get() : (aMutex ? aMutex : new_ProxyMutex());
115   cont             = aMutex ? nullptr : aCont;
116   single_buffer    = asingle_buffer;
117   manipulate_fn    = aManipulate_fn;
118   n_connections    = 2;
119   close_source     = aclose_source;
120   close_target     = aclose_target;
121   lerrno           = 0;
122   tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
123 
124   SET_HANDLER(&OneWayTunnel::startEvent);
125 
126   int64_t size_index = 0;
127 
128   if (size_estimate) {
129     size_index = buffer_size_to_index(size_estimate);
130   } else {
131     size_index = default_large_iobuffer_size;
132   }
133 
134   Debug("one_way_tunnel", "buffer size index [%" PRId64 "] [%d]", size_index, size_estimate);
135 
136   // enqueue read request on vcSource.
137   MIOBuffer *buf1 = new_MIOBuffer(size_index);
138   MIOBuffer *buf2 = nullptr;
139   if (single_buffer) {
140     buf2 = buf1;
141   } else {
142     buf2 = new_MIOBuffer(size_index);
143   }
144 
145   buf1->water_mark = water_mark;
146 
147   SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
148   vioSource = vcSource->do_io_read(this, nbytes, buf1);
149   vioTarget = vcTarget->do_io_write(this, nbytes, buf2->alloc_reader(), false);
150   ink_assert(vioSource && vioTarget);
151 
152   return;
153 }
154 
155 void
init(VConnection * vcSource,VConnection * vcTarget,Continuation * aCont,VIO * SourceVio,IOBufferReader * reader,bool aclose_source,bool aclose_target)156 OneWayTunnel::init(VConnection *vcSource, VConnection *vcTarget, Continuation *aCont, VIO *SourceVio, IOBufferReader *reader,
157                    bool aclose_source, bool aclose_target)
158 {
159   (void)vcSource;
160   mutex            = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
161   cont             = aCont;
162   single_buffer    = true;
163   manipulate_fn    = nullptr;
164   n_connections    = 2;
165   close_source     = aclose_source;
166   close_target     = aclose_target;
167   tunnel_till_done = true;
168 
169   // Prior to constructing the OneWayTunnel, we initiated a do_io_read()
170   // on the source VC.  We wish to use the same MIO buffer in the tunnel.
171 
172   // do_io_read() already posted on vcSource.
173   SET_HANDLER(&OneWayTunnel::startEvent);
174 
175   SourceVio->set_continuation(this);
176   SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
177   vioSource = SourceVio;
178 
179   vioTarget = vcTarget->do_io_write(this, TUNNEL_TILL_DONE, reader, false);
180   ink_assert(vioSource && vioTarget);
181 }
182 
183 void
init(Continuation * aCont,VIO * SourceVio,VIO * TargetVio,bool aclose_source,bool aclose_target)184 OneWayTunnel::init(Continuation *aCont, VIO *SourceVio, VIO *TargetVio, bool aclose_source, bool aclose_target)
185 {
186   mutex            = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
187   cont             = aCont;
188   single_buffer    = true;
189   manipulate_fn    = nullptr;
190   n_connections    = 2;
191   close_source     = aclose_source;
192   close_target     = aclose_target;
193   tunnel_till_done = true;
194 
195   // do_io_read() read already posted on vcSource.
196   // do_io_write() already posted on vcTarget
197   SET_HANDLER(&OneWayTunnel::startEvent);
198 
199   ink_assert(SourceVio && TargetVio);
200 
201   SourceVio->set_continuation(this);
202   TargetVio->set_continuation(this);
203   vioSource = SourceVio;
204   vioTarget = TargetVio;
205 }
206 
207 void
transform(MIOBufferAccessor & in_buf,MIOBufferAccessor & out_buf)208 OneWayTunnel::transform(MIOBufferAccessor &in_buf, MIOBufferAccessor &out_buf)
209 {
210   if (manipulate_fn) {
211     manipulate_fn(in_buf, out_buf);
212   } else if (in_buf.writer() != out_buf.writer()) {
213     transfer_data(in_buf, out_buf);
214   }
215 }
216 
217 //////////////////////////////////////////////////////////////////////////////
218 //
219 //      int OneWayTunnel::startEvent()
220 //
221 //////////////////////////////////////////////////////////////////////////////
222 
223 //
224 // tunnel was invoked with an event
225 //
226 int
startEvent(int event,void * data)227 OneWayTunnel::startEvent(int event, void *data)
228 {
229   VIO *vio   = static_cast<VIO *>(data);
230   int ret    = VC_EVENT_DONE;
231   int result = 0;
232 
233 #ifdef TEST
234   const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
235   printf("OneWayTunnel --- %s received from %s VC\n", event_name, event_origin);
236 #endif
237 
238   if (!vioTarget) {
239     goto Lerror;
240   }
241 
242   // handle the event
243   //
244   switch (event) {
245   case ONE_WAY_TUNNEL_EVENT_PEER_CLOSE:
246     /* This event is sent out by our peer */
247     ink_assert(tunnel_peer);
248     tunnel_peer = nullptr;
249     free_vcs    = false;
250     goto Ldone;
251     break; // fix coverity
252 
253   case VC_EVENT_READ_READY:
254     transform(vioSource->buffer, vioTarget->buffer);
255     vioTarget->reenable();
256     ret = VC_EVENT_CONT;
257     break;
258 
259   case VC_EVENT_WRITE_READY:
260     if (vioSource) {
261       vioSource->reenable();
262     }
263     ret = VC_EVENT_CONT;
264     break;
265 
266   case VC_EVENT_EOS:
267     if (!tunnel_till_done && vio->ntodo()) {
268       goto Lerror;
269     }
270     if (vio == vioSource) {
271       transform(vioSource->buffer, vioTarget->buffer);
272       goto Lread_complete;
273     } else {
274       goto Ldone;
275     }
276     break; // fix coverity
277 
278   case VC_EVENT_READ_COMPLETE:
279   Lread_complete:
280     // set write nbytes to the current buffer size
281     //
282     vioTarget->nbytes = vioTarget->ndone + vioTarget->buffer.reader()->read_avail();
283     if (vioTarget->nbytes == vioTarget->ndone) {
284       goto Ldone;
285     }
286     vioTarget->reenable();
287     if (!tunnel_peer) {
288       close_source_vio(0);
289     }
290     break;
291 
292   case VC_EVENT_ERROR:
293   Lerror:
294     lerrno = (static_cast<VIO *>(data))->vc_server->lerrno;
295     // fallthrough
296 
297   case VC_EVENT_INACTIVITY_TIMEOUT:
298   case VC_EVENT_ACTIVE_TIMEOUT:
299     result = -1;
300     // fallthrough
301 
302   case VC_EVENT_WRITE_COMPLETE:
303   Ldone:
304     if (tunnel_peer) {
305       // inform the peer:
306       tunnel_peer->startEvent(ONE_WAY_TUNNEL_EVENT_PEER_CLOSE, data);
307     }
308     close_source_vio(result);
309     close_target_vio(result);
310     connection_closed(result);
311     break;
312 
313   default:
314     ink_assert(!"bad case");
315     ret = VC_EVENT_CONT;
316     break;
317   }
318 #ifdef TEST
319   printf("    (OneWayTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
320 #endif
321   return ret;
322 }
323 
324 // If result is Non-zero, the vc should be aborted.
325 void
close_source_vio(int result)326 OneWayTunnel::close_source_vio(int result)
327 {
328   if (vioSource) {
329     if (last_connection() || !single_buffer) {
330       free_MIOBuffer(vioSource->buffer.writer());
331       vioSource->buffer.clear();
332     }
333     if (close_source && free_vcs) {
334       vioSource->vc_server->do_io_close(result ? lerrno : -1);
335     }
336     vioSource = nullptr;
337     n_connections--;
338   }
339 }
340 
341 void
close_target_vio(int result,VIO * vio)342 OneWayTunnel::close_target_vio(int result, VIO *vio)
343 {
344   (void)vio;
345   if (vioTarget) {
346     if (last_connection() || !single_buffer) {
347       free_MIOBuffer(vioTarget->buffer.writer());
348       vioTarget->buffer.clear();
349     }
350     if (close_target && free_vcs) {
351       vioTarget->vc_server->do_io_close(result ? lerrno : -1);
352     }
353     vioTarget = nullptr;
354     n_connections--;
355   }
356 }
357 
358 //////////////////////////////////////////////////////////////////////////////
359 //
360 //      void OneWayTunnel::connection_closed
361 //
362 //////////////////////////////////////////////////////////////////////////////
363 void
connection_closed(int result)364 OneWayTunnel::connection_closed(int result)
365 {
366   if (cont) {
367 #ifdef TEST
368     cout << "OneWayTunnel::connection_closed() ... calling cont" << endl;
369 #endif
370     cont->handleEvent(result ? VC_EVENT_ERROR : VC_EVENT_EOS, this);
371   } else {
372     OneWayTunnel_free(this);
373   }
374 }
375 
376 void
reenable_all()377 OneWayTunnel::reenable_all()
378 {
379   if (vioSource) {
380     vioSource->reenable();
381   }
382   if (vioTarget) {
383     vioTarget->reenable();
384   }
385 }
386 
387 bool
last_connection()388 OneWayTunnel::last_connection()
389 {
390   return n_connections == 1;
391 }
392