1 /** @file
2 
3   Implements callin functions for plugins
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tscore/ink_config.h"
25 #include "FetchSM.h"
26 #include <cstdio>
27 #include "HTTP.h"
28 #include "PluginVC.h"
29 #include "ts/ts.h" // Ugly, but we need a bunch of the public APIs here ... :-/
30 
// Debug tag under which every diagnostic message in this file is emitted.
#define DEBUG_TAG "FetchSM"
// Delay before FetchSM::ext_destroy() is retried when its try-lock on the state machine mutex fails.
#define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)

// Allocator that FetchSM::cleanUp() hands finished state machines back to.
ClassAllocator<FetchSM> FetchSMAllocator("FetchSMAllocator");
35 void
cleanUp()36 FetchSM::cleanUp()
37 {
38   Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
39 
40   if (!ink_atomic_cas(&destroyed, false, true)) {
41     Debug(DEBUG_TAG, "Error: Double delete on FetchSM, this:%p", this);
42     return;
43   }
44 
45   if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
46     chunked_handler.clear();
47   }
48 
49   free_MIOBuffer(req_buffer);
50   free_MIOBuffer(resp_buffer);
51   mutex.clear();
52   http_parser_clear(&http_parser);
53   client_response_hdr.destroy();
54   ats_free(client_response);
55   cont_mutex.clear();
56   http_vc->do_io_close();
57   FetchSMAllocator.free(this);
58 }
59 
60 void
httpConnect()61 FetchSM::httpConnect()
62 {
63   PluginIdentity *pi = dynamic_cast<PluginIdentity *>(contp);
64   const char *tag    = pi ? pi->getPluginTag() : "fetchSM";
65   int64_t id         = pi ? pi->getPluginId() : 0;
66 
67   Debug(DEBUG_TAG, "[%s] calling httpconnect write pi=%p tag=%s id=%" PRId64, __FUNCTION__, pi, tag, id);
68   http_vc = reinterpret_cast<PluginVC *>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));
69 
70   /*
71    * TS-2906: We need a way to unset internal request when using FetchSM, the use case for this
72    * is H2 when it creates outgoing requests it uses FetchSM and the outgoing requests
73    * are spawned via H2 SYN packets which are definitely not internal requests.
74    */
75   if (!is_internal_request) {
76     PluginVC *other_side = reinterpret_cast<PluginVC *>(http_vc)->get_other_side();
77     if (other_side != nullptr) {
78       other_side->set_is_internal_request(false);
79     }
80   }
81 
82   read_vio  = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
83   write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
84 }
85 
86 char *
resp_get(int * length)87 FetchSM::resp_get(int *length)
88 {
89   *length = client_bytes;
90   return client_response;
91 }
92 
93 int
InvokePlugin(int event,void * data)94 FetchSM::InvokePlugin(int event, void *data)
95 {
96   EThread *mythread = this_ethread();
97 
98   MUTEX_TAKE_LOCK(contp->mutex, mythread);
99 
100   int ret = contp->handleEvent(event, data);
101 
102   MUTEX_UNTAKE_LOCK(contp->mutex, mythread);
103 
104   return ret;
105 }
106 
107 bool
has_body()108 FetchSM::has_body()
109 {
110   int status_code;
111   HTTPHdr *hdr;
112 
113   if (!header_done) {
114     return false;
115   }
116 
117   if (is_method_head) {
118     return false;
119   }
120   //
121   // The following code comply with HTTP/1.1:
122   // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
123   //
124 
125   hdr = &client_response_hdr;
126 
127   status_code = hdr->status_get();
128   if (status_code < 200 || status_code == 204 || status_code == 304) {
129     return false;
130   }
131 
132   if (check_chunked()) {
133     return true;
134   }
135 
136   resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
137   if (!resp_content_length) {
138     if (check_connection_close()) {
139       return true;
140     } else {
141       return false;
142     }
143   }
144 
145   return true;
146 }
147 
148 bool
check_body_done()149 FetchSM::check_body_done()
150 {
151   if (!check_chunked()) {
152     if (resp_content_length == resp_received_body_len + resp_reader->read_avail()) {
153       return true;
154     }
155 
156     return false;
157   }
158 
159   //
160   // TODO: check whether the chunked body is done
161   //
162   return true;
163 }
164 
165 bool
check_for_field_value(const char * name,size_t name_len,char const * value,size_t value_len)166 FetchSM::check_for_field_value(const char *name, size_t name_len, char const *value, size_t value_len)
167 {
168   bool zret = false; // not found.
169   StrList slist;
170   HTTPHdr *hdr = &client_response_hdr;
171   int ret      = hdr->value_get_comma_list(name, name_len, &slist);
172 
173   ink_release_assert(header_done);
174 
175   if (ret) {
176     for (Str *f = slist.head; f != nullptr; f = f->next) {
177       if (f->len == value_len && 0 == strncasecmp(f->str, value, value_len)) {
178         Debug(DEBUG_TAG, "[%s] field '%.*s', value '%.*s'", __FUNCTION__, static_cast<int>(name_len), name,
179               static_cast<int>(value_len), value);
180         zret = true;
181         break;
182       }
183     }
184   }
185   return zret;
186 }
187 
188 bool
check_chunked()189 FetchSM::check_chunked()
190 {
191   static const char CHUNKED_TEXT[] = "chunked";
192   static size_t const CHUNKED_LEN  = sizeof(CHUNKED_TEXT) - 1;
193 
194   if (resp_is_chunked < 0) {
195     resp_is_chunked = static_cast<int>(
196       this->check_for_field_value(MIME_FIELD_TRANSFER_ENCODING, MIME_LEN_TRANSFER_ENCODING, CHUNKED_TEXT, CHUNKED_LEN));
197 
198     if (resp_is_chunked && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
199       ChunkedHandler *ch = &chunked_handler;
200       ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
201       ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
202       ch->state            = ChunkedHandler::CHUNK_READ_SIZE;
203       resp_reader->dealloc();
204     }
205   }
206   return resp_is_chunked > 0;
207 }
208 
209 bool
check_connection_close()210 FetchSM::check_connection_close()
211 {
212   static const char CLOSE_TEXT[] = "close";
213   static size_t const CLOSE_LEN  = sizeof(CLOSE_TEXT) - 1;
214 
215   if (resp_received_close < 0) {
216     resp_received_close =
217       static_cast<int>(this->check_for_field_value(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION, CLOSE_TEXT, CLOSE_LEN));
218   }
219   return resp_received_close > 0;
220 }
221 
222 int
dechunk_body()223 FetchSM::dechunk_body()
224 {
225   ink_assert(resp_is_chunked > 0);
226   //
227   // Return Value:
228   //  - 0: need to read more data.
229   //  - TS_FETCH_EVENT_EXT_BODY_READY.
230   //  - TS_FETCH_EVENT_EXT_BODY_DONE.
231   //
232   if (chunked_handler.process_chunked_content()) {
233     return TS_FETCH_EVENT_EXT_BODY_DONE;
234   }
235 
236   if (chunked_handler.dechunked_reader->read_avail()) {
237     return TS_FETCH_EVENT_EXT_BODY_READY;
238   }
239 
240   return 0;
241 }
242 
243 void
InvokePluginExt(int fetch_event)244 FetchSM::InvokePluginExt(int fetch_event)
245 {
246   int event;
247   EThread *mythread        = this_ethread();
248   bool read_complete_event = (fetch_event == TS_EVENT_VCONN_READ_COMPLETE) || (fetch_event == TS_EVENT_VCONN_EOS);
249 
250   //
251   // Increasing *recursion* to prevent
252   // FetchSM being deleted by callback.
253   //
254   recursion++;
255 
256   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
257     MUTEX_TAKE_LOCK(cont_mutex, mythread);
258   }
259 
260   if (!contp) {
261     goto out;
262   }
263 
264   if (fetch_event && !read_complete_event) {
265     contp->handleEvent(fetch_event, this);
266     goto out;
267   }
268 
269   if (!has_sent_header) {
270     if (fetch_event != TS_EVENT_VCONN_EOS) {
271       contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
272       has_sent_header = true;
273     } else {
274       contp->handleEvent(fetch_event, this);
275       goto out;
276     }
277   }
278 
279   // TS-3112: always check 'contp' after handleEvent()
280   // since handleEvent effectively calls the plugin (or H2 layer)
281   // which may call TSFetchDestroy in error conditions.
282   // TSFetchDestroy sets contp to NULL, but, doesn't destroy FetchSM yet,
283   // since, it¹s in a tight loop protected by 'recursion' counter.
284   // When handleEvent returns, 'recursion' is decremented and contp is
285   // already null, so, FetchSM gets destroyed.
286   if (!contp) {
287     goto out;
288   }
289 
290   if (!has_body()) {
291     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
292     goto out;
293   }
294 
295   Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", received_len: %" PRId64 ", avail: %" PRId64 "", __FUNCTION__,
296         resp_is_chunked, resp_content_length, resp_received_body_len,
297         resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());
298 
299   if (resp_is_chunked > 0) {
300     if (!chunked_handler.chunked_reader->read_avail()) {
301       if (read_complete_event) {
302         contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
303       }
304       goto out;
305     }
306   } else if (!resp_reader->read_avail()) {
307     if (read_complete_event) {
308       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
309     }
310     goto out;
311   }
312 
313   if (!check_chunked()) {
314     if (!check_body_done() && !read_complete_event) {
315       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
316     } else {
317       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
318     }
319   } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
320     do {
321       if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
322         chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
323       }
324 
325       event = dechunk_body();
326       if (!event) {
327         read_vio->reenable();
328         goto out;
329       }
330 
331       contp->handleEvent(event, this);
332 
333       // contp may be null after handleEvent
334       if (!contp) {
335         goto out;
336       }
337 
338     } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
339   } else if (check_body_done()) {
340     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
341   } else {
342     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
343   }
344 
345 out:
346   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
347     MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
348   }
349   recursion--;
350 
351   if (!contp && !recursion) {
352     cleanUp();
353   }
354 
355   return;
356 }
357 
358 void
get_info_from_buffer(IOBufferReader * reader)359 FetchSM::get_info_from_buffer(IOBufferReader *reader)
360 {
361   char *buf, *info;
362   int64_t read_avail, read_done;
363 
364   if (!reader) {
365     client_bytes = 0;
366     return;
367   }
368 
369   read_avail = reader->read_avail();
370   Debug(DEBUG_TAG, "[%s] total avail %" PRId64, __FUNCTION__, read_avail);
371   if (!read_avail) {
372     client_bytes = 0;
373     return;
374   }
375 
376   info            = (char *)ats_malloc(sizeof(char) * (read_avail + 1));
377   client_response = info;
378 
379   // To maintain backwards compatibility we don't allow chunking when it's not streaming.
380   if (!(fetch_flags & TS_FETCH_FLAGS_STREAM) || !check_chunked()) {
381     /* Read the data out of the reader */
382     while (read_avail > 0) {
383       if (reader->block) {
384         reader->skip_empty_blocks();
385       }
386 
387       IOBufferBlock *blk = reader->block.get();
388 
389       // This is the equivalent of TSIOBufferBlockReadStart()
390       buf       = blk->start() + reader->start_offset;
391       read_done = blk->read_avail() - reader->start_offset;
392 
393       if (read_done > 0) {
394         memcpy(info, buf, read_done);
395         reader->consume(read_done);
396         read_avail -= read_done;
397         info += read_done;
398         client_bytes += read_done;
399       }
400     }
401     client_response[client_bytes] = '\0';
402     return;
403   }
404 
405   reader = chunked_handler.dechunked_reader;
406   do {
407     if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
408       chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
409     }
410 
411     if (!dechunk_body()) {
412       break;
413     }
414 
415     /* Read the data out of the reader */
416     read_avail = reader->read_avail();
417     while (read_avail > 0) {
418       if (reader->block) {
419         reader->skip_empty_blocks();
420       }
421 
422       IOBufferBlock *blk = reader->block.get();
423 
424       // This is the equivalent of TSIOBufferBlockReadStart()
425       buf       = blk->start() + reader->start_offset;
426       read_done = blk->read_avail() - reader->start_offset;
427 
428       if (read_done > 0) {
429         memcpy(info, buf, read_done);
430         reader->consume(read_done);
431         read_avail -= read_done;
432         info += read_done;
433         client_bytes += read_done;
434       }
435     }
436   } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
437 
438   client_response[client_bytes] = '\0';
439   return;
440 }
441 
442 void
process_fetch_read(int event)443 FetchSM::process_fetch_read(int event)
444 {
445   Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
446   int64_t bytes;
447   int bytes_used;
448   int64_t total_bytes_copied = 0;
449 
450   switch (event) {
451   case TS_EVENT_VCONN_READ_READY:
452     // duplicate the bytes for backward compatibility with TSFetchUrl()
453     if (!(fetch_flags & TS_FETCH_FLAGS_STREAM)) {
454       bytes = resp_reader->read_avail();
455       Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);
456 
457       while (total_bytes_copied < bytes) {
458         int64_t actual_bytes_copied;
459         actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
460         Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
461         if (actual_bytes_copied <= 0) {
462           break;
463         }
464         total_bytes_copied += actual_bytes_copied;
465       }
466       Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
467       resp_reader->consume(total_bytes_copied);
468     }
469 
470     if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
471       if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, false) == PARSE_RESULT_DONE) {
472         header_done = true;
473         if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
474           return InvokePluginExt();
475         } else {
476           InvokePlugin(callback_events.success_event_id, (void *)&client_response_hdr);
477         }
478       }
479     } else {
480       if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
481         return InvokePluginExt();
482       }
483     }
484     read_vio->reenable();
485     break;
486   case TS_EVENT_VCONN_READ_COMPLETE:
487   case TS_EVENT_VCONN_EOS:
488     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
489       return InvokePluginExt(event);
490     }
491     if (callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
492       get_info_from_buffer(resp_reader);
493       InvokePlugin(callback_events.success_event_id, (void *)this);
494     }
495     Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
496     cleanUp();
497     break;
498   case TS_EVENT_ERROR:
499   default:
500     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
501       return InvokePluginExt(event);
502     }
503     InvokePlugin(callback_events.failure_event_id, nullptr);
504     cleanUp();
505     break;
506   }
507 }
508 
509 void
process_fetch_write(int event)510 FetchSM::process_fetch_write(int event)
511 {
512   Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
513   switch (event) {
514   case TS_EVENT_VCONN_WRITE_COMPLETE:
515     req_finished = true;
516     break;
517   case TS_EVENT_VCONN_WRITE_READY:
518     // data is processed in chunks of 32k; if there is more than 32k
519     // of input data, we have to continue reenabling until all data is
520     // read (we have already written all the data to the buffer)
521     if (req_reader->read_avail() > 0) {
522       ((PluginVC *)http_vc)->reenable(write_vio);
523     }
524     break;
525   case TS_EVENT_ERROR:
526     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
527       return InvokePluginExt(event);
528     }
529     InvokePlugin(callback_events.failure_event_id, nullptr);
530     cleanUp();
531     break;
532   default:
533     break;
534   }
535 }
536 
537 int
fetch_handler(int event,void * edata)538 FetchSM::fetch_handler(int event, void *edata)
539 {
540   Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
541 
542   if (edata == read_vio) {
543     process_fetch_read(event);
544   } else if (edata == write_vio) {
545     process_fetch_write(event);
546   } else {
547     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
548       InvokePluginExt(event);
549       return 1;
550     }
551     InvokePlugin(callback_events.failure_event_id, nullptr);
552     cleanUp();
553   }
554   return 1;
555 }
556 
557 void
ext_init(Continuation * cont,const char * method,const char * url,const char * version,const sockaddr * client_addr,int flags)558 FetchSM::ext_init(Continuation *cont, const char *method, const char *url, const char *version, const sockaddr *client_addr,
559                   int flags)
560 {
561   init_comm();
562 
563   if (flags & TS_FETCH_FLAGS_NEWLOCK) {
564     mutex      = new_ProxyMutex();
565     cont_mutex = cont->mutex;
566   } else {
567     mutex = cont->mutex;
568   }
569 
570   contp = cont;
571   _addr.assign(client_addr);
572 
573   //
574   // Enable stream IO automatically.
575   //
576   fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
577   if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
578     set_internal_request(false);
579   }
580 
581   //
582   // These options are not used when enable
583   // stream IO.
584   //
585   memset(&callback_options, 0, sizeof(callback_options));
586   memset(&callback_events, 0, sizeof(callback_events));
587 
588   int method_len = strlen(method);
589   req_buffer->write(method, method_len);
590   req_buffer->write(" ", 1);
591   req_buffer->write(url, strlen(url));
592   req_buffer->write(" ", 1);
593   req_buffer->write(version, strlen(version));
594   req_buffer->write("\r\n", 2);
595 
596   if ((method_len == HTTP_LEN_HEAD) && !memcmp(method, HTTP_METHOD_HEAD, HTTP_LEN_HEAD)) {
597     is_method_head = true;
598   }
599 }
600 
601 void
ext_add_header(const char * name,int name_len,const char * value,int value_len)602 FetchSM::ext_add_header(const char *name, int name_len, const char *value, int value_len)
603 {
604   if (TS_MIME_LEN_CONTENT_LENGTH == name_len && !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
605     req_content_length = atoll(value);
606   }
607 
608   req_buffer->write(name, name_len);
609   req_buffer->write(": ", 2);
610   req_buffer->write(value, value_len);
611   req_buffer->write("\r\n", 2);
612 }
613 
614 void
ext_launch()615 FetchSM::ext_launch()
616 {
617   req_buffer->write("\r\n", 2);
618   httpConnect();
619 }
620 
621 void
ext_write_data(const void * data,size_t len)622 FetchSM::ext_write_data(const void *data, size_t len)
623 {
624   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
625     MUTEX_TAKE_LOCK(mutex, this_ethread());
626   }
627   req_buffer->write(data, len);
628 
629   Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
630   write_vio->reenable();
631 
632   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
633     MUTEX_UNTAKE_LOCK(mutex, this_ethread());
634   }
635 }
636 
637 ssize_t
ext_read_data(char * buf,size_t len)638 FetchSM::ext_read_data(char *buf, size_t len)
639 {
640   const char *start;
641   TSIOBufferReader reader;
642   TSIOBufferBlock blk, next_blk;
643   int64_t already, blk_len, need, wavail;
644 
645   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
646     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
647     if (!lock.is_locked()) {
648       return 0;
649     }
650   }
651 
652   if (!header_done) {
653     return 0;
654   }
655 
656   if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
657     reader = (tsapi_bufferreader *)chunked_handler.dechunked_reader;
658   } else {
659     reader = (TSIOBufferReader)resp_reader;
660   }
661 
662   already = 0;
663   blk     = TSIOBufferReaderStart(reader);
664 
665   while (blk) {
666     wavail = len - already;
667 
668     next_blk = TSIOBufferBlockNext(blk);
669     start    = TSIOBufferBlockReadStart(blk, reader, &blk_len);
670 
671     need = blk_len > wavail ? wavail : blk_len;
672 
673     memcpy(&buf[already], start, need);
674     already += need;
675 
676     if (already >= static_cast<int64_t>(len)) {
677       break;
678     }
679 
680     blk = next_blk;
681   }
682 
683   resp_received_body_len += already;
684   TSIOBufferReaderConsume(reader, already);
685 
686   read_vio->reenable();
687   return already;
688 }
689 
690 void
ext_destroy()691 FetchSM::ext_destroy()
692 {
693   contp = nullptr;
694 
695   if (recursion) {
696     return;
697   }
698 
699   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
700     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
701     if (!lock.is_locked()) {
702       eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
703       return;
704     }
705   }
706 
707   cleanUp();
708 }
709 
710 void
ext_set_user_data(void * data)711 FetchSM::ext_set_user_data(void *data)
712 {
713   user_data = data;
714 }
715 
716 void *
ext_get_user_data()717 FetchSM::ext_get_user_data()
718 {
719   return user_data;
720 }
721 
722 TSMBuffer
resp_hdr_bufp()723 FetchSM::resp_hdr_bufp()
724 {
725   HdrHeapSDKHandle *heap;
726   heap = (HdrHeapSDKHandle *)&client_response_hdr;
727 
728   return (TSMBuffer)heap;
729 }
730 
731 TSMLoc
resp_hdr_mloc()732 FetchSM::resp_hdr_mloc()
733 {
734   return (TSMLoc)client_response_hdr.m_http;
735 }
736