1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tsmemcache.h"
25 
26 /*
27   TODO
28   - on OPEN_WRITE_FAIL don't poll, figure out another way, and timeout
29   - factor code better, particularly incr/set
30   - MIOBufferAccessor::reader_for
31   - cleanup creader dependency in stream_event
32  */
33 
34 #define REALTIME_MAXDELTA 60 * 60 * 24 * 30
35 #define STRCMP_REST(_c, _s, _e) (((_e) - (_s)) < (int)sizeof(_c) || STRCMP(_s, _c) || !isspace((_s)[sizeof(_c) - 1]))
36 
37 ClassAllocator<MC> theMCAllocator("MC");
38 
39 static time_t base_day_time;
40 
41 // These should be persistent.
42 int32_t MC::verbosity     = 0;
43 ink_hrtime MC::last_flush = 0;
44 int64_t MC::next_cas      = 1;
45 
46 static void
tsmemcache_constants()47 tsmemcache_constants()
48 {
49   struct tm tm;
50   memset(&tm, 0, sizeof(tm));
51   // jan 1 2010
52   tm.tm_year    = 110;
53   tm.tm_mon     = 1;
54   tm.tm_mday    = 1;
55   base_day_time = mktime(&tm);
56   ink_assert(base_day_time != (time_t)-1);
57 }
58 
59 #ifdef DEBUG
60 char debug_string_buffer[TSMEMCACHE_TMP_CMD_BUFFER_SIZE];
61 static char *
mc_string(const char * s,int len)62 mc_string(const char *s, int len)
63 {
64   int l = len;
65   while (l && (s[l - 1] == '\r' || s[l - 1] == '\n')) {
66     l--;
67   }
68   if (l > TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1) {
69     l = TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1;
70   }
71   if (l) {
72     memcpy(debug_string_buffer, s, l);
73   }
74   debug_string_buffer[l] = 0;
75   return debug_string_buffer;
76 }
77 #endif
78 
79 #ifdef DEBUG
80 #define MCDebugBuf(_t, _s, _l) \
81   if (is_debug_tag_set(_t))    \
82   printf(_t ": %s\n", mc_string(_s, _l))
83 #define MCDebug Debug
84 #else
85 #define MCDebugBuf(_t, _s, _l) \
86   do {                         \
87   } while (0)
88 #define MCDebug \
89   if (0)        \
90   Debug
91 #endif
92 
93 static uint64_t
ink_hton64(uint64_t in)94 ink_hton64(uint64_t in)
95 {
96   int32_t val = 1;
97   uint8_t *c  = reinterpret_cast<uint8_t *>(&val);
98   if (*c == 1) {
99     union {
100       uint64_t rv;
101       uint8_t b[8];
102     } x;
103 #define SWP1B(_x, _y) \
104   do {                \
105     uint8_t t = (_y); \
106     (_y)      = (_x); \
107     (_x)      = t;    \
108   } while (0)
109     x.rv = in;
110     SWP1B(x.b[0], x.b[7]);
111     SWP1B(x.b[1], x.b[6]);
112     SWP1B(x.b[2], x.b[5]);
113     SWP1B(x.b[3], x.b[4]);
114 #undef SWP1B
115     return x.rv;
116   } else {
117     return in;
118   }
119 }
120 #define ink_ntoh64 ink_hton64
121 
122 int
main_event(int event,void * data)123 MCAccept::main_event(int event, void *data)
124 {
125   if (event == NET_EVENT_ACCEPT) {
126     NetVConnection *netvc = (NetVConnection *)data;
127     MC *mc                = theMCAllocator.alloc();
128     if (!mutex->thread_holding) {
129       mc->new_connection(netvc, netvc->thread);
130     } else {
131       mc->new_connection(netvc, mutex->thread_holding);
132     }
133     return EVENT_CONT;
134   } else {
135     Fatal("tsmemcache accept received fatal error: errno = %d", -((int)(intptr_t)data));
136     return EVENT_CONT;
137   }
138 }
139 
140 void
new_connection(NetVConnection * netvc,EThread * thread)141 MC::new_connection(NetVConnection *netvc, EThread *thread)
142 {
143   nvc              = netvc;
144   mutex            = new_ProxyMutex();
145   rbuf             = new_MIOBuffer(MAX_IOBUFFER_SIZE);
146   rbuf->water_mark = TSMEMCACHE_TMP_CMD_BUFFER_SIZE;
147   reader           = rbuf->alloc_reader();
148   wbuf             = new_empty_MIOBuffer();
149   cbuf             = 0;
150   writer           = wbuf->alloc_reader();
151   SCOPED_MUTEX_LOCK(lock, mutex, thread);
152   rvio         = nvc->do_io_read(this, INT64_MAX, rbuf);
153   wvio         = nvc->do_io_write(this, 0, writer);
154   header.magic = TSMEMCACHE_HEADER_MAGIC;
155   read_from_client();
156 }
157 
158 int
die()159 MC::die()
160 {
161   if (pending_action && pending_action != ACTION_RESULT_DONE) {
162     pending_action->cancel();
163   }
164   if (nvc) {
165     nvc->do_io_close(1); // abort
166   }
167   if (crvc) {
168     crvc->do_io_close(1); // abort
169   }
170   if (cwvc) {
171     cwvc->do_io_close(1); // abort
172   }
173   if (rbuf) {
174     free_MIOBuffer(rbuf);
175   }
176   if (wbuf) {
177     free_MIOBuffer(wbuf);
178   }
179   if (cbuf) {
180     free_MIOBuffer(cbuf);
181   }
182   if (tbuf) {
183     ats_free(tbuf);
184   }
185   mutex = NULL;
186   theMCAllocator.free(this);
187   return EVENT_DONE;
188 }
189 
190 int
unexpected_event()191 MC::unexpected_event()
192 {
193   ink_assert(!"unexpected event");
194   return die();
195 }
196 
197 int
write_then_close(int64_t ntowrite)198 MC::write_then_close(int64_t ntowrite)
199 {
200   SET_HANDLER(&MC::write_then_close_event);
201   return write_to_client(ntowrite);
202 }
203 
204 int
write_then_read_from_client(int64_t ntowrite)205 MC::write_then_read_from_client(int64_t ntowrite)
206 {
207   SET_HANDLER(&MC::read_from_client_event);
208   return write_to_client(ntowrite);
209 }
210 
211 int
stream_then_read_from_client(int64_t ntowrite)212 MC::stream_then_read_from_client(int64_t ntowrite)
213 {
214   SET_HANDLER(&MC::read_from_client_event);
215   creader = reader;
216   TS_PUSH_HANDLER(&MC::stream_event);
217   return write_to_client(ntowrite);
218 }
219 
220 void
add_binary_header(uint16_t err,uint8_t hdr_len,uint16_t key_len,uint32_t body_len)221 MC::add_binary_header(uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len)
222 {
223   protocol_binary_response_header r;
224 
225   r.response.magic    = static_cast<uint8_t>(PROTOCOL_BINARY_RES);
226   r.response.opcode   = binary_header.request.opcode;
227   r.response.keylen   = (uint16_t)htons(key_len);
228   r.response.extlen   = hdr_len;
229   r.response.datatype = static_cast<uint8_t>(PROTOCOL_BINARY_RAW_BYTES);
230   r.response.status   = (uint16_t)htons(err);
231   r.response.bodylen  = htonl(body_len);
232   r.response.opaque   = binary_header.request.opaque;
233   r.response.cas      = ink_hton64(header.cas);
234 
235   wbuf->write(&r, sizeof(r));
236 }
237 
238 int
write_binary_error(protocol_binary_response_status err,int swallow)239 MC::write_binary_error(protocol_binary_response_status err, int swallow)
240 {
241   const char *errstr = "Unknown error";
242   switch (err) {
243   case PROTOCOL_BINARY_RESPONSE_ENOMEM:
244     errstr = "Out of memory";
245     break;
246   case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
247     errstr = "Unknown command";
248     break;
249   case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
250     errstr = "Not found";
251     break;
252   case PROTOCOL_BINARY_RESPONSE_EINVAL:
253     errstr = "Invalid arguments";
254     break;
255   case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
256     errstr = "Data exists for key.";
257     break;
258   case PROTOCOL_BINARY_RESPONSE_E2BIG:
259     errstr = "Too large.";
260     break;
261   case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
262     errstr = "Non-numeric server-side value for incr or decr";
263     break;
264   case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
265     errstr = "Not stored.";
266     break;
267   case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
268     errstr = "Auth failure.";
269     break;
270   default:
271     ink_assert(!"unhandled error");
272     errstr = "UNHANDLED ERROR";
273     Warning("tsmemcache: unhandled error: %d\n", err);
274   }
275 
276   size_t len = strlen(errstr);
277   add_binary_header(err, 0, 0, len);
278   if (swallow > 0) {
279     int64_t avail = reader->read_avail();
280     if (avail >= swallow) {
281       reader->consume(swallow);
282     } else {
283       swallow_bytes = swallow - avail;
284       reader->consume(avail);
285       SET_HANDLER(&MC::swallow_then_read_event);
286     }
287   }
288   return 0;
289 }
290 
291 int
swallow_then_read_event(int event,void * data)292 MC::swallow_then_read_event(int event, void *data)
293 {
294   rvio->nbytes  = INT64_MAX;
295   int64_t avail = reader->read_avail();
296   if (avail >= swallow_bytes) {
297     reader->consume(swallow_bytes);
298     swallow_bytes = 0;
299     return read_from_client();
300   } else {
301     swallow_bytes -= avail;
302     reader->consume(avail);
303     return EVENT_CONT;
304   }
305 }
306 
307 int
swallow_cmd_then_read_from_client_event(int event,void * data)308 MC::swallow_cmd_then_read_from_client_event(int event, void *data)
309 {
310   int64_t avail = reader->read_avail();
311   if (avail) {
312     int64_t n = reader->memchr('\n');
313     if (n >= 0) {
314       reader->consume(n + 1);
315       return read_from_client();
316     }
317     reader->consume(avail);
318     return EVENT_CONT;
319   }
320   return EVENT_CONT;
321 }
322 
323 int
protocol_error()324 MC::protocol_error()
325 {
326   Warning("tsmemcache: protocol error");
327   return write_then_close(write_binary_error(PROTOCOL_BINARY_RESPONSE_EINVAL, 0));
328 }
329 
330 int
read_from_client()331 MC::read_from_client()
332 {
333   if (swallow_bytes) {
334     return TS_SET_CALL(&MC::swallow_then_read_event, VC_EVENT_READ_READY, rvio);
335   }
336   read_offset = 0;
337   end_of_cmd  = 0;
338   ngets       = 0;
339   ff          = 0;
340   if (crvc) {
341     crvc->do_io_close();
342     crvc  = 0;
343     crvio = NULL;
344   }
345   if (cwvc) {
346     cwvc->do_io_close();
347     cwvc  = 0;
348     cwvio = NULL;
349   }
350   if (cbuf) {
351     cbuf->clear();
352   }
353   ink_assert(!crvc && !cwvc);
354   if (tbuf) {
355     ats_free(tbuf);
356   }
357   return TS_SET_CALL(&MC::read_from_client_event, VC_EVENT_READ_READY, rvio);
358 }
359 
360 int
write_to_client(int64_t towrite)361 MC::write_to_client(int64_t towrite)
362 {
363   (void)towrite;
364   wvio->nbytes = INT64_MAX;
365   wvio->reenable();
366   return EVENT_CONT;
367 }
368 
369 int
write_binary_response(const void * d,int hlen,int keylen,int dlen)370 MC::write_binary_response(const void *d, int hlen, int keylen, int dlen)
371 {
372   if (!f.noreply || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
373       binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ) {
374     add_binary_header(0, hlen, keylen, dlen);
375     if (dlen) {
376       MCDebug("tsmemcache", "response dlen %d\n", dlen);
377       wbuf->write(d, dlen);
378     } else
379       MCDebug("tsmemcache", "no response\n");
380   }
381   return writer->read_avail();
382 }
383 
384 #define CHECK_READ_AVAIL(_n, _h)                     \
385   do {                                               \
386     if (reader->read_avail() < _n) {                 \
387       switch (event) {                               \
388       case VC_EVENT_EOS:                             \
389         if ((VIO *)data == rvio)                     \
390           break;                                     \
391       /* fallthrough */                              \
392       case VC_EVENT_READ_READY:                      \
393         return EVENT_CONT;                           \
394       case VC_EVENT_WRITE_READY:                     \
395         if (wvio->buffer.reader()->read_avail() > 0) \
396           return EVENT_CONT;                         \
397       /* fallthrough */                              \
398       case VC_EVENT_WRITE_COMPLETE:                  \
399         return EVENT_DONE;                           \
400       default:                                       \
401         break;                                       \
402       }                                              \
403       return die();                                  \
404     }                                                \
405   } while (0)
406 
407 static char *
get_pointer(MC * mc,int start,int len)408 get_pointer(MC *mc, int start, int len)
409 {
410   if (mc->reader->block_read_avail() >= start + len) {
411     return mc->reader->start() + start;
412   }
413   // the block of data straddles an IOBufferBlock boundary, exceptional case, malloc
414   ink_assert(!mc->tbuf);
415   mc->tbuf = static_cast<char *>(ats_malloc(len));
416   mc->reader->memcpy(mc->tbuf, len, start);
417   return mc->tbuf;
418 }
419 
420 static inline char *
binary_get_key(MC * mc)421 binary_get_key(MC *mc)
422 {
423   return get_pointer(mc, 0, mc->binary_header.request.keylen);
424 }
425 
426 int
cache_read_event(int event,void * data)427 MC::cache_read_event(int event, void *data)
428 {
429   switch (event) {
430   case CACHE_EVENT_OPEN_READ: {
431     crvc     = (CacheVConnection *)data;
432     int hlen = 0;
433     if (crvc->get_header((void **)&rcache_header, &hlen) < 0) {
434       goto Lfail;
435     }
436     if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || rcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
437       goto Lfail;
438     }
439     if (header.nkey != rcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + rcache_header->nkey)) {
440       goto Lfail;
441     }
442     if (memcmp(key, rcache_header->key(), header.nkey)) {
443       goto Lfail;
444     }
445     {
446       ink_hrtime t = Thread::get_hrtime();
447       if ((static_cast<ink_hrtime>(rcache_header->settime)) <= last_flush ||
448           t >= (static_cast<ink_hrtime>(rcache_header->settime)) + HRTIME_SECONDS(rcache_header->exptime)) {
449         goto Lfail;
450       }
451     }
452     break;
453   Lfail:
454     crvc->do_io_close();
455     crvc  = 0;
456     crvio = NULL;
457     event = CACHE_EVENT_OPEN_READ_FAILED; // convert to failure
458     break;
459   }
460   case VC_EVENT_EOS:
461   case VC_EVENT_ERROR:
462   case CACHE_EVENT_OPEN_READ_FAILED:
463     break;
464   default:
465     return EVENT_CONT;
466   }
467   return TS_POP_CALL(event, data);
468 }
469 
470 int
get_item()471 MC::get_item()
472 {
473   TS_PUSH_HANDLER(&MC::cache_read_event);
474   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
475   pending_action = cacheProcessor.open_read(this, &cache_key);
476   return EVENT_CONT;
477 }
478 
479 int
set_item()480 MC::set_item()
481 {
482   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
483   pending_action = cacheProcessor.open_write(this, &cache_key, CACHE_FRAG_TYPE_NONE, header.nbytes,
484                                              CACHE_WRITE_OPT_OVERWRITE | TSMEMCACHE_WRITE_SYNC);
485   return EVENT_CONT;
486 }
487 
488 int
delete_item()489 MC::delete_item()
490 {
491   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
492   pending_action = cacheProcessor.remove(this, &cache_key, CACHE_FRAG_TYPE_NONE);
493   return EVENT_CONT;
494 }
495 
496 int
binary_get_event(int event,void * data)497 MC::binary_get_event(int event, void *data)
498 {
499   ink_assert(!"EVENT_ITEM_GOT is incorrect here");
500   if (event != TSMEMCACHE_EVENT_GOT_ITEM) {
501     CHECK_READ_AVAIL(binary_header.request.keylen, &MC::binary_get);
502     key         = binary_get_key(this);
503     header.nkey = binary_header.request.keylen;
504     return get_item();
505   } else if (event == CACHE_EVENT_OPEN_READ_FAILED) {
506     if (f.noreply) {
507       return read_from_client();
508     }
509     if (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK) {
510       add_binary_header(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, header.nkey, header.nkey);
511       wbuf->write(key, header.nkey);
512       return write_then_read_from_client();
513     } else {
514       return write_binary_error(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
515     }
516   } else if (event == CACHE_EVENT_OPEN_READ) {
517     protocol_binary_response_get *rsp = &res.get;
518     uint16_t keylen                   = 0;
519     uint32_t bodylen                  = sizeof(rsp->message.body) + (rcache_header->nbytes - 2);
520     bool getk =
521       (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ);
522     if (getk) {
523       bodylen += header.nkey;
524       keylen = header.nkey;
525     }
526     add_binary_header(0, sizeof(rsp->message.body), keylen, bodylen);
527     rsp->message.header.response.cas = ink_hton64(rcache_header->cas);
528     rsp->message.body.flags          = htonl(rcache_header->flags);
529     wbuf->write(&rsp->message.body, sizeof(rsp->message.body));
530     if (getk) {
531       wbuf->write(key, header.nkey);
532     }
533     crvio = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
534     return stream_then_read_from_client(rcache_header->nbytes);
535   } else {
536     return unexpected_event();
537   }
538   return 0;
539 }
540 
541 int
bin_read_key()542 MC::bin_read_key()
543 {
544   return -1;
545 }
546 
547 int
read_binary_from_client_event(int event,void * data)548 MC::read_binary_from_client_event(int event, void *data)
549 {
550   if (reader->read_avail() < (int)sizeof(binary_header)) {
551     return EVENT_CONT;
552   }
553   reader->memcpy(&binary_header, sizeof(binary_header));
554   if (binary_header.request.magic != PROTOCOL_BINARY_REQ) {
555     Warning("tsmemcache: bad binary magic: %x", binary_header.request.magic);
556     return die();
557   }
558   int keylen = binary_header.request.keylen = ntohs(binary_header.request.keylen);
559   int bodylen = binary_header.request.bodylen = ntohl(binary_header.request.bodylen);
560   binary_header.request.cas                   = ink_ntoh64(binary_header.request.cas);
561   int extlen                                  = binary_header.request.extlen;
562   end_of_cmd                                  = sizeof(binary_header) + extlen;
563 
564 #define CHECK_PROTOCOL(_e) \
565   if (!(_e))               \
566     return protocol_error();
567 
568   MCDebug("tsmemcache", "bin cmd %d\n", binary_header.request.opcode);
569   switch (binary_header.request.opcode) {
570   case PROTOCOL_BINARY_CMD_VERSION:
571     CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
572     return write_to_client(write_binary_response(TSMEMCACHE_VERSION, 0, 0, STRLEN(TSMEMCACHE_VERSION)));
573   case PROTOCOL_BINARY_CMD_NOOP:
574     CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
575     return write_to_client(write_binary_response(nullptr, 0, 0, 0));
576   case PROTOCOL_BINARY_CMD_GETKQ:
577     f.noreply = 1; // fall through
578   case PROTOCOL_BINARY_CMD_GETQ:
579     f.noreply = 1; // fall through
580   case PROTOCOL_BINARY_CMD_GETK:
581   case PROTOCOL_BINARY_CMD_GET:
582     CHECK_PROTOCOL(extlen == 0 && (int)bodylen == keylen && keylen > 0);
583     return TS_SET_CALL(&MC::binary_get_event, event, data);
584   case PROTOCOL_BINARY_CMD_APPENDQ:
585   case PROTOCOL_BINARY_CMD_APPEND:
586     f.set_append = 1;
587     goto Lset;
588   case PROTOCOL_BINARY_CMD_PREPENDQ:
589   case PROTOCOL_BINARY_CMD_PREPEND:
590     f.set_prepend = 1;
591     goto Lset;
592   case PROTOCOL_BINARY_CMD_ADDQ:
593     f.noreply = 1; // fall through
594   case PROTOCOL_BINARY_CMD_ADD:
595     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
596     f.set_add = 1;
597     goto Lset;
598   case PROTOCOL_BINARY_CMD_REPLACEQ:
599     f.noreply = 1; // fall through
600   case PROTOCOL_BINARY_CMD_REPLACE:
601     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
602     f.set_replace = 1;
603     goto Lset;
604   case PROTOCOL_BINARY_CMD_SETQ:
605     f.noreply = 1; // fall through
606   case PROTOCOL_BINARY_CMD_SET: {
607     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
608   Lset:
609     if (bin_read_key() < 0) {
610       return EVENT_CONT;
611     }
612     key                              = binary_get_key(this);
613     header.nkey                      = keylen;
614     protocol_binary_request_set *req = reinterpret_cast<protocol_binary_request_set *>(&binary_header);
615     req->message.body.flags          = ntohl(req->message.body.flags);
616     req->message.body.expiration     = ntohl(req->message.body.expiration);
617     nbytes                           = bodylen - (header.nkey + extlen);
618     break;
619   }
620   case PROTOCOL_BINARY_CMD_DELETEQ:
621     f.noreply = 1; // fall through
622   case PROTOCOL_BINARY_CMD_DELETE:
623     break;
624   case PROTOCOL_BINARY_CMD_INCREMENTQ:
625     f.noreply = 1; // fall through
626   case PROTOCOL_BINARY_CMD_INCREMENT:
627     break;
628   case PROTOCOL_BINARY_CMD_DECREMENTQ:
629     f.noreply = 1; // fall through
630   case PROTOCOL_BINARY_CMD_DECREMENT:
631     break;
632   case PROTOCOL_BINARY_CMD_QUITQ:
633     f.noreply = 1; // fall through
634   case PROTOCOL_BINARY_CMD_QUIT:
635     if (f.noreply) {
636       return die();
637     }
638     return write_then_close(write_binary_response(nullptr, 0, 0, 0));
639   case PROTOCOL_BINARY_CMD_FLUSHQ:
640     f.noreply = 1; // fall through
641   case PROTOCOL_BINARY_CMD_FLUSH:
642     break;
643     break;
644   case PROTOCOL_BINARY_CMD_STAT:
645     break;
646   case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
647   case PROTOCOL_BINARY_CMD_SASL_AUTH:
648   case PROTOCOL_BINARY_CMD_SASL_STEP:
649     Warning("tsmemcache: sasl not (yet) supported");
650     return die();
651   case PROTOCOL_BINARY_CMD_RGET:
652   case PROTOCOL_BINARY_CMD_RSET:
653   case PROTOCOL_BINARY_CMD_RSETQ:
654   case PROTOCOL_BINARY_CMD_RAPPEND:
655   case PROTOCOL_BINARY_CMD_RAPPENDQ:
656   case PROTOCOL_BINARY_CMD_RPREPEND:
657   case PROTOCOL_BINARY_CMD_RPREPENDQ:
658   case PROTOCOL_BINARY_CMD_RDELETE:
659   case PROTOCOL_BINARY_CMD_RDELETEQ:
660   case PROTOCOL_BINARY_CMD_RINCR:
661   case PROTOCOL_BINARY_CMD_RINCRQ:
662   case PROTOCOL_BINARY_CMD_RDECR:
663   case PROTOCOL_BINARY_CMD_RDECRQ:
664     Warning("tsmemcache: range not (yet) supported");
665     return die();
666   default:
667     Warning("tsmemcache: unexpected binary opcode %x", binary_header.request.opcode);
668     return die();
669   }
670   return EVENT_CONT;
671 }
672 
673 int
ascii_response(const char * s,int len)674 MC::ascii_response(const char *s, int len)
675 {
676   if (!f.noreply) {
677     wbuf->write(s, len);
678     wvio->nbytes = INT64_MAX;
679     wvio->reenable();
680     MCDebugBuf("tsmemcache_ascii_response", s, len);
681   }
682   if (end_of_cmd > 0) {
683     reader->consume(end_of_cmd);
684     return read_from_client();
685   } else if (end_of_cmd < 0) {
686     return read_from_client();
687   } else {
688     return TS_SET_CALL(&MC::swallow_cmd_then_read_from_client_event, EVENT_NONE, NULL);
689   }
690 }
691 
692 char *
get_ascii_input(int n,int * end)693 MC::get_ascii_input(int n, int *end)
694 {
695   int block_read_avail = reader->block_read_avail();
696   if (block_read_avail >= n) {
697   Lblock:
698     *end = block_read_avail;
699     return reader->start();
700   }
701   int read_avail = reader->read_avail();
702   if (block_read_avail == read_avail) {
703     goto Lblock;
704   }
705   char *c = tmp_cmd_buffer;
706   int e   = read_avail;
707   if (e > n) {
708     e = n;
709   }
710   reader->memcpy(c, e);
711   *end = e;
712   return c;
713 }
714 
715 int
ascii_get_event(int event,void * data)716 MC::ascii_get_event(int event, void *data)
717 {
718   switch (event) {
719   case CACHE_EVENT_OPEN_READ_FAILED:
720     reader->consume(read_offset);
721     read_offset = 0;
722     break;
723   case CACHE_EVENT_OPEN_READ: {
724     wbuf->WRITE("VALUE ");
725     wbuf->write(key, header.nkey);
726     wbuf->WRITE(" ");
727     char t[32], *te = t + 32;
728     char *flags = xutoa(rcache_header->flags, te);
729     wbuf->write(flags, te - flags);
730     wbuf->WRITE(" ");
731     char *bytes = xutoa(rcache_header->nbytes, te);
732     wbuf->write(bytes, te - bytes);
733     if (f.return_cas) {
734       wbuf->WRITE(" ");
735       char *pcas = xutoa(rcache_header->cas, te);
736       wbuf->write(pcas, te - pcas);
737     }
738     wbuf->WRITE("\r\n");
739     int ntowrite = writer->read_avail() + rcache_header->nbytes;
740     crvio        = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
741     creader      = reader;
742     TS_PUSH_HANDLER(&MC::stream_event);
743     return write_to_client(ntowrite);
744   }
745   case TSMEMCACHE_STREAM_DONE:
746     crvc->do_io_close();
747     crvc  = 0;
748     crvio = NULL;
749     reader->consume(read_offset);
750     read_offset = 0;
751     wbuf->WRITE("\r\n");
752     return ascii_gets();
753   default:
754     break;
755   }
756   return ascii_gets();
757 }
758 
759 int
ascii_set_event(int event,void * data)760 MC::ascii_set_event(int event, void *data)
761 {
762   switch (event) {
763   case CACHE_EVENT_OPEN_WRITE_FAILED:
764     // another write currently in progress
765     mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
766     return EVENT_CONT;
767   case EVENT_INTERVAL:
768     return read_from_client();
769   case CACHE_EVENT_OPEN_WRITE: {
770     cwvc     = (CacheVConnection *)data;
771     int hlen = 0;
772     if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
773       if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
774         goto Lfail;
775       }
776       if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
777         goto Lfail;
778       }
779       ink_hrtime t = Thread::get_hrtime();
780       if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
781           t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
782         goto Lstale;
783       }
784       if (f.set_add) {
785         return ASCII_RESPONSE("NOT_STORED");
786       }
787     } else {
788     Lstale:
789       if (f.set_replace) {
790         return ASCII_RESPONSE("NOT_STORED");
791       }
792     }
793     memcpy(tmp_cache_header_key, key, header.nkey);
794     header.settime = Thread::get_hrtime();
795     if (exptime) {
796       if (exptime > REALTIME_MAXDELTA) {
797         if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
798           header.exptime = 0;
799         } else {
800           header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
801         }
802       } else {
803         header.exptime = exptime;
804       }
805     } else {
806       header.exptime = UINT32_MAX; // 136 years
807     }
808     if (f.set_cas) {
809       if (!wcache_header) {
810         return ASCII_RESPONSE("NOT_FOUND");
811       }
812       if (header.cas && header.cas != wcache_header->cas) {
813         return ASCII_RESPONSE("EXISTS");
814       }
815     }
816     header.cas = ink_atomic_increment(&next_cas, 1);
817     if (f.set_append || f.set_prepend) {
818       header.nbytes = nbytes + rcache_header->nbytes;
819     } else {
820       header.nbytes = nbytes;
821     }
822     cwvc->set_header(&header, header.len());
823     reader->consume(end_of_cmd);
824     end_of_cmd    = -1;
825     swallow_bytes = 2; // \r\n
826     if (f.set_append) {
827       TS_PUSH_HANDLER(&MC::tunnel_event);
828       if (!cbuf) {
829         cbuf = new_empty_MIOBuffer();
830       }
831       creader = cbuf->alloc_reader();
832       crvio   = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
833       cwvio   = cwvc->do_io_write(this, header.nbytes, creader);
834     } else {
835       if (f.set_prepend) {
836         int64_t a = reader->read_avail();
837         if (a >= static_cast<int64_t>(nbytes)) {
838           a = static_cast<int64_t>(nbytes);
839         }
840         if (!cbuf) {
841           cbuf = new_empty_MIOBuffer();
842         }
843         creader = cbuf->alloc_reader();
844         if (a) {
845           cbuf->write(reader, a);
846           reader->consume(a);
847         }
848         if (a == static_cast<int64_t>(nbytes)) {
849           cwvio = cwvc->do_io_write(this, header.nbytes, creader);
850           goto Lstreamdone;
851         }
852         rvio->nbytes = rvio->ndone + (int64_t)nbytes - a;
853       } else {
854         creader = reader;
855       }
856       TS_PUSH_HANDLER(&MC::stream_event);
857       cwvio = cwvc->do_io_write(this, header.nbytes, creader);
858     }
859     return EVENT_CONT;
860   }
861   case TSMEMCACHE_STREAM_DONE:
862     rvio->nbytes = UINT64_MAX;
863   Lstreamdone:
864     if (f.set_prepend) {
865       TS_PUSH_HANDLER(&MC::tunnel_event);
866       crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
867       return EVENT_CONT;
868     }
869     return ASCII_RESPONSE("STORED");
870   case TSMEMCACHE_TUNNEL_DONE:
871     crvc->do_io_close();
872     crvc  = 0;
873     crvio = NULL;
874     if (f.set_append) {
875       int64_t a = reader->read_avail();
876       if (a > static_cast<int64_t>(nbytes)) {
877         a = static_cast<int64_t>(nbytes);
878       }
879       if (a) {
880         cbuf->write(reader, a);
881         reader->consume(a);
882       }
883       TS_PUSH_HANDLER(&MC::stream_event);
884       return handleEvent(VC_EVENT_READ_READY, rvio);
885     }
886     ink_assert(f.set_prepend);
887     cwvc->do_io_close();
888     cwvc = 0;
889     return ASCII_RESPONSE("STORED");
890   case CACHE_EVENT_OPEN_READ_FAILED:
891     swallow_bytes = nbytes + 2;
892     return ASCII_RESPONSE("NOT_STORED");
893   case CACHE_EVENT_OPEN_READ:
894     crvc = (CacheVConnection *)data;
895     return set_item();
896   default:
897     break;
898   }
899   return EVENT_CONT;
900 Lfail:
901   Warning("tsmemcache: bad cache data");
902   return ASCII_SERVER_ERROR("");
903 }
904 
905 int
ascii_delete_event(int event,void * data)906 MC::ascii_delete_event(int event, void *data)
907 {
908   switch (event) {
909   case CACHE_EVENT_REMOVE_FAILED:
910     return ASCII_RESPONSE("NOT_FOUND");
911   case CACHE_EVENT_REMOVE:
912     return ASCII_RESPONSE("DELETED");
913   default:
914     return EVENT_CONT;
915   }
916 }
917 
918 int
ascii_incr_decr_event(int event,void * data)919 MC::ascii_incr_decr_event(int event, void *data)
920 {
921   switch (event) {
922   case CACHE_EVENT_OPEN_WRITE_FAILED:
923     // another write currently in progress
924     mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
925     return EVENT_CONT;
926   case EVENT_INTERVAL:
927     return read_from_client();
928   case CACHE_EVENT_OPEN_WRITE: {
929     int hlen = 0;
930     cwvc     = (CacheVConnection *)data;
931     {
932       if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
933         if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
934           goto Lfail;
935         }
936         if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
937           goto Lfail;
938         }
939         ink_hrtime t = Thread::get_hrtime();
940         if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
941             t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
942           goto Lfail;
943         }
944       } else {
945         goto Lfail;
946       }
947       memcpy(tmp_cache_header_key, key, header.nkey);
948       header.settime = Thread::get_hrtime();
949       if (exptime) {
950         if (exptime > REALTIME_MAXDELTA) {
951           if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
952             header.exptime = 0;
953           } else {
954             header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
955           }
956         } else {
957           header.exptime = exptime;
958         }
959       } else {
960         header.exptime = UINT32_MAX; // 136 years
961       }
962     }
963     header.cas = ink_atomic_increment(&next_cas, 1);
964     {
965       char *localdata = nullptr;
966       int len         = 0;
967       // must be huge, why convert to a counter ??
968       if (cwvc->get_single_data((void **)&localdata, &len) < 0) {
969         goto Lfail;
970       }
971       uint64_t new_value = xatoull(localdata, localdata + len);
972       if (f.set_incr) {
973         new_value += delta;
974       } else {
975         if (delta > new_value) {
976           new_value = 0;
977         } else {
978           new_value -= delta;
979         }
980       }
981       char new_value_str_buffer[32], *e = &new_value_str_buffer[30];
982       e[0]    = '\r';
983       e[1]    = '\n';
984       char *s = xutoa(new_value, e);
985       creader = wbuf->clone_reader(writer);
986       wbuf->write(s, e - s + 2);
987       if (f.noreply) {
988         writer->consume(e - s + 2);
989       } else {
990         wvio->reenable();
991       }
992       MCDebugBuf("tsmemcache_ascii_response", s, e - s + 2);
993       header.nbytes = e - s;
994       cwvc->set_header(&header, header.len());
995       TS_PUSH_HANDLER(&MC::stream_event);
996       cwvio = cwvc->do_io_write(this, header.nbytes, creader);
997     }
998     return EVENT_CONT;
999   }
1000   case TSMEMCACHE_STREAM_DONE: {
1001     wbuf->dealloc_reader(creader);
1002     creader = 0;
1003     reader->consume(end_of_cmd);
1004     return read_from_client();
1005   }
1006   default:
1007     break;
1008   }
1009   return EVENT_CONT;
1010 Lfail:
1011   Warning("tsmemcache: bad cache data");
1012   return ASCII_RESPONSE("NOT_FOUND");
1013 }
1014 
1015 int
get_ascii_key(char * as,char * e)1016 MC::get_ascii_key(char *as, char *e)
1017 {
1018   char *s = as;
1019   // skip space
1020   while (*s == ' ') {
1021     s++;
1022     if (s >= e) {
1023       if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1024         return ASCII_CLIENT_ERROR("bad command line");
1025       }
1026       return EVENT_CONT;
1027     }
1028   }
1029   // grab key
1030   key = s;
1031   while (!isspace(*s)) {
1032     if (s >= e) {
1033       if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1034         return ASCII_RESPONSE("key too large");
1035       }
1036       return EVENT_CONT;
1037     }
1038     s++;
1039   }
1040   if (s - key > TSMEMCACHE_MAX_KEY_LEN) {
1041     return ASCII_CLIENT_ERROR("bad command line");
1042   }
1043   header.nkey = s - key;
1044   if (!header.nkey) {
1045     if (e - s >= 2) {
1046       if (*s == '\r') {
1047         s++;
1048       }
1049       if (*s == '\n' && ngets) {
1050         return ASCII_RESPONSE("END");
1051       }
1052       return ASCII_CLIENT_ERROR("bad command line");
1053     }
1054     return EVENT_CONT; // get some more
1055   }
1056   read_offset = s - as;
1057   return TSMEMCACHE_EVENT_GOT_KEY;
1058 }
1059 
1060 int
ascii_get(char * as,char * e)1061 MC::ascii_get(char *as, char *e)
1062 {
1063   SET_HANDLER(&MC::ascii_get_event);
1064   CHECK_RET(get_ascii_key(as, e), TSMEMCACHE_EVENT_GOT_KEY);
1065   ngets++;
1066   return get_item();
1067 }
1068 
1069 int
ascii_gets()1070 MC::ascii_gets()
1071 {
1072   int len = 0;
1073   char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len);
1074   return ascii_get(c, c + len);
1075 }
1076 
1077 #define SKIP_SPACE                                     \
1078   do {                                                 \
1079     while (*s == ' ') {                                \
1080       s++;                                             \
1081       if (s >= e)                                      \
1082         return ASCII_CLIENT_ERROR("bad command line"); \
1083     }                                                  \
1084   } while (0)
1085 
1086 #define SKIP_TOKEN                                     \
1087   do {                                                 \
1088     while (!isspace(*s)) {                             \
1089       s++;                                             \
1090       if (s >= e)                                      \
1091         return ASCII_CLIENT_ERROR("bad command line"); \
1092     }                                                  \
1093   } while (0)
1094 
1095 #define GET_NUM(_n)                                    \
1096   do {                                                 \
1097     if (isdigit(*s)) {                                 \
1098       _n = *s - '0';                                   \
1099       s++;                                             \
1100       if (s >= e)                                      \
1101         return ASCII_CLIENT_ERROR("bad command line"); \
1102     } else                                             \
1103       _n = 0;                                          \
1104     while (isdigit(*s)) {                              \
1105       _n *= 10;                                        \
1106       _n += *s - '0';                                  \
1107       s++;                                             \
1108       if (s >= e)                                      \
1109         return ASCII_CLIENT_ERROR("bad command line"); \
1110     }                                                  \
1111   } while (0)
1112 
1113 #define GET_SNUM(_n)                                   \
1114   do {                                                 \
1115     int neg = 0;                                       \
1116     if (*s == '-') {                                   \
1117       s++;                                             \
1118       neg = 1;                                         \
1119     }                                                  \
1120     if (isdigit(*s)) {                                 \
1121       _n = *s - '0';                                   \
1122       s++;                                             \
1123       if (s >= e)                                      \
1124         return ASCII_CLIENT_ERROR("bad command line"); \
1125     } else                                             \
1126       _n = 0;                                          \
1127     while (isdigit(*s)) {                              \
1128       _n *= 10;                                        \
1129       _n += *s - '0';                                  \
1130       s++;                                             \
1131       if (s >= e)                                      \
1132         return ASCII_CLIENT_ERROR("bad command line"); \
1133     }                                                  \
1134     if (neg)                                           \
1135       _n = -_n;                                        \
1136   } while (0)
1137 
1138 int
ascii_set(char * s,char * e)1139 MC::ascii_set(char *s, char *e)
1140 {
1141   SKIP_SPACE;
1142   key = s;
1143   SKIP_TOKEN;
1144   header.nkey = s - key;
1145   SKIP_SPACE;
1146   GET_NUM(header.flags);
1147   SKIP_SPACE;
1148   GET_SNUM(exptime);
1149   SKIP_SPACE;
1150   GET_NUM(nbytes);
1151   swallow_bytes = nbytes + 2; // assume failure
1152   if (f.set_cas) {
1153     SKIP_SPACE;
1154     GET_NUM(header.cas);
1155   } else {
1156     header.cas = 0;
1157   }
1158   SKIP_SPACE;
1159   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1160     f.noreply = 1;
1161     s += 7;
1162     if (s >= e) {
1163       return ASCII_CLIENT_ERROR("bad command line");
1164     }
1165     SKIP_SPACE;
1166   }
1167   if (*s == '\r') {
1168     s++;
1169   }
1170   if (*s == '\n') {
1171     s++;
1172   }
1173   if (s != e) {
1174     return ASCII_CLIENT_ERROR("bad command line");
1175   }
1176   SET_HANDLER(&MC::ascii_set_event);
1177   if (f.set_append || f.set_prepend) {
1178     return get_item();
1179   } else {
1180     return set_item();
1181   }
1182 }
1183 
1184 int
ascii_delete(char * s,char * e)1185 MC::ascii_delete(char *s, char *e)
1186 {
1187   SKIP_SPACE;
1188   key = s;
1189   SKIP_TOKEN;
1190   header.nkey = s - key;
1191   SKIP_SPACE;
1192   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1193     f.noreply = 1;
1194     s += 7;
1195     if (s >= e) {
1196       return ASCII_CLIENT_ERROR("bad command line");
1197     }
1198     SKIP_SPACE;
1199   }
1200   if (*s == '\r') {
1201     s++;
1202   }
1203   if (*s == '\n') {
1204     s++;
1205   }
1206   if (s != e) {
1207     return ASCII_CLIENT_ERROR("bad command line");
1208   }
1209   SET_HANDLER(&MC::ascii_delete_event);
1210   return delete_item();
1211 }
1212 
1213 int
ascii_incr_decr(char * s,char * e)1214 MC::ascii_incr_decr(char *s, char *e)
1215 {
1216   SKIP_SPACE;
1217   key = s;
1218   SKIP_TOKEN;
1219   header.nkey = s - key;
1220   SKIP_SPACE;
1221   GET_NUM(delta);
1222   SKIP_SPACE;
1223   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1224     f.noreply = 1;
1225     s += 7;
1226     if (s >= e) {
1227       return ASCII_CLIENT_ERROR("bad command line");
1228     }
1229     SKIP_SPACE;
1230   }
1231   if (*s == '\r') {
1232     s++;
1233   }
1234   if (*s == '\n') {
1235     s++;
1236   }
1237   if (s != e) {
1238     return ASCII_CLIENT_ERROR("bad command line");
1239   }
1240   SET_HANDLER(&MC::ascii_incr_decr_event);
1241   return set_item();
1242 }
1243 
1244 static int
is_end_of_cmd(char * t,char * e)1245 is_end_of_cmd(char *t, char *e)
1246 {
1247   while (*t == ' ' && t < e) {
1248     t++; // skip spaces
1249   }
1250   if (*t == '\r') {
1251     t++;
1252   }
1253   if (t != e - 1) {
1254     return 0;
1255   }
1256   return 1;
1257 }
1258 
1259 // moves *pt past the noreply if it is found
1260 static int
is_noreply(char ** pt,char * e)1261 is_noreply(char **pt, char *e)
1262 {
1263   char *t = *pt;
1264   if (t < e - 8) {
1265     while (*t == ' ') {
1266       if (t > e - 8) {
1267         return 0;
1268       }
1269       t++;
1270     }
1271     if (t[0] == 'n' && !STRCMP(t + 1, "oreply") && isspace(t[7])) {
1272       *pt = t + sizeof("noreply") - 1;
1273       return 1;
1274     }
1275   }
1276   return 0;
1277 }
1278 
1279 int
read_ascii_from_client_event(int event,void * data)1280 MC::read_ascii_from_client_event(int event, void *data)
1281 {
1282   int len = 0;
1283   char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len), *s = c;
1284   MCDebugBuf("tsmemcache_ascii_cmd", c, len);
1285   char *e = c + len - 5; // at least 6 chars
1286   while (*s == ' ' && s < e) {
1287     s++; // skip leading spaces
1288   }
1289   if (s >= e) {
1290     if (len >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE || memchr(c, '\n', len)) {
1291       return ASCII_CLIENT_ERROR("bad command line");
1292     }
1293     return EVENT_CONT;
1294   }
1295   // gets can be large, so do not require the full cmd fit in the buffer
1296   e = c + len;
1297   switch (*s) {
1298   case 'g': // get gets
1299     if (s[3] == 's' && s[4] == ' ') {
1300       f.return_cas = 1;
1301       read_offset  = 5;
1302       goto Lget;
1303     } else if (s[3] == ' ') {
1304       read_offset = 4;
1305     Lget:
1306       reader->consume(read_offset);
1307       if (c != tmp_cmd_buffer) { // all in the block
1308         return ascii_get(s + read_offset, e);
1309       } else {
1310         return ascii_gets();
1311       }
1312     }
1313     break;
1314   case 'b': // bget
1315     if (s[4] != ' ') {
1316       break;
1317     }
1318     read_offset = 5;
1319     goto Lget;
1320     break;
1321   default:
1322     break;
1323   }
1324   // find the end of the command
1325   e = static_cast<char *>(memchr(s, '\n', len));
1326   if (!e) {
1327     if (reader->read_avail() > TSMEMCACHE_MAX_CMD_SIZE) {
1328       return ASCII_CLIENT_ERROR("bad command line");
1329     }
1330     return EVENT_CONT;
1331   }
1332   e++; // skip nl
1333   end_of_cmd = e - c;
1334   switch (*s) {
1335   case 's': // set stats
1336     if (s[1] == 'e' && s[2] == 't' && s[3] == ' ') {
1337       return ascii_set(s + sizeof("set") - 1, e);
1338     }
1339     if (STRCMP_REST("tats", s + 1, e)) {
1340       break;
1341     }
1342     s += sizeof("stats") - 1;
1343     if (is_noreply(&s, e)) {
1344       break; // to please memcapable
1345     } else {
1346       return ASCII_RESPONSE("END");
1347     }
1348   case 'a': // add
1349     if (s[1] == 'd' && s[2] == 'd' && s[3] == ' ') {
1350       f.set_add = 1;
1351       return ascii_set(s + sizeof("add") - 1, e);
1352     }
1353     if (STRCMP_REST("ppend", s + 1, e)) {
1354       break;
1355     }
1356     f.set_append = 1;
1357     return ascii_set(s + sizeof("append") - 1, e);
1358   case 'p': // prepend
1359     if (STRCMP_REST("repend", s + 1, e)) {
1360       break;
1361     }
1362     f.set_prepend = 1;
1363     return ascii_set(s + sizeof("prepend") - 1, e);
1364   case 'c': // cas
1365     if (s[1] == 'a' && s[2] == 's' && s[3] == ' ') {
1366       f.set_cas = 1;
1367       return ascii_set(s + sizeof("cas") - 1, e);
1368     }
1369     break;
1370   case 'i': // incr
1371     if (s[1] == 'n' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') {
1372       f.set_incr = 1;
1373       return ascii_incr_decr(s + sizeof("incr") - 1, e);
1374     }
1375     break;
1376   case 'f': { // flush_all
1377     if (STRCMP_REST("lush_all", s + 1, e)) {
1378       break;
1379     }
1380     s += sizeof("flush_all") - 1;
1381     SKIP_SPACE;
1382     int32_t time_offset = 0;
1383     if (isdigit(*s)) {
1384       GET_NUM(time_offset);
1385     }
1386     f.noreply                 = is_noreply(&s, e);
1387     ink_hrtime new_last_flush = Thread::get_hrtime() + HRTIME_SECONDS(time_offset);
1388 #if __WORDSIZE == 64
1389     last_flush = new_last_flush; // this will be atomic for native word size
1390 #else
1391     ink_atomic_swap(&last_flush, new_last_flush);
1392 #endif
1393     if (!is_end_of_cmd(s, e)) {
1394       break;
1395     }
1396     return ASCII_RESPONSE("OK");
1397   }
1398   case 'd': // delete decr
1399     if (e - s < 5) {
1400       break;
1401     }
1402     if (s[2] == 'l') {
1403       if (s[1] == 'e' && s[3] == 'e' && s[4] == 't' && s[5] == 'e' && s[6] == ' ') {
1404         return ascii_delete(s + sizeof("delete") - 1, e);
1405       }
1406     } else if (s[1] == 'e' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') { // decr
1407       f.set_decr = 1;
1408       return ascii_incr_decr(s + sizeof("decr") - 1, e);
1409     }
1410     break;
1411   case 'r': // replace
1412     if (STRCMP_REST("eplace", s + 1, e)) {
1413       break;
1414     }
1415     f.set_replace = 1;
1416     return ascii_set(s + sizeof("replace") - 1, e);
1417   case 'q': // quit
1418     if (STRCMP_REST("uit", s + 1, e)) {
1419       break;
1420     }
1421     if (!is_end_of_cmd(s + sizeof("quit") - 1, e)) {
1422       break;
1423     }
1424     return die();
1425   case 'v': { // version
1426     if (s[3] == 's') {
1427       if (STRCMP_REST("ersion", s + 1, e)) {
1428         break;
1429       }
1430       if (!is_end_of_cmd(s + sizeof("version") - 1, e)) {
1431         break;
1432       }
1433       return ASCII_RESPONSE("VERSION " TSMEMCACHE_VERSION);
1434     } else if (s[3] == 'b') {
1435       if (STRCMP_REST("erbosity", s + 1, e)) {
1436         break;
1437       }
1438       s += sizeof("verbosity") - 1;
1439       SKIP_SPACE;
1440       if (!isdigit(*s)) {
1441         break;
1442       }
1443       GET_NUM(verbosity);
1444       f.noreply = is_noreply(&s, e);
1445       if (!is_end_of_cmd(s, e)) {
1446         break;
1447       }
1448       return ASCII_RESPONSE("OK");
1449     }
1450     break;
1451   }
1452   }
1453   return ASCII_ERROR();
1454 }
1455 
1456 int
write_then_close_event(int event,void * data)1457 MC::write_then_close_event(int event, void *data)
1458 {
1459   switch (event) {
1460   case VC_EVENT_EOS:
1461     if ((VIO *)data == wvio) {
1462       break;
1463     }
1464   // fall through
1465   case VC_EVENT_READ_READY:
1466     return EVENT_DONE; // no more of that stuff
1467   case VC_EVENT_WRITE_READY:
1468     if (wvio->buffer.reader()->read_avail() > 0) {
1469       return EVENT_CONT;
1470     }
1471     break;
1472   default:
1473     break;
1474   }
1475   return die();
1476 }
1477 
1478 int
read_from_client_event(int event,void * data)1479 MC::read_from_client_event(int event, void *data)
1480 {
1481   switch (event) {
1482   case TSMEMCACHE_STREAM_DONE:
1483     return read_from_client();
1484   case VC_EVENT_READ_READY:
1485   case VC_EVENT_EOS:
1486     if (reader->read_avail() < 1) {
1487       return EVENT_CONT;
1488     }
1489     if ((uint8_t)reader->start()[0] == (uint8_t)PROTOCOL_BINARY_REQ) {
1490       return TS_SET_CALL(&MC::read_binary_from_client_event, event, data);
1491     } else {
1492       return TS_SET_CALL(&MC::read_ascii_from_client_event, event, data);
1493     }
1494   case VC_EVENT_WRITE_READY:
1495   case VC_EVENT_WRITE_COMPLETE:
1496     break;
1497   default:
1498     return die();
1499   }
1500   return EVENT_CONT;
1501 }
1502 
1503 // between client and cache
1504 int
stream_event(int event,void * data)1505 MC::stream_event(int event, void *data)
1506 {
1507   if (data == crvio || data == cwvio) {
1508     switch (event) {
1509     case VC_EVENT_READ_READY:
1510       wvio->reenable();
1511       break;
1512     case VC_EVENT_WRITE_READY:
1513       rvio->reenable();
1514       break;
1515     case VC_EVENT_WRITE_COMPLETE:
1516     case VC_EVENT_EOS:
1517     case VC_EVENT_READ_COMPLETE:
1518       return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
1519     default:
1520       return die();
1521     }
1522   } else {
1523     switch (event) {
1524     case VC_EVENT_READ_READY:
1525       if (cwvio) {
1526         if (creader != reader && creader->read_avail() < cwvio->nbytes) {
1527           int64_t a = reader->read_avail();
1528           if (a > static_cast<int64_t>(nbytes)) {
1529             a = static_cast<int64_t>(nbytes);
1530           }
1531           if (a) {
1532             cbuf->write(reader, a);
1533             reader->consume(a);
1534           }
1535         }
1536         cwvio->reenable();
1537       }
1538       break;
1539     case VC_EVENT_WRITE_READY:
1540       if (crvio) {
1541         crvio->reenable();
1542       }
1543       break;
1544     case VC_EVENT_WRITE_COMPLETE:
1545     case VC_EVENT_READ_COMPLETE:
1546       return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
1547     default:
1548       return die();
1549     }
1550   }
1551   return EVENT_CONT;
1552 }
1553 
1554 // cache to cache
1555 int
tunnel_event(int event,void * data)1556 MC::tunnel_event(int event, void *data)
1557 {
1558   MCDebug("tsmemcache", "tunnel %d %p crvio %p cwvio %p", event, data, crvio, cwvio);
1559   if (data == crvio) {
1560     switch (event) {
1561     case VC_EVENT_READ_READY:
1562       cwvio->reenable();
1563       break;
1564     case VC_EVENT_EOS:
1565     case VC_EVENT_READ_COMPLETE:
1566       if (cwvio->nbytes == cwvio->ndone + cwvio->buffer.reader()->read_avail()) {
1567         cwvio->reenable();
1568         return EVENT_CONT;
1569       }
1570       return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1571     default:
1572       return die();
1573     }
1574   } else if (data == cwvio) {
1575     switch (event) {
1576     case VC_EVENT_WRITE_READY:
1577       crvio->reenable();
1578       break;
1579     case VC_EVENT_WRITE_COMPLETE:
1580     case VC_EVENT_EOS:
1581       return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1582     default:
1583       return die();
1584     }
1585   } else { // network I/O
1586     switch (event) {
1587     case VC_EVENT_READ_READY:
1588     case VC_EVENT_WRITE_READY:
1589     case VC_EVENT_WRITE_COMPLETE:
1590     case VC_EVENT_READ_COMPLETE:
1591       return EVENT_CONT;
1592     default:
1593       return die();
1594     }
1595   }
1596   return EVENT_CONT;
1597 }
1598 
1599 int
init_tsmemcache(int port)1600 init_tsmemcache(int port)
1601 {
1602   tsmemcache_constants();
1603   MCAccept *a = new MCAccept;
1604   a->mutex    = new_ProxyMutex();
1605   NetProcessor::AcceptOptions options(NetProcessor::DEFAULT_ACCEPT_OPTIONS);
1606   options.local_port = a->accept_port = port;
1607   netProcessor.accept(a, options);
1608   return 0;
1609 }
1610 
1611 void
TSPluginInit(int argc,const char * argv[])1612 TSPluginInit(int argc, const char *argv[])
1613 {
1614   ink_assert(sizeof(protocol_binary_request_header) == 24);
1615 
1616   TSPluginRegistrationInfo info;
1617   info.plugin_name   = (char *)"tsmemcache";
1618   info.vendor_name   = (char *)"ats";
1619   info.support_email = (char *)"jplevyak@apache.org";
1620 
1621   int port = 11211;
1622 
1623   if (TSPluginRegister(&info) != TS_SUCCESS) {
1624     TSError("[PluginInit] tsmemcache registration failed.\n");
1625     goto error;
1626   }
1627 
1628   if (argc < 2) {
1629     TSError("[tsmemcache] Usage: tsmemcache.so [accept_port]\n");
1630     goto error;
1631   } else {
1632     int port = atoi(argv[1]);
1633     if (!port) {
1634       TSError("[tsmemcache] bad accept_port '%s'\n", argv[1]);
1635       goto error;
1636     }
1637     MCDebug("tsmemcache", "using accept_port %d", port);
1638   }
1639   init_tsmemcache(port);
1640   return;
1641 
1642 error:
1643   TSError("[PluginInit] Plugin not initialized");
1644 }
1645