1 /** @file
2   Licensed to the Apache Software Foundation (ASF) under one
3   or more contributor license agreements.  See the NOTICE file
4   distributed with this work for additional information
5   regarding copyright ownership.  The ASF licenses this file
6   to you under the Apache License, Version 2.0 (the
7   "License"); you may not use this file except in compliance
8   with the License.  You may obtain a copy of the License at
9 
10       http://www.apache.org/licenses/LICENSE-2.0
11 
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17  */
18 
19 #include "transfer.h"
20 
21 int64_t
transfer_content_bytes(Data * const data)22 transfer_content_bytes(Data *const data)
23 {
24   // nothing to transfer if there's no source.
25   if (nullptr == data->m_upstream.m_read.m_reader) {
26     return 0;
27   }
28 
29   TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
30   TSIOBuffer const output_buf   = data->m_dnstream.m_write.m_iobuf;
31   TSVIO const output_vio        = data->m_dnstream.m_write.m_vio;
32 
33   int64_t consumed = 0; // input vio bytes visited
34   int64_t copied   = 0; // output bytes transferred
35 
36   bool const canWrite = data->m_dnstream.m_write.isOpen();
37   bool done           = false;
38 
39   TSIOBufferBlock block = TSIOBufferReaderStart(reader);
40 
41   while (!done && nullptr != block) {
42     int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
43 
44     if (0 == bavail) {
45       block = TSIOBufferBlockNext(block);
46     } else {
47       int64_t toconsume = 0;
48 
49       if (canWrite) {
50         int64_t const toskip = std::min(data->m_blockskip, bavail);
51         if (0 < toskip) { // before bytes
52           toconsume = toskip;
53           data->m_blockskip -= toskip;
54         } else {
55           int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
56           if (0 < bytesleft) { // transfer bytes
57             int64_t const tocopy = std::min(bavail, bytesleft);
58             int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0);
59 
60             done = (nbytes < tocopy); // output buffer stuffed
61 
62             copied += nbytes;
63             data->m_bytessent += nbytes;
64 
65             toconsume = nbytes;
66           } else { // after bytes
67             toconsume = bavail;
68           }
69         }
70       } else { // drain
71         toconsume = bavail;
72       }
73 
74       if (0 < toconsume) {
75         if (bavail == toconsume) {
76           block = TSIOBufferBlockNext(block);
77         }
78         TSIOBufferReaderConsume(reader, toconsume);
79         consumed += toconsume;
80       }
81     }
82   }
83 
84   // tell output more data is available
85   if (0 < copied) {
86     TSVIOReenable(output_vio);
87   }
88 
89   if (0 < consumed) {
90     data->m_blockconsumed += consumed;
91 
92     TSVIO const input_vio = data->m_upstream.m_read.m_vio;
93     if (nullptr != input_vio) {
94       TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
95     }
96   }
97 
98   return consumed;
99 }
100 
101 // transfer all bytes from the server (error condition)
102 int64_t
transfer_all_bytes(Data * const data)103 transfer_all_bytes(Data *const data)
104 {
105   // nothing to transfer if there's no source.
106   if (nullptr == data->m_upstream.m_read.m_reader || !data->m_dnstream.m_write.isOpen()) {
107     return 0;
108   }
109 
110   int64_t consumed = 0; // input vio bytes visited
111 
112   TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
113   TSIOBuffer const output_buf   = data->m_dnstream.m_write.m_iobuf;
114 
115   bool done = false;
116 
117   TSIOBufferBlock block = TSIOBufferReaderStart(reader);
118 
119   while (!done && nullptr != block) {
120     int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
121 
122     if (0 == bavail) {
123       block = TSIOBufferBlockNext(block);
124     } else {
125       int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0);
126       done                 = nbytes < bavail; // output buffer is full
127 
128       if (0 < nbytes) {
129         if (bavail == nbytes) {
130           block = TSIOBufferBlockNext(block);
131         }
132         TSIOBufferReaderConsume(reader, nbytes);
133         consumed += nbytes;
134       }
135     }
136   }
137 
138   if (0 < consumed) {
139     TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
140     if (nullptr != output_vio) {
141       TSVIOReenable(output_vio);
142     }
143 
144     TSVIO const input_vio = data->m_upstream.m_read.m_vio;
145     if (nullptr != input_vio) {
146       TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
147     }
148   }
149 
150   return consumed;
151 }
152