xref: /trafficserver/iocore/cache/Cache.cc (revision e2642f99)
1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "P_Cache.h"
25 
26 // Cache Inspector and State Pages
27 #include "P_CacheTest.h"
28 #include "StatPages.h"
29 
30 #include "tscore/I_Layout.h"
31 #include "tscore/Filenames.h"
32 
33 #include "HttpTransactCache.h"
34 #include "HttpSM.h"
35 #include "HttpCacheSM.h"
36 #include "InkAPIInternal.h"
37 
38 #include "tscore/hugepages.h"
39 
40 #include <atomic>
41 
42 constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION, CACHE_DB_MINOR_VERSION);
43 
44 // Compilation Options
45 #define USELESS_REENABLES // allow them for now
46 // #define VERIFY_JTEST_DATA
47 
48 static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of disk.
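// Illustrative only (assumes the ~1 MB of directory per 1 GB of disk noted above):
// a 1 GB stripe with roughly 1 MB of directory metadata gets about 10 MB of RAM
// cache by default, since the auto-sized RAM cache is dirlen() * this multiplier
// (see CacheProcessor::cacheInitialized below).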
49 
50 // This is the oldest version number that is still usable.
51 static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
52 
53 #define DOCACHE_CLEAR_DYN_STAT(x)  \
54   do {                             \
55     RecSetRawStatSum(rsb, x, 0);   \
56     RecSetRawStatCount(rsb, x, 0); \
57   } while (0);
58 
59 // Configuration
60 
61 int64_t cache_config_ram_cache_size            = AUTO_SIZE_RAM_CACHE;
62 int cache_config_ram_cache_algorithm           = 1;
63 int cache_config_ram_cache_compress            = 0;
64 int cache_config_ram_cache_compress_percent    = 90;
65 int cache_config_ram_cache_use_seen_filter     = 1;
66 int cache_config_http_max_alts                 = 3;
67 int cache_config_dir_sync_frequency            = 60;
68 int cache_config_permit_pinning                = 0;
69 int cache_config_select_alternate              = 1;
70 int cache_config_max_doc_size                  = 0;
71 int cache_config_min_average_object_size       = ESTIMATED_OBJECT_SIZE;
72 int64_t cache_config_ram_cache_cutoff          = AGG_SIZE;
73 int cache_config_max_disk_errors               = 5;
74 int cache_config_hit_evacuate_percent          = 10;
75 int cache_config_hit_evacuate_size_limit       = 0;
76 int cache_config_force_sector_size             = 0;
77 int cache_config_target_fragment_size          = DEFAULT_TARGET_FRAGMENT_SIZE;
78 int cache_config_agg_write_backlog             = AGG_SIZE * 2;
79 int cache_config_enable_checksum               = 0;
80 int cache_config_alt_rewrite_max_size          = 4096;
81 int cache_config_read_while_writer             = 0;
82 int cache_config_mutex_retry_delay             = 2;
83 int cache_read_while_writer_retry_delay        = 50;
84 int cache_config_read_while_writer_max_retries = 10;
85 
86 // Globals
87 
88 RecRawStatBlock *cache_rsb          = nullptr;
89 Cache *theCache                     = nullptr;
90 CacheDisk **gdisks                  = nullptr;
91 int gndisks                         = 0;
92 std::atomic<int> initialize_disk    = 0;
93 Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr};
94 CacheSync *cacheDirSync             = nullptr;
95 Store theCacheStore;
96 int CacheProcessor::initialized          = CACHE_INITIALIZING;
97 uint32_t CacheProcessor::cache_ready     = 0;
98 int CacheProcessor::start_done           = 0;
99 bool CacheProcessor::clear               = false;
100 bool CacheProcessor::fix                 = false;
101 bool CacheProcessor::check               = false;
102 int CacheProcessor::start_internal_flags = 0;
103 int CacheProcessor::auto_clear_flag      = 0;
104 CacheProcessor cacheProcessor;
105 Vol **gvol             = nullptr;
106 std::atomic<int> gnvol = 0;
107 ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
108 ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
109 ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
110 ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
111 int CacheVC::size_to_init = -1;
112 CacheKey zero_key;
113 
114 struct VolInitInfo {
115   off_t recover_pos;
116   AIOCallbackInternal vol_aio[4];
117   char *vol_h_f;
118 
119   VolInitInfo()
120   {
121     recover_pos = 0;
122     vol_h_f     = static_cast<char *>(ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE));
123     memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
124   }
125 
126   ~VolInitInfo()
127   {
128     for (auto &i : vol_aio) {
129       i.action = nullptr;
130       i.mutex.clear();
131     }
132     free(vol_h_f);
133   }
134 };
135 
136 #if AIO_MODE == AIO_MODE_NATIVE
137 struct VolInit : public Continuation {
138   Vol *vol;
139   char *path;
140   off_t blocks;
141   int64_t offset;
142   bool vol_clear;
143 
144   int
145   mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
146   {
147     vol->init(path, blocks, offset, vol_clear);
148     mutex.clear();
149     delete this;
150     return EVENT_DONE;
151   }
152 
153   VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex), vol(v), path(p), blocks(b), offset(o), vol_clear(c)
154   {
155     SET_HANDLER(&VolInit::mainEvent);
156   }
157 };
158 
159 struct DiskInit : public Continuation {
160   CacheDisk *disk;
161   char *s;
162   off_t blocks;
163   off_t askip;
164   int ahw_sector_size;
165   int fildes;
166   bool clear;
167 
168   int
169   mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
170   {
171     disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
172     ats_free(s);
173     mutex.clear();
174     delete this;
175     return EVENT_DONE;
176   }
177 
178   DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c)
179     : Continuation(d->mutex), disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c)
180   {
181     SET_HANDLER(&DiskInit::mainEvent);
182   }
183 };
184 #endif
185 void cplist_init();
186 static void cplist_update();
187 int cplist_reconfigure();
188 static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
189 static void rebuild_host_table(Cache *cache);
190 void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);
191 
192 // Global list of the volumes created
193 Queue<CacheVol> cp_list;
194 int cp_list_len = 0;
195 ConfigVolumes config_volumes;
196 
197 #if TS_HAS_TESTS
198 void
199 force_link_CacheTestCaller()
200 {
201   force_link_CacheTest();
202 }
203 #endif
204 
205 int64_t
206 cache_bytes_used(int volume)
207 {
208   uint64_t used = 0;
209 
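  // A sketch of the accounting below: while a stripe has not yet wrapped
  // (header->cycle == 0) the bytes in use are simply write_pos - start; once it
  // has wrapped, it is treated as full apart from the directory and the
  // evacuation reserve at the end of the stripe.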
210   for (int i = 0; i < gnvol; i++) {
211     if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) {
212       if (!gvol[i]->header->cycle) {
213         used += gvol[i]->header->write_pos - gvol[i]->start;
214       } else {
215         used += gvol[i]->len - gvol[i]->dirlen() - EVACUATION_SIZE;
216       }
217     }
218   }
219 
220   return used;
221 }
222 
223 int
224 cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
225 {
226   int volume = -1;
227   char *p;
228 
229   // Well, there's no way to pass along the volume ID, so extracting it from the stat name.
230   p = strstr(const_cast<char *>(name), "volume_");
231   if (p != nullptr) {
232     // I'm counting on the compiler to optimize out strlen("volume_").
233     volume = strtol(p + strlen("volume_"), nullptr, 10);
234   }
235 
236   if (cacheProcessor.initialized == CACHE_INITIALIZED) {
237     int64_t used, total = 0;
238     float percent_full;
239 
240     used = cache_bytes_used(volume);
241     RecSetGlobalRawStatSum(rsb, id, used);
242     RecRawStatSyncSum(name, data_type, data, rsb, id);
243     RecGetGlobalRawStatSum(rsb, static_cast<int>(cache_bytes_total_stat), &total);
244     percent_full = static_cast<float>(used) / static_cast<float>(total) * 100;
245     // The percent_full float below gets rounded down
246     RecSetGlobalRawStatSum(rsb, static_cast<int>(cache_percent_full_stat), static_cast<int64_t>(percent_full));
247   }
248 
249   return 1;
250 }
251 
252 static int
253 validate_rww(int new_value)
254 {
255   if (new_value) {
256     float http_bg_fill;
257 
258     REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold");
259     if (http_bg_fill > 0.0) {
260       Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled",
261            "proxy.config.http.background_fill_completed_threshold");
262       return 0;
263     }
264     if (cache_config_max_doc_size > 0) {
265       Note("to enable reading while writing a document, %s should be 0: read while writing disabled",
266            "proxy.config.cache.max_doc_size");
267       return 0;
268     }
269     return new_value;
270   }
271   return 0;
272 }
273 
274 static int
275 update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
276                     void * /* cookie ATS_UNUSED */)
277 {
278   int new_value                  = validate_rww(data.rec_int);
279   cache_config_read_while_writer = new_value;
280 
281   return 0;
282 }
283 
284 CacheVC::CacheVC()
285 {
286   size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
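  // i.e. sizeof(CacheVC) minus the byte offset of `vio`, so the memset below
  // zeroes every member from `vio` through the end of the object.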
287   memset((void *)&vio, 0, size_to_init);
288 }
289 
290 HTTPInfo::FragOffset *
291 CacheVC::get_frag_table()
292 {
293   ink_assert(alternate.valid());
294   return alternate.valid() ? alternate.get_frag_table() : nullptr;
295 }
296 
297 VIO *
298 CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
299 {
300   ink_assert(vio.op == VIO::READ);
301   vio.buffer.writer_for(abuf);
302   vio.set_continuation(c);
303   vio.ndone     = 0;
304   vio.nbytes    = nbytes;
305   vio.vc_server = this;
306 #ifdef DEBUG
307   ink_assert(!c || c->mutex->thread_holding);
308 #endif
309   if (c && !trigger && !recursive) {
310     trigger = c->mutex->thread_holding->schedule_imm_local(this);
311   }
312   return &vio;
313 }
314 
315 VIO *
316 CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
317 {
318   ink_assert(vio.op == VIO::READ);
319   vio.buffer.writer_for(abuf);
320   vio.set_continuation(c);
321   vio.ndone     = 0;
322   vio.nbytes    = nbytes;
323   vio.vc_server = this;
324   seek_to       = offset;
325 #ifdef DEBUG
326   ink_assert(c->mutex->thread_holding);
327 #endif
328   if (!trigger && !recursive) {
329     trigger = c->mutex->thread_holding->schedule_imm_local(this);
330   }
331   return &vio;
332 }
333 
334 VIO *
335 CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
336 {
337   ink_assert(vio.op == VIO::WRITE);
338   ink_assert(!owner);
339   vio.buffer.reader_for(abuf);
340   vio.set_continuation(c);
341   vio.ndone     = 0;
342   vio.nbytes    = nbytes;
343   vio.vc_server = this;
344 #ifdef DEBUG
345   ink_assert(!c || c->mutex->thread_holding);
346 #endif
347   if (c && !trigger && !recursive) {
348     trigger = c->mutex->thread_holding->schedule_imm_local(this);
349   }
350   return &vio;
351 }
352 
353 void
354 CacheVC::do_io_close(int alerrno)
355 {
356   ink_assert(mutex->thread_holding == this_ethread());
357   int previous_closed = closed;
358   closed              = (alerrno == -1) ? 1 : -1; // Stupid default arguments
359   DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
360   if (!previous_closed && !recursive) {
361     die();
362   }
363 }
364 
365 void
366 CacheVC::reenable(VIO *avio)
367 {
368   DDebug("cache_reenable", "reenable %p", this);
369   (void)avio;
370 #ifdef DEBUG
371   ink_assert(avio->mutex->thread_holding);
372 #endif
373   if (!trigger) {
374 #ifndef USELESS_REENABLES
375     if (vio.op == VIO::READ) {
376       if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
377         ink_assert(!"useless reenable of cache read");
378     } else if (!vio.buffer.reader()->read_avail())
379       ink_assert(!"useless reenable of cache write");
380 #endif
381     trigger = avio->mutex->thread_holding->schedule_imm_local(this);
382   }
383 }
384 
385 void
386 CacheVC::reenable_re(VIO *avio)
387 {
388   DDebug("cache_reenable", "reenable_re %p", this);
389   (void)avio;
390 #ifdef DEBUG
391   ink_assert(avio->mutex->thread_holding);
392 #endif
393   if (!trigger) {
394     if (!is_io_in_progress() && !recursive) {
395       handleEvent(EVENT_NONE, (void *)nullptr);
396     } else {
397       trigger = avio->mutex->thread_holding->schedule_imm_local(this);
398     }
399   }
400 }
401 
402 bool
403 CacheVC::get_data(int i, void *data)
404 {
405   switch (i) {
406   case CACHE_DATA_HTTP_INFO:
407     *(static_cast<CacheHTTPInfo **>(data)) = &alternate;
408     return true;
409   case CACHE_DATA_RAM_CACHE_HIT_FLAG:
410     *(static_cast<int *>(data)) = !f.not_from_ram_cache;
411     return true;
412   default:
413     break;
414   }
415   return false;
416 }
417 
418 int64_t
419 CacheVC::get_object_size()
420 {
421   return (this)->doc_len;
422 }
423 
424 bool
425 CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
426 {
427   ink_assert(!"CacheVC::set_data should not be called!");
428   return true;
429 }
430 
431 void
432 CacheVC::get_http_info(CacheHTTPInfo **ainfo)
433 {
434   *ainfo = &(this)->alternate;
435 }
436 
437 // set_http_info must be called before do_io_write
438 // cluster vc does an optimization where it calls do_io_write() before
439 // calling set_http_info(), but it guarantees that the info will
440 // be set before transferring any bytes
441 void
442 CacheVC::set_http_info(CacheHTTPInfo *ainfo)
443 {
444   ink_assert(!total_len);
445   if (f.update) {
446     ainfo->object_key_set(update_key);
447     ainfo->object_size_set(update_len);
448   } else {
449     ainfo->object_key_set(earliest_key);
450     // don't know the total len yet
451   }
452 
453   MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
454   if (field && !field->value_get_int64()) {
455     f.allow_empty_doc = 1;
456   } else {
457     f.allow_empty_doc = 0;
458   }
459 
460   alternate.copy_shallow(ainfo);
461   ainfo->clear();
462 }
463 
464 bool
465 CacheVC::set_pin_in_cache(time_t time_pin)
466 {
467   if (total_len) {
468     ink_assert(!"should Pin the document before writing");
469     return false;
470   }
471   if (vio.op != VIO::WRITE) {
472     ink_assert(!"Pinning only allowed while writing objects to the cache");
473     return false;
474   }
475   pin_in_cache = time_pin;
476   return true;
477 }
478 
479 time_t
480 CacheVC::get_pin_in_cache()
481 {
482   return pin_in_cache;
483 }
484 
485 int
486 Vol::begin_read(CacheVC *cont)
487 {
488   ink_assert(cont->mutex->thread_holding == this_ethread());
489   ink_assert(mutex->thread_holding == this_ethread());
490 #ifdef CACHE_STAT_PAGES
491   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
492   stat_cache_vcs.enqueue(cont, cont->stat_link);
493 #endif
494   // no need for evacuation as the entire document is already in memory
495   if (cont->f.single_fragment) {
496     return 0;
497   }
498   int i = dir_evac_bucket(&cont->earliest_dir);
499   EvacuationBlock *b;
500   for (b = evacuate[i].head; b; b = b->link.next) {
501     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
502       continue;
503     }
504     if (b->readers) {
505       b->readers = b->readers + 1;
506     }
507     return 0;
508   }
509   // we don't actually need to preserve this block as it is already in
510   // memory, but this is easier, and evacuations are rare
511   EThread *t        = cont->mutex->thread_holding;
512   b                 = new_EvacuationBlock(t);
513   b->readers        = 1;
514   b->dir            = cont->earliest_dir;
515   b->evac_frags.key = cont->earliest_key;
516   evacuate[i].push(b);
517   return 1;
518 }
519 
520 int
521 Vol::close_read(CacheVC *cont)
522 {
523   EThread *t = cont->mutex->thread_holding;
524   ink_assert(t == this_ethread());
525   ink_assert(t == mutex->thread_holding);
526   if (dir_is_empty(&cont->earliest_dir)) {
527     return 1;
528   }
529   int i = dir_evac_bucket(&cont->earliest_dir);
530   EvacuationBlock *b;
531   for (b = evacuate[i].head; b;) {
532     EvacuationBlock *next = b->link.next;
533     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
534       b = next;
535       continue;
536     }
537     if (b->readers && !--b->readers) {
538       evacuate[i].remove(b);
539       free_EvacuationBlock(b, t);
540       break;
541     }
542     b = next;
543   }
544 #ifdef CACHE_STAT_PAGES
545   stat_cache_vcs.remove(cont, cont->stat_link);
546   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
547 #endif
548   return 1;
549 }
550 
551 // Cache Processor
552 
553 int
554 CacheProcessor::start(int, size_t)
555 {
556   return start_internal(0);
557 }
558 
559 static const int DEFAULT_CACHE_OPTIONS = (O_RDWR);
560 
561 int
562 CacheProcessor::start_internal(int flags)
563 {
564   ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ);
565   ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED);
566   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE);
567   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED);
568   ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE);
569   ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED);
570   ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN);
571   ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED);
572   ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT);
573   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED);
574   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED);
575   ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE);
576 
577 #if AIO_MODE == AIO_MODE_NATIVE
578   int etype            = ET_NET;
579   int n_netthreads     = eventProcessor.n_threads_for_type[etype];
580   EThread **netthreads = eventProcessor.eventthread[etype];
581   for (int i = 0; i < n_netthreads; ++i) {
582     netthreads[i]->diskHandler = new DiskHandler();
583     netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
584   }
585 #endif
586 
587   start_internal_flags = flags;
588   clear                = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
589   fix                  = !!(flags & PROCESSOR_FIX);
590   check                = (flags & PROCESSOR_CHECK) != 0;
591   start_done           = 0;
592   Span *sd;
593 
594   /* read the config file and create the data structures corresponding
595      to the file */
596   gndisks = theCacheStore.n_disks;
597   gdisks  = static_cast<CacheDisk **>(ats_malloc(gndisks * sizeof(CacheDisk *)));
598 
599   gndisks = 0;
600   ink_aio_set_callback(new AIO_Callback_handler());
601 
602   config_volumes.read_config_file();
603 
604   /*
605    create CacheDisk objects for each span in the configuration file and store in gdisks
606    */
607   for (unsigned i = 0; i < theCacheStore.n_disks; i++) {
608     sd = theCacheStore.disk[i];
609     char path[PATH_NAME_MAX];
610     int opts = DEFAULT_CACHE_OPTIONS;
611 
612     ink_strlcpy(path, sd->pathname, sizeof(path));
613     if (!sd->file_pathname) {
614       ink_strlcat(path, "/cache.db", sizeof(path));
615       opts |= O_CREAT;
616     }
617 
618 #ifdef O_DIRECT
619     opts |= O_DIRECT;
620 #endif
621 #ifdef O_DSYNC
622     opts |= O_DSYNC;
623 #endif
624     if (check) {
625       opts &= ~O_CREAT;
626       opts |= O_RDONLY;
627     }
628 
629     int fd         = open(path, opts, 0644);
630     int64_t blocks = sd->blocks;
631 
632     if (fd < 0 && (opts & O_CREAT)) { // Try without O_DIRECT if this is a file on filesystem, e.g. tmpfs.
633       fd = open(path, DEFAULT_CACHE_OPTIONS | O_CREAT, 0644);
634     }
635 
636     if (fd >= 0) {
637       bool diskok = true;
638       if (!sd->file_pathname) {
639         if (!check) {
640           if (ftruncate(fd, blocks * STORE_BLOCK_SIZE) < 0) {
641             Warning("unable to truncate cache file '%s' to %" PRId64 " blocks", path, blocks);
642             diskok = false;
643           }
644         } else { // read-only mode checks
645           struct stat sbuf;
646           if (-1 == fstat(fd, &sbuf)) {
647             fprintf(stderr, "Failed to stat cache file for directory %s\n", path);
648             diskok = false;
649           } else if (blocks != sbuf.st_size / STORE_BLOCK_SIZE) {
650             fprintf(stderr, "Cache file for directory %s is %" PRId64 " bytes, expected %" PRId64 "\n", path, sbuf.st_size,
651                     blocks * static_cast<int64_t>(STORE_BLOCK_SIZE));
652             diskok = false;
653           }
654         }
655       }
656       if (diskok) {
657         int sector_size = sd->hw_sector_size;
658 
659         gdisks[gndisks] = new CacheDisk();
660         if (check) {
661           gdisks[gndisks]->read_only_p = true;
662         }
663         gdisks[gndisks]->forced_volume_num = sd->forced_volume_num;
664         if (sd->hash_base_string) {
665           gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string);
666         }
667 
668         if (sector_size < cache_config_force_sector_size) {
669           sector_size = cache_config_force_sector_size;
670         }
671 
672         // It's actually common that the hardware I/O size is larger than the store block size as
673         // storage systems increasingly want larger I/Os. For example, on macOS, the filesystem block
674         // size is always reported as 1MB.
675         if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
676           Note("resetting hardware sector size from %d to %d", sector_size, STORE_BLOCK_SIZE);
677           sector_size = STORE_BLOCK_SIZE;
678         }
679 
680         off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
681         blocks     = blocks - (skip >> STORE_BLOCK_SHIFT);
682 #if AIO_MODE == AIO_MODE_NATIVE
683         eventProcessor.schedule_imm(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear));
684 #else
685         gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
686 #endif
687 
688         Debug("cache_hosting", "Disk: %d:%s, blocks: %" PRId64 "", gndisks, path, blocks);
689         fd = -1;
690         gndisks++;
691       }
692     } else {
693       if (errno == EINVAL) {
694         Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", path);
695       } else {
696         Warning("cache unable to open '%s': %s", path, strerror(errno));
697       }
698     }
699     if (fd >= 0) {
700       close(fd);
701     }
702   }
703 
704   start_done = 1;
705 
706   if (gndisks == 0) {
707     CacheProcessor::initialized = CACHE_INIT_FAILED;
708     // Have to do this here because no IO events were scheduled and so @c diskInitialized() won't be called.
709     if (cb_after_init) {
710       cb_after_init();
711     }
712 
713     if (this->waitForCache() > 1) {
714       Fatal("Cache initialization failed - no disks available but cache required");
715     } else {
716       Warning("unable to open cache disk(s): Cache Disabled\n");
717       return -1; // pointless, AFAICT this is ignored.
718     }
719   } else if (this->waitForCache() == 3 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) {
720     CacheProcessor::initialized = CACHE_INIT_FAILED;
721     if (cb_after_init) {
722       cb_after_init();
723     }
724     Fatal("Cache initialization failed - only %d out of %d disks were valid and all were required.", gndisks,
725           theCacheStore.n_disks_in_config);
726   }
727 
728   return 0;
729 }
730 
731 void
732 CacheProcessor::diskInitialized()
733 {
734   int n_init    = initialize_disk++;
735   int bad_disks = 0;
736   int res       = 0;
737   int i;
738 
739   // Wait until all the cache disks are initialized
740   if (n_init != gndisks - 1) {
741     return;
742   }
743 
744   // Check and remove bad disks from gdisks[]
745   for (i = 0; i < gndisks; i++) {
746     if (DISK_BAD(gdisks[i])) {
747       delete gdisks[i];
748       gdisks[i] = nullptr;
749       bad_disks++;
750     } else if (bad_disks > 0) {
751       gdisks[i - bad_disks] = gdisks[i];
752       gdisks[i]             = nullptr;
753     }
754   }
755   if (bad_disks > 0) {
756     // Update the number of available cache disks
757     gndisks -= bad_disks;
758     // Check if this is a fatal error
759     if (this->waitForCache() == 3 || (0 == gndisks && this->waitForCache() == 2)) {
760       // This could be passed off to @c cacheInitialized (as with volume config problems) but I think
761       // the more specific error message here is worth the extra code.
762       CacheProcessor::initialized = CACHE_INIT_FAILED;
763       if (cb_after_init) {
764         cb_after_init();
765       }
766       Fatal("Cache initialization failed - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config);
767     }
768   }
769 
770   /* We effectively just took all bad_disks offline, so update the stats. */
771   RecSetGlobalRawStatSum(cache_rsb, cache_span_offline_stat, bad_disks);
772   RecIncrGlobalRawStat(cache_rsb, cache_span_failing_stat, -bad_disks);
773   RecSetGlobalRawStatSum(cache_rsb, cache_span_online_stat, gndisks);
774 
775   /* create the cachevol list only if num volumes are greater than 0. */
776   if (config_volumes.num_volumes == 0) {
777     /* if no volumes, default to just an http cache */
778     res = cplist_reconfigure();
779   } else {
780     // else
781     /* create the cachevol list. */
782     cplist_init();
783     /* now change the cachevol list based on the config file */
784     res = cplist_reconfigure();
785   }
786 
787   if (res == -1) {
788     /* problems initializing the volume.config. Punt */
789     gnvol = 0;
790     cacheInitialized();
791     return;
792   } else {
793     CacheVol *cp = cp_list.head;
794     for (; cp; cp = cp->link.next) {
795       cp->vol_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count));
796       char vol_stat_str_prefix[256];
797       snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number);
798       register_cache_stats(cp->vol_rsb, vol_stat_str_prefix);
799     }
800   }
801 
802   gvol = static_cast<Vol **>(ats_malloc(gnvol * sizeof(Vol *)));
803   memset(gvol, 0, gnvol * sizeof(Vol *));
804   gnvol = 0;
805   for (i = 0; i < gndisks; i++) {
806     CacheDisk *d = gdisks[i];
807     if (is_debug_tag_set("cache_hosting")) {
808       int j;
809       Debug("cache_hosting", "Disk: %d:%s: Vol Blocks: %u: Free space: %" PRIu64, i, d->path, d->header->num_diskvol_blks,
810             d->free_space);
811       for (j = 0; j < static_cast<int>(d->header->num_volumes); j++) {
812         Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size);
813       }
814       for (j = 0; j < static_cast<int>(d->header->num_diskvol_blks); j++) {
815         Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64 " Free: %u", d->header->vol_info[j].number,
816               d->header->vol_info[j].len, d->header->vol_info[j].free);
817       }
818     }
819     if (!check) {
820       d->sync();
821     }
822   }
823   if (config_volumes.num_volumes == 0) {
824     theCache         = new Cache();
825     theCache->scheme = CACHE_HTTP_TYPE;
826     theCache->open(clear, fix);
827     return;
828   }
829   if (config_volumes.num_http_volumes != 0) {
830     theCache         = new Cache();
831     theCache->scheme = CACHE_HTTP_TYPE;
832     theCache->open(clear, fix);
833   }
834 }
835 
836 void
837 CacheProcessor::cacheInitialized()
838 {
839   int i;
840 
841   if (theCache && (theCache->ready == CACHE_INITIALIZING)) {
842     return;
843   }
844 
845   int caches_ready  = 0;
846   int cache_init_ok = 0;
847   /* allocate ram size in proportion to the disk space the
848      volume occupies */
849   int64_t total_size             = 0; // count in HTTP & MIXT
850   uint64_t total_cache_bytes     = 0; // bytes that can be used in total_size
851   uint64_t total_direntries      = 0; // all the direntries in the cache
852   uint64_t used_direntries       = 0; //   and used
853   uint64_t vol_total_cache_bytes = 0;
854   uint64_t vol_total_direntries  = 0;
855   uint64_t vol_used_direntries   = 0;
856   Vol *vol;
857 
858   ProxyMutex *mutex = this_ethread()->mutex.get();
859 
860   if (theCache) {
861     total_size += theCache->cache_size;
862     Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB", total_size,
863           total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
864     if (theCache->ready == CACHE_INIT_FAILED) {
865       Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled");
866       Warning("failed to initialize the cache for http: cache disabled\n");
867     } else {
868       caches_ready                 = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
869       caches_ready                 = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
870       caches[CACHE_FRAG_TYPE_HTTP] = theCache;
871       caches[CACHE_FRAG_TYPE_NONE] = theCache;
872     }
873   }
874 
875   // Update stripe version data.
876   if (gnvol) { // start with whatever the first stripe is.
877     cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version;
878   }
879   // scan the rest of the stripes.
880   for (i = 1; i < gnvol; i++) {
881     Vol *v = gvol[i];
882     if (v->header->version < cacheProcessor.min_stripe_version) {
883       cacheProcessor.min_stripe_version = v->header->version;
884     }
885     if (cacheProcessor.max_stripe_version < v->header->version) {
886       cacheProcessor.max_stripe_version = v->header->version;
887     }
888   }
889 
890   if (caches_ready) {
891     Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int)caches_ready,
892           gnvol.load());
893 
894     int64_t ram_cache_bytes = 0;
895 
896     if (gnvol) {
897       // new ram_caches, with algorithm from the config
898       for (i = 0; i < gnvol; i++) {
899         switch (cache_config_ram_cache_algorithm) {
900         default:
901         case RAM_CACHE_ALGORITHM_CLFUS:
902           gvol[i]->ram_cache = new_RamCacheCLFUS();
903           break;
904         case RAM_CACHE_ALGORITHM_LRU:
905           gvol[i]->ram_cache = new_RamCacheLRU();
906           break;
907         }
908       }
909       // let us calculate the Size
910       if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) {
911         Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE");
912         for (i = 0; i < gnvol; i++) {
913           vol = gvol[i];
914           gvol[i]->ram_cache->init(vol->dirlen() * DEFAULT_RAM_CACHE_MULTIPLIER, vol);
915           ram_cache_bytes += gvol[i]->dirlen();
916           Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", ram_cache_bytes,
917                 ram_cache_bytes / (1024 * 1024));
918           CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)gvol[i]->dirlen());
919 
920           vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
921           total_cache_bytes += vol_total_cache_bytes;
922           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
923                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
924 
925           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
926 
927           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
928           total_direntries += vol_total_direntries;
929           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
930 
931           vol_used_direntries = dir_entries_used(gvol[i]);
932           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
933           used_direntries += vol_used_direntries;
934         }
935 
936       } else {
937         // we got a configured memory size
938         // TODO: should we check the available system memory? Otherwise we may
939         //   OOM or swap out, which is not a good situation for the server
940         Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE", cache_config_ram_cache_size);
941         int64_t http_ram_cache_size =
942           (theCache) ?
943             static_cast<int64_t>((static_cast<double>(theCache->cache_size) / total_size) * cache_config_ram_cache_size) :
944             0;
945         Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
946               http_ram_cache_size, http_ram_cache_size / (1024 * 1024));
947         int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size;
948         Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
949               stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024));
950 
951         // Dump some ram_cache size information in debug mode.
952         Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "", cache_config_ram_cache_size,
953               cache_config_ram_cache_cutoff);
954 
955         for (i = 0; i < gnvol; i++) {
956           vol = gvol[i];
957           double factor;
958           if (gvol[i]->cache == theCache) {
959             ink_assert(gvol[i]->cache != nullptr);
960             factor = static_cast<double>(static_cast<int64_t>(gvol[i]->len >> STORE_BLOCK_SHIFT)) / theCache->cache_size;
961             Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
962             gvol[i]->ram_cache->init(static_cast<int64_t>(http_ram_cache_size * factor), vol);
963             ram_cache_bytes += static_cast<int64_t>(http_ram_cache_size * factor);
964             CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(http_ram_cache_size * factor));
965           } else {
966             ink_release_assert(!"Unexpected non-HTTP cache volume");
967           }
968           Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", i,
969                 ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
970           vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
971           total_cache_bytes += vol_total_cache_bytes;
972           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
973           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
974                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
975 
976           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
977           total_direntries += vol_total_direntries;
978           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
979 
980           vol_used_direntries = dir_entries_used(gvol[i]);
981           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
982           used_direntries += vol_used_direntries;
983         }
984       }
985       switch (cache_config_ram_cache_compress) {
986       default:
987         Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
988       case CACHE_COMPRESSION_NONE:
989       case CACHE_COMPRESSION_FASTLZ:
990         break;
991       case CACHE_COMPRESSION_LIBZ:
992 #ifndef HAVE_ZLIB_H
993         Fatal("libz not available for RAM cache compression");
994 #endif
995         break;
996       case CACHE_COMPRESSION_LIBLZMA:
997 #ifndef HAVE_LZMA_H
998         Fatal("lzma not available for RAM cache compression");
999 #endif
1000         break;
1001       }
1002 
1003       GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
1004       GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);
1005       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
1006       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
1007       if (!check) {
1008         dir_sync_init();
1009       }
1010       cache_init_ok = 1;
1011     } else {
1012       Warning("cache unable to open any vols, disabled");
1013     }
1014   }
1015   if (cache_init_ok) {
1016     // Initialize virtual cache
1017     CacheProcessor::initialized = CACHE_INITIALIZED;
1018     CacheProcessor::cache_ready = caches_ready;
1019     Note("cache enabled");
1020   } else {
1021     CacheProcessor::initialized = CACHE_INIT_FAILED;
1022     Note("cache disabled");
1023   }
1024 
1025   // Fire callback to signal initialization finished.
1026   if (cb_after_init) {
1027     cb_after_init();
1028   }
1029 
1030   // TS-3848
1031   if (CACHE_INIT_FAILED == CacheProcessor::initialized && cacheProcessor.waitForCache() > 1) {
1032     Fatal("Cache initialization failed with cache required, exiting.");
1033   }
1034 }
1035 
1036 void
1037 CacheProcessor::stop()
1038 {
1039 }
1040 
1041 int
1042 CacheProcessor::dir_check(bool afix)
1043 {
1044   for (int i = 0; i < gnvol; i++) {
1045     gvol[i]->dir_check(afix);
1046   }
1047   return 0;
1048 }
1049 
1050 int
1051 CacheProcessor::db_check(bool afix)
1052 {
1053   for (int i = 0; i < gnvol; i++) {
1054     gvol[i]->db_check(afix);
1055   }
1056   return 0;
1057 }
1058 
1059 Action *
1060 CacheProcessor::lookup(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1061 {
1062   return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
1063 }
1064 
1065 inkcoreapi Action *
1066 CacheProcessor::open_read(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int hostlen)
1067 {
1068   return caches[frag_type]->open_read(cont, key, frag_type, hostname, hostlen);
1069 }
1070 
1071 inkcoreapi Action *
1072 CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
1073                            time_t pin_in_cache, char *hostname, int host_len)
1074 {
1075   return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
1076 }
1077 
1078 Action *
1079 CacheProcessor::remove(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1080 {
1081   Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
1082   return caches[frag_type]->remove(cont, key, frag_type, hostname, host_len);
1083 }
1084 
1085 Action *
1086 CacheProcessor::lookup(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type)
1087 {
1088   return lookup(cont, &key->hash, frag_type, key->hostname, key->hostlen);
1089 }
1090 
1091 Action *
1092 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
1093 {
1094   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
1095 }
1096 
1097 int
1098 CacheProcessor::IsCacheEnabled()
1099 {
1100   return CacheProcessor::initialized;
1101 }
1102 
1103 bool
1104 CacheProcessor::IsCacheReady(CacheFragType type)
1105 {
1106   if (IsCacheEnabled() != CACHE_INITIALIZED) {
1107     return false;
1108   }
1109   return static_cast<bool>(cache_ready & (1 << type));
1110 }
1111 
1112 int
1113 Vol::db_check(bool /* fix ATS_UNUSED */)
1114 {
1115   char tt[256];
1116   printf("    Data for [%s]\n", hash_text.get());
1117   printf("        Length:          %" PRIu64 "\n", static_cast<uint64_t>(len));
1118   printf("        Write Position:  %" PRIu64 "\n", static_cast<uint64_t>(header->write_pos - skip));
1119   printf("        Phase:           %d\n", static_cast<int>(!!header->phase));
1120   ink_ctime_r(&header->create_time, tt);
1121   tt[strlen(tt) - 1] = 0;
1122   printf("        Create Time:     %s\n", tt);
1123   printf("        Sync Serial:     %u\n", static_cast<unsigned int>(header->sync_serial));
1124   printf("        Write Serial:    %u\n", static_cast<unsigned int>(header->write_serial));
1125   printf("\n");
1126 
1127   return 0;
1128 }
1129 
1130 static void
1131 vol_init_data_internal(Vol *d)
1132 {
1133   // step1: calculate the number of entries.
1134   off_t total_entries = (d->len - (d->start - d->skip)) / cache_config_min_average_object_size;
1135   // step2: calculate the number of buckets
1136   off_t total_buckets = total_entries / DIR_DEPTH;
1137   // step3: calculate the number of segments, no segment has more than 16384 buckets
1138   d->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
1139   // step4: divide total_buckets into segments on average.
1140   d->buckets = (total_buckets + d->segments - 1) / d->segments;
1141   // step5: set the start pointer.
1142   d->start = d->skip + 2 * d->dirlen();
1143 }
1144 
1145 static void
1146 vol_init_data(Vol *d)
1147 {
1148   // iteratively calculate start + buckets
1149   vol_init_data_internal(d);
1150   vol_init_data_internal(d);
1151   vol_init_data_internal(d);
1152 }
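/* A rough worked example of the sizing above (illustrative, assuming DIR_DEPTH
   is 4 and the default average object size of 8000 bytes): a 1 GiB stripe gives
   ~134,000 directory entries, ~33,500 buckets, 3 segments (at most 16384 buckets
   per segment), and ~11,185 buckets per segment. vol_init_data() runs the
   calculation three times because dirlen(), and therefore the usable data length,
   depends on the bucket/segment counts chosen in the previous pass. */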
1153 
1154 void
1155 vol_init_dir(Vol *d)
1156 {
1157   int b, s, l;
1158 
1159   for (s = 0; s < d->segments; s++) {
1160     d->header->freelist[s] = 0;
1161     Dir *seg               = d->dir_segment(s);
1162     for (l = 1; l < DIR_DEPTH; l++) {
1163       for (b = 0; b < d->buckets; b++) {
1164         Dir *bucket = dir_bucket(b, seg);
1165         dir_free_entry(dir_bucket_row(bucket, l), s, d);
1166       }
1167     }
1168   }
1169 }
1170 
1171 void
1172 vol_clear_init(Vol *d)
1173 {
1174   size_t dir_len = d->dirlen();
1175   memset(d->raw_dir, 0, dir_len);
1176   vol_init_dir(d);
1177   d->header->magic          = VOL_MAGIC;
1178   d->header->version._major = CACHE_DB_MAJOR_VERSION;
1179   d->header->version._minor = CACHE_DB_MINOR_VERSION;
1180   d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start;
1181   d->header->last_write_pos                               = d->header->write_pos;
1182   d->header->phase                                        = 0;
1183   d->header->cycle                                        = 0;
1184   d->header->create_time                                  = time(nullptr);
1185   d->header->dirty                                        = 0;
1186   d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
1187   *d->footer                              = *d->header;
1188 }
1189 
1190 int
1191 vol_dir_clear(Vol *d)
1192 {
1193   size_t dir_len = d->dirlen();
1194   vol_clear_init(d);
1195 
1196   if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
1197     Warning("unable to clear cache directory '%s'", d->hash_text.get());
1198     return -1;
1199   }
1200   return 0;
1201 }
1202 
1203 int
1204 Vol::clear_dir()
1205 {
1206   size_t dir_len = this->dirlen();
1207   vol_clear_init(this);
1208 
1209   SET_HANDLER(&Vol::handle_dir_clear);
1210 
1211   io.aiocb.aio_fildes = fd;
1212   io.aiocb.aio_buf    = raw_dir;
1213   io.aiocb.aio_nbytes = dir_len;
1214   io.aiocb.aio_offset = skip;
1215   io.action           = this;
1216   io.thread           = AIO_CALLBACK_THREAD_ANY;
1217   io.then             = nullptr;
1218   ink_assert(ink_aio_write(&io));
1219   return 0;
1220 }
1221 
1222 int
1223 Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
1224 {
1225   char *seed_str              = disk->hash_base_string ? disk->hash_base_string : s;
1226   const size_t hash_seed_size = strlen(seed_str);
1227   const size_t hash_text_size = hash_seed_size + 32;
1228 
1229   hash_text = static_cast<char *>(ats_malloc(hash_text_size));
1230   ink_strlcpy(hash_text, seed_str, hash_text_size);
1231   snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
1232            static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
1233   CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));
1234 
1235   dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
1236   path     = ats_strdup(s);
1237   len      = blocks * STORE_BLOCK_SIZE;
1238   ink_assert(len <= MAX_VOL_SIZE);
1239   skip             = dir_skip;
1240   prev_recover_pos = 0;
1241 
1242   // successive approximation, directory/meta data eats up some storage
1243   start = dir_skip;
1244   vol_init_data(this);
1245   data_blocks         = (len - (start - skip)) / STORE_BLOCK_SIZE;
1246   hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;
1247 
1248   evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
1249   int evac_len  = evacuate_size * sizeof(DLL<EvacuationBlock>);
1250   evacuate      = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
1251   memset(static_cast<void *>(evacuate), 0, evac_len);
1252 
1253   Debug("cache_init", "Vol %s: allocating %zu directory bytes for a %lld byte volume (%lf%%)", hash_text.get(), dirlen(),
1254         (long long)this->len, (double)dirlen() / (double)this->len * 100.0);
1255 
1256   raw_dir = nullptr;
1257   if (ats_hugepage_enabled()) {
1258     raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
1259   }
1260   if (raw_dir == nullptr) {
1261     raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->dirlen()));
1262   }
1263 
1264   dir    = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
1265   header = reinterpret_cast<VolHeaderFooter *>(raw_dir);
1266   footer = reinterpret_cast<VolHeaderFooter *>(raw_dir + this->dirlen() - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));
1267 
1268   if (clear) {
1269     Note("clearing cache directory '%s'", hash_text.get());
1270     return clear_dir();
1271   }
1272 
1273   init_info           = new VolInitInfo();
1274   int footerlen       = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1275   off_t footer_offset = this->dirlen() - footerlen;
1276   // try A
1277   off_t as = skip;
1278 
1279   Debug("cache_init", "reading directory '%s'", hash_text.get());
1280   SET_HANDLER(&Vol::handle_header_read);
1281   init_info->vol_aio[0].aiocb.aio_offset = as;
1282   init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
1283   off_t bs                               = skip + this->dirlen();
1284   init_info->vol_aio[2].aiocb.aio_offset = bs;
1285   init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;
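  // The four AIOs read, in order: copy A's header (at skip), copy A's footer,
  // copy B's header (at skip + dirlen()), and copy B's footer; they are chained
  // together via the `then` pointers set up below.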
1286 
1287   for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
1288     AIOCallback *aio      = &(init_info->vol_aio[i]);
1289     aio->aiocb.aio_fildes = fd;
1290     aio->aiocb.aio_buf    = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
1291     aio->aiocb.aio_nbytes = footerlen;
1292     aio->action           = this;
1293     aio->thread           = AIO_CALLBACK_THREAD_ANY;
1294     aio->then             = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
1295   }
1296 #if AIO_MODE == AIO_MODE_NATIVE
1297   ink_assert(ink_aio_readv(init_info->vol_aio));
1298 #else
1299   ink_assert(ink_aio_read(init_info->vol_aio));
1300 #endif
1301   return 0;
1302 }
1303 
1304 int
1305 Vol::handle_dir_clear(int event, void *data)
1306 {
1307   size_t dir_len = this->dirlen();
1308   AIOCallback *op;
1309 
1310   if (event == AIO_EVENT_DONE) {
1311     op = static_cast<AIOCallback *>(data);
1312     if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1313       Warning("unable to clear cache directory '%s'", hash_text.get());
1314       disk->incrErrors(op);
1315       fd = -1;
1316     }
1317 
1318     if (op->aiocb.aio_nbytes == dir_len) {
1319       /* clear the header for directory B. We don't need to clear the
1320          whole of directory B. The header for directory B starts at
1321          skip + len */
1322       op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1323       op->aiocb.aio_offset = skip + dir_len;
1324       ink_assert(ink_aio_write(op));
1325       return EVENT_DONE;
1326     }
1327     set_io_not_in_progress();
1328     SET_HANDLER(&Vol::dir_init_done);
1329     dir_init_done(EVENT_IMMEDIATE, nullptr);
1330     /* mark the volume as bad */
1331   }
1332   return EVENT_DONE;
1333 }
1334 
1335 int
1336 Vol::handle_dir_read(int event, void *data)
1337 {
1338   AIOCallback *op = static_cast<AIOCallback *>(data);
1339 
1340   if (event == AIO_EVENT_DONE) {
1341     if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1342       Note("Directory read failed: clearing cache directory %s", this->hash_text.get());
1343       clear_dir();
1344       return EVENT_DONE;
1345     }
1346   }
1347 
1348   if (!(header->magic == VOL_MAGIC && footer->magic == VOL_MAGIC && CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major &&
1349         header->version._major <= CACHE_DB_MAJOR_VERSION)) {
1350     Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
1351     Note("VOL_MAGIC %d\n header magic: %d\n footer_magic %d\n CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n"
1352          "CACHE_DB_MAJOR_VERSION %d\n",
1353          VOL_MAGIC, header->magic, footer->magic, CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major,
1354          CACHE_DB_MAJOR_VERSION);
1355     Note("clearing cache directory '%s'", hash_text.get());
1356     clear_dir();
1357     return EVENT_DONE;
1358   }
1359   CHECK_DIR(this);
1360 
1361   sector_size = header->sector_size;
1362 
1363   return this->recover_data();
1364 
1365   return EVENT_CONT;
1366 }
1367 
1368 int
1369 Vol::recover_data()
1370 {
1371   SET_HANDLER(&Vol::handle_recover_from_data);
1372   return handle_recover_from_data(EVENT_IMMEDIATE, nullptr);
1373 }
1374 
1375 /*
1376    Philosophy:  The idea is to find the region of disk that could be
1377    inconsistent and remove all directory entries pointing to that potentially
1378    inconsistent region.
1379    Start from a consistent position (the write_pos of the last directory
1380    synced to disk) and scan forward. Two invariants for docs that were
1381    written to the disk after the directory was synced:
1382 
1383    1. doc->magic == DOC_MAGIC
1384 
1385    The following two cases happen only when the previous generation
1386    documents are aligned with the current ones.
1387 
1388    2. All the docs written to the disk
1389    after the directory was synced will have their sync_serial <=
1390    header->sync_serial + 1,  because the write aggregation can take
1391    an indeterminate amount of time to sync. The doc->sync_serial can be
1392    equal to header->sync_serial + 1, because we increment the sync_serial
1393    before we sync the directory to disk.
1394 
1395    3. The doc->sync_serial will always increase. If doc->sync_serial
1396    decreases, the document was written in the previous phase
1397 
1398    If any of these conditions fails and we are not too close to the end
1399    (see the next comment) then we're done
1400 
1401    We actually start from header->last_write_pos instead of header->write_pos
1402    to make sure that we haven't wrapped around the whole disk without
1403    syncing the directory.  Since the directory sync interval is 60 seconds, it is
1404    entirely possible to write through the whole cache without
1405    once syncing the directory. In this case, we need to clear the
1406    cache. The documents written right before we synced the
1407    directory to disk should have the write_serial <= header->sync_serial.
1408 
1409       */
1410 
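/* A condensed sketch of the recovery scan implemented below (a summary, not a spec):
     pos = header->last_write_pos
     read RECOVERY_SIZE chunks from pos, walking Doc headers:
       - doc->magic != DOC_MAGIC                                          -> stop (or wrap once if near the end)
       - doc->sync_serial in (last_sync_serial, header->sync_serial + 1]  -> keep scanning
       - otherwise                                                        -> stop
   Directory entries pointing into the potentially inconsistent region past the
   stop position are then discarded. */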
1411 int
1412 Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
1413 {
1414   uint32_t got_len         = 0;
1415   uint32_t max_sync_serial = header->sync_serial;
1416   char *s, *e;
1417   if (event == EVENT_IMMEDIATE) {
1418     if (header->sync_serial == 0) {
1419       io.aiocb.aio_buf = nullptr;
1420       SET_HANDLER(&Vol::handle_recover_write_dir);
1421       return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1422     }
1423     // initialize
1424     recover_wrapped   = false;
1425     last_sync_serial  = 0;
1426     last_write_serial = 0;
1427     recover_pos       = header->last_write_pos;
1428     if (recover_pos >= skip + len) {
1429       recover_wrapped = true;
1430       recover_pos     = start;
1431     }
1432     io.aiocb.aio_buf    = static_cast<char *>(ats_memalign(ats_pagesize(), RECOVERY_SIZE));
1433     io.aiocb.aio_nbytes = RECOVERY_SIZE;
1434     if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1435       io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1436     }
1437   } else if (event == AIO_EVENT_DONE) {
1438     if (io.aiocb.aio_nbytes != static_cast<size_t>(io.aio_result)) {
1439       Warning("disk read error on recover '%s', clearing", hash_text.get());
1440       disk->incrErrors(&io);
1441       goto Lclear;
1442     }
1443     if (io.aiocb.aio_offset == header->last_write_pos) {
1444       /* check that we haven't wrapped around without syncing
1445          the directory. Start from last_write_serial (write pos the documents
1446          were written to just before syncing the directory) and make sure
1447          that all documents have write_serial <= header->write_serial.
1448        */
1449       uint32_t to_check = header->write_pos - header->last_write_pos;
1450       ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
1451       uint32_t done = 0;
1452       s             = static_cast<char *>(io.aiocb.aio_buf);
1453       while (done < to_check) {
1454         Doc *doc = reinterpret_cast<Doc *>(s + done);
1455         if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
1456           Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1457           goto Lclear;
1458         }
1459         done += round_to_approx_size(doc->len);
1460         if (doc->sync_serial > last_write_serial) {
1461           last_sync_serial = doc->sync_serial;
1462         }
1463       }
1464       ink_assert(done == to_check);
1465 
1466       got_len = io.aiocb.aio_nbytes - done;
1467       recover_pos += io.aiocb.aio_nbytes;
1468       s = static_cast<char *>(io.aiocb.aio_buf) + done;
1469       e = s + got_len;
1470     } else {
1471       got_len = io.aiocb.aio_nbytes;
1472       recover_pos += io.aiocb.aio_nbytes;
1473       s = static_cast<char *>(io.aiocb.aio_buf);
1474       e = s + got_len;
1475     }
1476   }
1477   // examine what we got
1478   if (got_len) {
1479     Doc *doc = nullptr;
1480 
1481     if (recover_wrapped && start == io.aiocb.aio_offset) {
1482       doc = reinterpret_cast<Doc *>(s);
1483       if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
1484         recover_pos = skip + len - EVACUATION_SIZE;
1485         goto Ldone;
1486       }
1487     }
1488 
1489     // If execution reaches here, then @c got_len > 0 and e == s + got_len therefore s < e
1490     // clang analyzer can't figure this out, so be explicit.
1491     ink_assert(s < e);
1492     while (s < e) {
1493       doc = reinterpret_cast<Doc *>(s);
1494 
1495       if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
1496         if (doc->magic == DOC_MAGIC) {
1497           if (doc->sync_serial > header->sync_serial) {
1498             max_sync_serial = doc->sync_serial;
1499           }
1500 
1501           /*
1502              doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
1503              This might happen in the following situations
1504              1. We are starting off recovery. In this case the
1505              last_sync_serial == header->sync_serial, but the doc->sync_serial
1506              can be anywhere in the range (0, header->sync_serial + 1]
1507              If this is the case, update last_sync_serial and continue;
1508 
1509              2. A dir sync started between writing documents to the
1510              aggregation buffer and hence the doc->sync_serial went up.
1511              If the doc->sync_serial is greater than the last
1512              sync serial and less than (header->sync_serial + 2) then
1513              continue;
1514 
1515              3. If the position we are recovering from is within AGG_SIZE
1516              from the disk end, then we can't trust this document. The
1517              aggregation buffer might have been larger than the remaining space
1518              at the end and we decided to wrap around instead of writing
1519              anything at that point. In this case, wrap around and start
1520              from the beginning.
1521 
1522              If neither of these 3 cases happen, then we are indeed done.
1523 
1524            */
1525 
1526           // case 1
1527           // case 2
1528           if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
1529             last_sync_serial = doc->sync_serial;
1530             s += round_to_approx_size(doc->len);
1531             continue;
1532           }
1533           // case 3 - we have already recovered some data and
1534           // (doc->sync_serial < last_sync_serial) ||
1535           // (doc->sync_serial > header->sync_serial + 1).
1536           // if we are too close to the end, wrap around
1537           else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
1538             recover_wrapped     = true;
1539             recover_pos         = start;
1540             io.aiocb.aio_nbytes = RECOVERY_SIZE;
1541 
1542             break;
1543           }
1544           // we are done. This doc was written in the earlier phase
1545           recover_pos -= e - s;
1546           goto Ldone;
1547         } else {
1548           // doc->magic != DOC_MAGIC
1549           // If we are in the danger zone - recover_pos is within AGG_SIZE
1550           // from the end, then wrap around
1551           recover_pos -= e - s;
1552           if (recover_pos > (skip + len) - AGG_SIZE) {
1553             recover_wrapped     = true;
1554             recover_pos         = start;
1555             io.aiocb.aio_nbytes = RECOVERY_SIZE;
1556 
1557             break;
1558           }
1559           // we are not in the danger zone
1560           goto Ldone;
1561         }
1562       }
1563       // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
1564       last_write_serial = doc->write_serial;
1565       s += round_to_approx_size(doc->len);
1566     }
1567 
1568     /* if (s >= e) then we have gone through RECOVERY_SIZE; we need to
1569        read more data off disk and continue recovering */
1570     if (s >= e) {
1571       /* In the last iteration we may have advanced s past e by
1572          round_to_approx_size(doc->len); undo that overshoot */
1573       if (s > e) {
1574         s -= round_to_approx_size(doc->len);
1575       }
1576       recover_pos -= e - s;
1577       if (recover_pos >= skip + len) {
1578         recover_wrapped = true;
1579         recover_pos     = start;
1580       }
1581       io.aiocb.aio_nbytes = RECOVERY_SIZE;
1582       if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1583         io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1584       }
1585     }
1586   }
1587   if (recover_pos == prev_recover_pos) { // this should never happen, but if it does break the loop
1588     goto Lclear;
1589   }
1590   prev_recover_pos    = recover_pos;
1591   io.aiocb.aio_offset = recover_pos;
1592   ink_assert(ink_aio_read(&io));
1593   return EVENT_CONT;
1594 
1595 Ldone : {
1596   /* if we come back to the starting position, then we don't have to recover anything */
1597   if (recover_pos == header->write_pos && recover_wrapped) {
1598     SET_HANDLER(&Vol::handle_recover_write_dir);
1599     if (is_debug_tag_set("cache_init")) {
1600       Note("recovery wrapped around. nothing to clear\n");
1601     }
1602     return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1603   }
1604 
1605   recover_pos += EVACUATION_SIZE; // safely cover the max write size
1606   if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
1607     Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
1608     Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1609     goto Lclear;
1610   }
1611 
1612   if (recover_pos > skip + len) {
1613     recover_pos -= skip + len;
1614   }
1615   // bump sync number so it is different from that in the Doc structs
1616   uint32_t next_sync_serial = max_sync_serial + 1;
1617   // make sure that the next sync does not overwrite our good copy!
1618   if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
1619     next_sync_serial++;
1620   }
1621   // clear the affected portion of the cache
1622   off_t clear_start = this->offset_to_vol_offset(header->write_pos);
1623   off_t clear_end   = this->offset_to_vol_offset(recover_pos);
1624   if (clear_start <= clear_end) {
1625     dir_clear_range(clear_start, clear_end, this);
1626   } else {
1627     dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
1628     dir_clear_range(1, clear_end, this);
1629   }
1630 
1631   Note("recovery clearing offsets of Vol %s : [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", hash_text.get(),
1632        header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
1633 
1634   footer->sync_serial = header->sync_serial = next_sync_serial;
1635 
1636   for (int i = 0; i < 3; i++) {
1637     AIOCallback *aio      = &(init_info->vol_aio[i]);
1638     aio->aiocb.aio_fildes = fd;
1639     aio->action           = this;
1640     aio->thread           = AIO_CALLBACK_THREAD_ANY;
1641     aio->then             = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
1642   }
1643   int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1644   size_t dirlen = this->dirlen();
1645   int B         = header->sync_serial & 1;
1646   off_t ss      = skip + (B ? dirlen : 0);
1647 
1648   init_info->vol_aio[0].aiocb.aio_buf    = raw_dir;
1649   init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
1650   init_info->vol_aio[0].aiocb.aio_offset = ss;
1651   init_info->vol_aio[1].aiocb.aio_buf    = raw_dir + footerlen;
1652   init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
1653   init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
1654   init_info->vol_aio[2].aiocb.aio_buf    = raw_dir + dirlen - footerlen;
1655   init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
1656   init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
1657 
1658   SET_HANDLER(&Vol::handle_recover_write_dir);
1659 #if AIO_MODE == AIO_MODE_NATIVE
1660   ink_assert(ink_aio_writev(init_info->vol_aio));
1661 #else
1662   ink_assert(ink_aio_write(init_info->vol_aio));
1663 #endif
1664   return EVENT_CONT;
1665 }
1666 
1667 Lclear:
1668   free(static_cast<char *>(io.aiocb.aio_buf));
1669   delete init_info;
1670   init_info = nullptr;
1671   clear_dir();
1672   return EVENT_CONT;
1673 }
1674 
1675 int
1676 Vol::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1677 {
1678   if (io.aiocb.aio_buf) {
1679     free(static_cast<char *>(io.aiocb.aio_buf));
1680   }
1681   delete init_info;
1682   init_info = nullptr;
1683   set_io_not_in_progress();
1684   scan_pos = header->write_pos;
1685   periodic_scan();
1686   SET_HANDLER(&Vol::dir_init_done);
1687   return dir_init_done(EVENT_IMMEDIATE, nullptr);
1688 }
1689 
1690 int
1691 Vol::handle_header_read(int event, void *data)
1692 {
1693   AIOCallback *op;
1694   VolHeaderFooter *hf[4];
1695   switch (event) {
1696   case AIO_EVENT_DONE:
1697     op = static_cast<AIOCallback *>(data);
1698     for (auto &i : hf) {
1699       ink_assert(op != nullptr);
1700       i = static_cast<VolHeaderFooter *>(op->aiocb.aio_buf);
1701       if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1702         Note("Header read failed: clearing cache directory %s", this->hash_text.get());
1703         clear_dir();
1704         return EVENT_DONE;
1705       }
1706       op = op->then;
1707     }
1708 
1709     io.aiocb.aio_fildes = fd;
1710     io.aiocb.aio_nbytes = this->dirlen();
1711     io.aiocb.aio_buf    = raw_dir;
1712     io.action           = this;
1713     io.thread           = AIO_CALLBACK_THREAD_ANY;
1714     io.then             = nullptr;
1715 
1716     if (hf[0]->sync_serial == hf[1]->sync_serial &&
1717         (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
1718       SET_HANDLER(&Vol::handle_dir_read);
1719       if (is_debug_tag_set("cache_init")) {
1720         Note("using directory A for '%s'", hash_text.get());
1721       }
1722       io.aiocb.aio_offset = skip;
1723       ink_assert(ink_aio_read(&io));
1724     }
1725     // try B
1726     else if (hf[2]->sync_serial == hf[3]->sync_serial) {
1727       SET_HANDLER(&Vol::handle_dir_read);
1728       if (is_debug_tag_set("cache_init")) {
1729         Note("using directory B for '%s'", hash_text.get());
1730       }
1731       io.aiocb.aio_offset = skip + this->dirlen();
1732       ink_assert(ink_aio_read(&io));
1733     } else {
1734       Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get());
1735       Note("Header A: %d\nFooter A: %d\n Header B: %d\n Footer B %d\n", hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial,
1736            hf[3]->sync_serial);
1737       clear_dir();
1738       delete init_info;
1739       init_info = nullptr;
1740     }
1741     return EVENT_DONE;
1742   default:
1743     ink_assert(!"not reach here");
1744   }
1745   return EVENT_DONE;
1746 }
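/* A minimal sketch of the copy-selection rule used above, assuming hf[0]/hf[1] hold
   the header/footer sync serials of directory copy A and hf[2]/hf[3] those of copy B.
   choose_dir_copy() is a hypothetical helper for illustration only: it returns 0 for
   copy A, 1 for copy B, and -1 when neither copy is usable. */
static inline int
choose_dir_copy(uint32_t header_a, uint32_t footer_a, uint32_t header_b, uint32_t footer_b)
{
  if (header_a == footer_a && (header_a >= header_b || header_b != footer_b)) {
    return 0; // A is internally consistent and at least as new as a consistent B
  }
  if (header_b == footer_b) {
    return 1; // fall back to B
  }
  return -1; // neither copy is consistent; the directory has to be cleared
}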
1747 
1748 int
1749 Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1750 {
1751   if (!cache->cache_read_done) {
1752     eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
1753     return EVENT_CONT;
1754   } else {
1755     int vol_no = gnvol++;
1756     ink_assert(!gvol[vol_no]);
1757     gvol[vol_no] = this;
1758     SET_HANDLER(&Vol::aggWrite);
1759     if (fd == -1) {
1760       cache->vol_initialized(false);
1761     } else {
1762       cache->vol_initialized(true);
1763     }
1764     return EVENT_DONE;
1765   }
1766 }
1767 
1768 // explicit pair for random table in build_vol_hash_table
1769 struct rtable_pair {
1770   unsigned int rval; ///< relative value, used to sort.
1771   unsigned int idx;  ///< volume mapping table index.
1772 };
1773 
1774 // comparison operator for random table in build_vol_hash_table
1775 // sorts based on the randomly assigned rval
1776 static int
1777 cmprtable(const void *aa, const void *bb)
1778 {
1779   rtable_pair *a = (rtable_pair *)aa;
1780   rtable_pair *b = (rtable_pair *)bb;
1781   if (a->rval < b->rval) {
1782     return -1;
1783   }
1784   if (a->rval > b->rval) {
1785     return 1;
1786   }
1787   return 0;
1788 }
1789 
1790 void
1791 build_vol_hash_table(CacheHostRecord *cp)
1792 {
1793   int num_vols          = cp->num_vols;
1794   unsigned int *mapping = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1795   Vol **p               = static_cast<Vol **>(ats_malloc(sizeof(Vol *) * num_vols));
1796 
1797   memset(mapping, 0, num_vols * sizeof(unsigned int));
1798   memset(p, 0, num_vols * sizeof(Vol *));
1799   uint64_t total = 0;
1800   int bad_vols   = 0;
1801   int map        = 0;
1802   uint64_t used  = 0;
1803   // initialize number of elements per vol
1804   for (int i = 0; i < num_vols; i++) {
1805     if (DISK_BAD(cp->vols[i]->disk)) {
1806       bad_vols++;
1807       continue;
1808     }
1809     mapping[map] = i;
1810     p[map++]     = cp->vols[i];
1811     total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT);
1812   }
1813 
1814   num_vols -= bad_vols;
1815 
1816   if (!num_vols || !total) {
1817     // all the disks are corrupt,
1818     if (cp->vol_hash_table) {
1819       new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT);
1820     }
1821     cp->vol_hash_table = nullptr;
1822     ats_free(mapping);
1823     ats_free(p);
1824     return;
1825   }
1826 
1827   unsigned int *forvol   = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1828   unsigned int *gotvol   = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1829   unsigned int *rnd      = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1830   unsigned short *ttable = static_cast<unsigned short *>(ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE));
1831   unsigned short *old_table;
1832   unsigned int *rtable_entries = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1833   unsigned int rtable_size     = 0;
1834 
1835   // estimate allocation
1836   for (int i = 0; i < num_vols; i++) {
1837     forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total;
1838     used += forvol[i];
1839     rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE;
1840     rtable_size += rtable_entries[i];
1841     gotvol[i] = 0;
1842   }
1843   // spread around the excess
1844   int extra = VOL_HASH_TABLE_SIZE - used;
1845   for (int i = 0; i < extra; i++) {
1846     forvol[i % num_vols]++;
1847   }
1848   // seed random number generator
1849   for (int i = 0; i < num_vols; i++) {
1850     uint64_t x = p[i]->hash_id.fold();
1851     rnd[i]     = static_cast<unsigned int>(x);
1852   }
1853   // initialize table to "empty"
1854   for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++) {
1855     ttable[i] = VOL_HASH_EMPTY;
1856   }
1857   // generate random numbers proportional to allocation
1858   rtable_pair *rtable = static_cast<rtable_pair *>(ats_malloc(sizeof(rtable_pair) * rtable_size));
1859   int rindex          = 0;
1860   for (int i = 0; i < num_vols; i++) {
1861     for (int j = 0; j < static_cast<int>(rtable_entries[i]); j++) {
1862       rtable[rindex].rval = next_rand(&rnd[i]);
1863       rtable[rindex].idx  = i;
1864       rindex++;
1865     }
1866   }
1867   ink_assert(rindex == (int)rtable_size);
1868   // sort the (random value, volume index) pairs
1869   qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable);
1870   unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE;
1871   unsigned int pos; // target position to allocate
1872   // select vol with closest random number for each bucket
1873   int i = 0; // index moving through the random numbers
1874   for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) {
1875     pos = width / 2 + j * width; // position to select closest to
1876     while (pos > rtable[i].rval && i < static_cast<int>(rtable_size) - 1) {
1877       i++;
1878     }
1879     ttable[j] = mapping[rtable[i].idx];
1880     gotvol[rtable[i].idx]++;
1881   }
1882   for (int i = 0; i < num_vols; i++) {
1883     Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]);
1884   }
1885   // install new table
1886   if (nullptr != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable))) {
1887     new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT);
1888   }
1889   ats_free(mapping);
1890   ats_free(p);
1891   ats_free(forvol);
1892   ats_free(gotvol);
1893   ats_free(rnd);
1894   ats_free(rtable_entries);
1895   ats_free(rtable);
1896 }
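/* A minimal sketch of how the finished table might be consulted, assuming the cache
   key has already been reduced to a 32-bit hash (the exact key slicing the cache uses
   elsewhere is not reproduced here). pick_vol() is a hypothetical helper for
   illustration only. */
static Vol *
pick_vol(const CacheHostRecord *cp, uint32_t key_hash)
{
  if (cp->vol_hash_table == nullptr) {
    return nullptr; // every volume on this host record is offline or corrupt
  }
  unsigned short slot = cp->vol_hash_table[key_hash % VOL_HASH_TABLE_SIZE];
  return (slot == VOL_HASH_EMPTY) ? nullptr : cp->vols[slot];
}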
1897 
1898 void
1899 Cache::vol_initialized(bool result)
1900 {
1901   if (result) {
1902     ink_atomic_increment(&total_good_nvol, 1);
1903   }
1904   if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1) {
1905     open_done();
1906   }
1907 }
1908 
1909 /** Set the state of a disk programmatically.
1910  */
1911 bool
1912 CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk
1913                                      bool admin)
1914 {
1915   bool zret; // indicates whether there's any online storage left.
1916   int p;
1917   uint64_t total_bytes_delete = 0;
1918   uint64_t total_dir_delete   = 0;
1919   uint64_t used_dir_delete    = 0;
1920 
1921   /* Don't mark it again, it will invalidate the stats! */
1922   if (!d->online) {
1923     return this->has_online_storage();
1924   }
1925 
1926   d->online = false;
1927 
1928   if (!DISK_BAD(d)) {
1929     SET_DISK_BAD(d);
1930   }
1931 
1932   for (p = 0; p < gnvol; p++) {
1933     if (d->fd == gvol[p]->fd) {
1934       total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
1935       used_dir_delete += dir_entries_used(gvol[p]);
1936       total_bytes_delete += gvol[p]->len - gvol[p]->dirlen();
1937     }
1938   }
1939 
1940   RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete);
1941   RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete);
1942   RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete);
1943 
1944   /* Update the span metrics: if the span is failing, move it from the "failing" to the "offline" bucket;
1945    * if the operator took it offline, move it from the "online" to the "offline" bucket */
1946   RecIncrGlobalRawStat(cache_rsb, admin ? cache_span_online_stat : cache_span_failing_stat, -1);
1947   RecIncrGlobalRawStat(cache_rsb, cache_span_offline_stat, 1);
1948 
1949   if (theCache) {
1950     rebuild_host_table(theCache);
1951   }
1952 
1953   zret = this->has_online_storage();
1954   if (!zret) {
1955     Warning("All storage devices offline, cache disabled");
1956     CacheProcessor::cache_ready = 0;
1957   } else { // check cache types specifically
1958     if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) {
1959       unsigned int caches_ready = 0;
1960       caches_ready              = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
1961       caches_ready              = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
1962       caches_ready              = ~caches_ready;
1963       CacheProcessor::cache_ready &= caches_ready;
1964       Warning("all volumes for http cache are corrupt, http cache disabled");
1965     }
1966   }
1967 
1968   return zret;
1969 }
1970 
1971 bool
1972 CacheProcessor::has_online_storage() const
1973 {
1974   CacheDisk **dptr = gdisks;
1975   for (int disk_no = 0; disk_no < gndisks; ++disk_no, ++dptr) {
1976     if (!DISK_BAD(*dptr) && (*dptr)->online) {
1977       return true;
1978     }
1979   }
1980   return false;
1981 }
1982 
1983 int
1984 AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data)
1985 {
1986   /* search for the matching file descriptor */
1987   if (!CacheProcessor::cache_ready) {
1988     return EVENT_DONE;
1989   }
1990   int disk_no     = 0;
1991   AIOCallback *cb = static_cast<AIOCallback *>(data);
1992 
1993   for (; disk_no < gndisks; disk_no++) {
1994     CacheDisk *d = gdisks[disk_no];
1995 
1996     if (d->fd == cb->aiocb.aio_fildes) {
1997       char message[256];
1998       d->incrErrors(cb);
1999 
2000       if (!DISK_BAD(d)) {
2001         snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
2002         Warning("%s", message);
2003         RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
2004       } else if (!DISK_BAD_SIGNALLED(d)) {
2005         snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors,
2006                  cache_config_max_disk_errors);
2007         Warning("%s", message);
2008         RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
2009         cacheProcessor.mark_storage_offline(d); // take it out of service
2010       }
2011       break;
2012     }
2013   }
2014 
2015   delete cb;
2016   return EVENT_DONE;
2017 }
2018 
2019 int
2020 Cache::open_done()
2021 {
2022   Action *register_ShowCache(Continuation * c, HTTPHdr * h);
2023   Action *register_ShowCacheInternal(Continuation * c, HTTPHdr * h);
2024   statPagesManager.register_http("cache", register_ShowCache);
2025   statPagesManager.register_http("cache-internal", register_ShowCacheInternal);
2026 
2027   if (total_good_nvol == 0) {
2028     ready = CACHE_INIT_FAILED;
2029     cacheProcessor.cacheInitialized();
2030     return 0;
2031   }
2032 
2033   hosttable = new CacheHostTable(this, scheme);
2034   hosttable->register_config_callback(&hosttable);
2035 
2036   if (hosttable->gen_host_rec.num_cachevols == 0) {
2037     ready = CACHE_INIT_FAILED;
2038   } else {
2039     ready = CACHE_INITIALIZED;
2040   }
2041 
2042   // TS-3848
2043   if (ready == CACHE_INIT_FAILED && cacheProcessor.waitForCache() >= 2) {
2044     Fatal("Failed to initialize cache host table");
2045   }
2046 
2047   cacheProcessor.cacheInitialized();
2048 
2049   return 0;
2050 }
2051 
2052 int
2053 Cache::open(bool clear, bool /* fix ATS_UNUSED */)
2054 {
2055   int i;
2056   off_t blocks          = 0;
2057   cache_read_done       = 0;
2058   total_initialized_vol = 0;
2059   total_nvol            = 0;
2060   total_good_nvol       = 0;
2061 
2062   REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
2063   Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d", (int)cache_config_min_average_object_size);
2064 
2065   CacheVol *cp = cp_list.head;
2066   for (; cp; cp = cp->link.next) {
2067     if (cp->scheme == scheme) {
2068       cp->vols   = static_cast<Vol **>(ats_malloc(cp->num_vols * sizeof(Vol *)));
2069       int vol_no = 0;
2070       for (i = 0; i < gndisks; i++) {
2071         if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) {
2072           DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head;
2073           for (; q; q = q->link.next) {
2074             cp->vols[vol_no]            = new Vol();
2075             CacheDisk *d                = cp->disk_vols[i]->disk;
2076             cp->vols[vol_no]->disk      = d;
2077             cp->vols[vol_no]->fd        = d->fd;
2078             cp->vols[vol_no]->cache     = this;
2079             cp->vols[vol_no]->cache_vol = cp;
2080             blocks                      = q->b->len;
2081 
2082             bool vol_clear = clear || d->cleared || q->new_block;
2083 #if AIO_MODE == AIO_MODE_NATIVE
2084             eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear));
2085 #else
2086             cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
2087 #endif
2088             vol_no++;
2089             cache_size += blocks;
2090           }
2091         }
2092       }
2093       total_nvol += vol_no;
2094     }
2095   }
2096   if (total_nvol == 0) {
2097     return open_done();
2098   }
2099   cache_read_done = 1;
2100   return 0;
2101 }
2102 
2103 int
2104 Cache::close()
2105 {
2106   return -1;
2107 }
2108 
2109 int
2110 CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
2111 {
2112   ink_assert(0);
2113   return EVENT_DONE;
2114 }
2115 
2116 bool
2117 CacheVC::is_pread_capable()
2118 {
2119   return !f.read_from_writer_called;
2120 }
2121 
2122 #define STORE_COLLISION 1
2123 
2124 static void
2125 unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
2126 {
2127   using UnmarshalFunc           = int(char *buf, int len, RefCountObj *block_ref);
2128   UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
2129   ts::VersionNumber version(doc->v_major, doc->v_minor);
2130 
2131   // introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version
2132   // before and after #4847
2133   if (version < CACHE_DB_VERSION) {
2134     unmarshal_func = &HTTPInfo::unmarshal_v24_1;
2135   }
2136 
2137   char *tmp = doc->hdr();
2138   int len   = doc->hlen;
2139   while (len > 0) {
2140     int r = unmarshal_func(tmp, len, buf.get());
2141     if (r < 0) {
2142       ink_assert(!"CacheVC::handleReadDone unmarshal failed");
2143       okay = 0;
2144       break;
2145     }
2146     len -= r;
2147     tmp += r;
2148   }
2149 }
2150 
2151 // [amc] I think all disk reads from the cache funnel through here.
2152 int
2153 CacheVC::handleReadDone(int event, Event *e)
2154 {
2155   cancel_trigger();
2156   ink_assert(this_ethread() == mutex->thread_holding);
2157 
2158   Doc *doc = nullptr;
2159   if (event == AIO_EVENT_DONE) {
2160     set_io_not_in_progress();
2161   } else if (is_io_in_progress()) {
2162     return EVENT_CONT;
2163   }
2164   {
2165     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2166     if (!lock.is_locked()) {
2167       VC_SCHED_LOCK_RETRY();
2168     }
2169     if ((!dir_valid(vol, &dir)) || (!io.ok())) {
2170       if (!io.ok()) {
2171         Debug("cache_disk_error", "Read error on disk %s\n \
2172 	    read range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
2173               vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
2174               (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
2175       }
2176       goto Ldone;
2177     }
2178 
2179     doc = reinterpret_cast<Doc *>(buf->data());
2180     ink_assert(vol->mutex->nthread_holding < 1000);
2181     ink_assert(doc->magic == DOC_MAGIC);
2182 
2183     if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
2184       // future version, count as corrupted
2185       doc->magic = DOC_CORRUPT;
2186       Debug("cache_bc", "Object is future version %d:%d - disk %s - doc id = %" PRIx64 ":%" PRIx64 "", doc->v_major, doc->v_minor,
2187             vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
2188       goto Ldone;
2189     }
2190 
2191 #ifdef VERIFY_JTEST_DATA
2192     char xx[500];
2193     if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
2194       int ib = 0, xd = 0;
2195       request.url_get()->print(xx, 500, &ib, &xd);
2196       char *x = xx;
2197       for (int q = 0; q < 3; q++)
2198         x = strchr(x + 1, '/');
2199       ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
2200     }
2201 #endif
2202 
2203     if (is_debug_tag_set("cache_read")) {
2204       char xt[CRYPTO_HEX_SIZE];
2205       Debug("cache_read", "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64 " prefix=%d",
2206             doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
2207     }
2208 
2209     // put into ram cache?
2210     if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
2211       int okay = 1;
2212       if (!f.doc_from_ram_cache) {
2213         f.not_from_ram_cache = 1;
2214       }
2215       if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
2216         // verify that the checksum matches
2217         uint32_t checksum = 0;
2218         for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
2219           checksum += *b;
2220         }
2221         ink_assert(checksum == doc->checksum);
2222         if (checksum != doc->checksum) {
2223           Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
2224                doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset,
2225                (size_t)io.aiocb.aio_nbytes);
2226           doc->magic = DOC_CORRUPT;
2227           okay       = 0;
2228         }
2229       }
2230       (void)e; // Avoid compiler warnings
2231       bool http_copy_hdr = false;
2232       http_copy_hdr =
2233         cache_config_ram_cache_compress && !f.doc_from_ram_cache && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
2234       // If http doc we need to unmarshal the headers before putting in the ram cache
2235       // unless it could be compressed
2236       if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2237         unmarshal_helper(doc, buf, okay);
2238       }
2239       // Put the request in the ram cache only if it's an open_read or lookup
2240       if (vio.op == VIO::READ && okay) {
2241         bool cutoff_check;
2242         // cutoff_check :
2243         // doc_len == 0 for the first fragment (it is set from the vector)
2244         //                The decision on the first fragment is based on
2245         //                doc->total_len
2246         // After that, the decision is based on doc_len (doc_len != 0)
2247         // (cache_config_ram_cache_cutoff == 0) : no cutoffs
2248         cutoff_check =
2249           ((!doc_len && static_cast<int64_t>(doc->total_len) < cache_config_ram_cache_cutoff) ||
2250            (doc_len && static_cast<int64_t>(doc_len) < cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
2251         if (cutoff_check && !f.doc_from_ram_cache) {
2252           uint64_t o = dir_offset(&dir);
2253           vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, static_cast<uint32_t>(o >> 32),
2254                               static_cast<uint32_t>(o));
2255         }
2256         if (!doc_len) {
2257           // keep a pointer to it. In case the state machine decides to
2258           // update this document, we don't have to read it back into memory
2259           // again
2260           vol->first_fragment_key    = *read_key;
2261           vol->first_fragment_offset = dir_offset(&dir);
2262           vol->first_fragment_data   = buf;
2263         }
2264       } // end VIO::READ check
2265       // If it could be compressed, unmarshal after
2266       if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2267         unmarshal_helper(doc, buf, okay);
2268       }
2269     } // end io.ok() check
2270   }
2271 Ldone:
2272   POP_HANDLER;
2273   return handleEvent(AIO_EVENT_DONE, nullptr);
2274 }
2275 
2276 int
2277 CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2278 {
2279   cancel_trigger();
2280 
2281   f.doc_from_ram_cache = false;
2282 
2283   // check ram cache
2284   ink_assert(vol->mutex->thread_holding == this_ethread());
2285   int64_t o           = dir_offset(&dir);
2286   int ram_hit_state   = vol->ram_cache->get(read_key, &buf, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o));
2287   f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
2288   if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
2289     goto LramHit;
2290   }
2291 
2292   // check if it was read in the last open_read call
2293   if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
2294     buf = vol->first_fragment_data;
2295     goto LmemHit;
2296   }
2297   // see if it's in the aggregation buffer
2298   if (dir_agg_buf_valid(vol, &dir)) {
2299     int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos;
2300     buf            = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2301     ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos);
2302     char *doc = buf->data();
2303     char *agg = vol->agg_buffer + agg_offset;
2304     memcpy(doc, agg, io.aiocb.aio_nbytes);
2305     io.aio_result = io.aiocb.aio_nbytes;
2306     SET_HANDLER(&CacheVC::handleReadDone);
2307     return EVENT_RETURN;
2308   }
2309 
2310   io.aiocb.aio_fildes = vol->fd;
2311   io.aiocb.aio_offset = vol->vol_offset(&dir);
2312   if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
2313     io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
2314   }
2315   buf              = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2316   io.aiocb.aio_buf = buf->data();
2317   io.action        = this;
2318   io.thread        = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
2319   SET_HANDLER(&CacheVC::handleReadDone);
2320   ink_assert(ink_aio_read(&io) >= 0);
2321   CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
2322   return EVENT_CONT;
2323 
2324 LramHit : {
2325   f.doc_from_ram_cache = true;
2326   io.aio_result        = io.aiocb.aio_nbytes;
2327   Doc *doc             = reinterpret_cast<Doc *>(buf->data());
2328   if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
2329     SET_HANDLER(&CacheVC::handleReadDone);
2330     return EVENT_RETURN;
2331   }
2332 }
2333 LmemHit:
2334   f.doc_from_ram_cache = true;
2335   io.aio_result        = io.aiocb.aio_nbytes;
2336   POP_HANDLER;
2337   return EVENT_RETURN; // allow the caller to release the volume lock
2338 }
2339 
2340 Action *
2341 Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2342 {
2343   if (!CacheProcessor::IsCacheReady(type)) {
2344     cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, nullptr);
2345     return ACTION_RESULT_DONE;
2346   }
2347 
2348   Vol *vol          = key_to_vol(key, hostname, host_len);
2349   ProxyMutex *mutex = cont->mutex.get();
2350   CacheVC *c        = new_CacheVC(cont);
2351   SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
2352   c->vio.op    = VIO::READ;
2353   c->base_stat = cache_lookup_active_stat;
2354   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2355   c->first_key = c->key = *key;
2356   c->frag_type          = type;
2357   c->f.lookup           = 1;
2358   c->vol                = vol;
2359   c->last_collision     = nullptr;
2360 
2361   if (c->handleEvent(EVENT_INTERVAL, nullptr) == EVENT_CONT) {
2362     return &c->_action;
2363   } else {
2364     return ACTION_RESULT_DONE;
2365   }
2366 }
2367 
2368 int
2369 CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2370 {
2371   cancel_trigger();
2372   set_io_not_in_progress();
2373   {
2374     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2375     if (!lock.is_locked()) {
2376       VC_SCHED_LOCK_RETRY();
2377     }
2378     if (_action.cancelled) {
2379       if (od) {
2380         vol->close_write(this);
2381         od = nullptr;
2382       }
2383       goto Lfree;
2384     }
2385     if (!f.remove_aborted_writers) {
2386       if (vol->open_write(this, true, 1)) {
2387         // a writer exists
2388         od = vol->open_read(&key);
2389         ink_release_assert(od);
2390         od->dont_update_directory = true;
2391         od                        = nullptr;
2392       } else {
2393         od->dont_update_directory = true;
2394       }
2395       f.remove_aborted_writers = 1;
2396     }
2397   Lread:
2398     SET_HANDLER(&CacheVC::removeEvent);
2399     if (!buf) {
2400       goto Lcollision;
2401     }
2402     if (!dir_valid(vol, &dir)) {
2403       last_collision = nullptr;
2404       goto Lcollision;
2405     }
2406     // check that the read completed correctly. FIXME: remove bad vols
2407     if (static_cast<size_t>(io.aio_result) != io.aiocb.aio_nbytes) {
2408       goto Ldone;
2409     }
2410     {
2411       // verify that this is our document
2412       Doc *doc = reinterpret_cast<Doc *>(buf->data());
2413       /* should be first_key not key..right?? */
2414       if (doc->first_key == key) {
2415         ink_assert(doc->magic == DOC_MAGIC);
2416         if (dir_delete(&key, vol, &dir) > 0) {
2417           if (od) {
2418             vol->close_write(this);
2419           }
2420           od = nullptr;
2421           goto Lremoved;
2422         }
2423         goto Ldone;
2424       }
2425     }
2426   Lcollision:
2427     // check for collision
2428     if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
2429       int ret = do_read_call(&key);
2430       if (ret == EVENT_RETURN) {
2431         goto Lread;
2432       }
2433       return ret;
2434     }
2435   Ldone:
2436     CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
2437     if (od) {
2438       vol->close_write(this);
2439     }
2440   }
2441   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
2442   _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *)-ECACHE_NO_DOC);
2443   goto Lfree;
2444 Lremoved:
2445   _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
2446 Lfree:
2447   return free_CacheVC(this);
2448 }
2449 
2450 Action *
2451 Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2452 {
2453   if (!CacheProcessor::IsCacheReady(type)) {
2454     if (cont) {
2455       cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, nullptr);
2456     }
2457     return ACTION_RESULT_DONE;
2458   }
2459 
2460   Ptr<ProxyMutex> mutex;
2461   if (!cont) {
2462     cont = new_CacheRemoveCont();
2463   }
2464 
2465   CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
2466   ink_assert(lock.is_locked());
2467   Vol *vol = key_to_vol(key, hostname, host_len);
2468   // coverity[var_decl]
2469   Dir result;
2470   dir_clear(&result); // initialized here, set result empty so we can recognize missed lock
2471   mutex = cont->mutex;
2472 
2473   CacheVC *c   = new_CacheVC(cont);
2474   c->vio.op    = VIO::NONE;
2475   c->frag_type = type;
2476   c->base_stat = cache_remove_active_stat;
2477   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2478   c->first_key = c->key = *key;
2479   c->vol                = vol;
2480   c->dir                = result;
2481   c->f.remove           = 1;
2482 
2483   SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent);
2484   int ret = c->removeEvent(EVENT_IMMEDIATE, nullptr);
2485   if (ret == EVENT_DONE) {
2486     return ACTION_RESULT_DONE;
2487   } else {
2488     return &c->_action;
2489   }
2490 }
2491 // CacheVConnection
2492 
2493 CacheVConnection::CacheVConnection() : VConnection(nullptr) {}
2494 
2495 void
2496 cplist_init()
2497 {
2498   cp_list_len = 0;
2499   for (int i = 0; i < gndisks; i++) {
2500     CacheDisk *d = gdisks[i];
2501     DiskVol **dp = d->disk_vols;
2502     for (unsigned int j = 0; j < d->header->num_volumes; j++) {
2503       ink_assert(dp[j]->dpb_queue.head);
2504       CacheVol *p = cp_list.head;
2505       while (p) {
2506         if (p->vol_number == dp[j]->vol_number) {
2507           ink_assert(p->scheme == (int)dp[j]->dpb_queue.head->b->type);
2508           p->size += dp[j]->size;
2509           p->num_vols += dp[j]->num_volblocks;
2510           p->disk_vols[i] = dp[j];
2511           break;
2512         }
2513         p = p->link.next;
2514       }
2515       if (!p) {
2516         // did not find a volume in the cache vol list...create
2517         // a new one
2518         CacheVol *new_p   = new CacheVol();
2519         new_p->vol_number = dp[j]->vol_number;
2520         new_p->num_vols   = dp[j]->num_volblocks;
2521         new_p->size       = dp[j]->size;
2522         new_p->scheme     = dp[j]->dpb_queue.head->b->type;
2523         new_p->disk_vols  = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2524         memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *));
2525         new_p->disk_vols[i] = dp[j];
2526         cp_list.enqueue(new_p);
2527         cp_list_len++;
2528       }
2529     }
2530   }
2531 }
2532 
2533 void
2534 cplist_update()
2535 {
2536   /* go through cplist and delete volumes that are not in the volume.config */
2537   CacheVol *cp = cp_list.head;
2538   ConfigVol *config_vol;
2539 
2540   while (cp) {
2541     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2542       if (config_vol->number == cp->vol_number) {
2543         if (cp->scheme == config_vol->scheme) {
2544           config_vol->cachep = cp;
2545         } else {
2546           /* delete this volume from all the disks */
2547           int d_no;
2548           int clearCV = 1;
2549 
2550           for (d_no = 0; d_no < gndisks; d_no++) {
2551             if (cp->disk_vols[d_no]) {
2552               if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) {
2553                 clearCV            = 0;
2554                 config_vol->cachep = cp;
2555               } else {
2556                 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2557                 cp->disk_vols[d_no] = nullptr;
2558               }
2559             }
2560           }
2561           if (clearCV) {
2562             config_vol = nullptr;
2563           }
2564         }
2565         break;
2566       }
2567     }
2568 
2569     if (!config_vol) {
2570       // did not find a matching volume in the config file.
2571       // Delete the volume from the cache vol list
2572       int d_no;
2573       for (d_no = 0; d_no < gndisks; d_no++) {
2574         if (cp->disk_vols[d_no]) {
2575           cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2576         }
2577       }
2578       CacheVol *temp_cp = cp;
2579       cp                = cp->link.next;
2580       cp_list.remove(temp_cp);
2581       cp_list_len--;
2582       delete temp_cp;
2583       continue;
2584     } else {
2585       cp = cp->link.next;
2586     }
2587   }
2588 }
2589 
2590 static int
2591 fillExclusiveDisks(CacheVol *cp)
2592 {
2593   int diskCount     = 0;
2594   int volume_number = cp->vol_number;
2595 
2596   Debug("cache_init", "volume %d", volume_number);
2597   for (int i = 0; i < gndisks; i++) {
2598     if (gdisks[i]->forced_volume_num != volume_number) {
2599       continue;
2600     }
2601     /* A volume other than the forced one already exists on this disk - clear
2602        the disk so it can be recreated for the forced volume */
2603     for (int j = 0; j < static_cast<int>(gdisks[i]->header->num_volumes); j++) {
2604       if (volume_number != gdisks[i]->disk_vols[j]->vol_number) {
2605         Note("Clearing Disk: %s", gdisks[i]->path);
2606         gdisks[i]->delete_all_volumes();
2607         break;
2608       }
2609     }
2610     diskCount++;
2611 
2612     int64_t size_diff = gdisks[i]->num_usable_blocks;
2613     DiskVolBlock *dpb;
2614 
2615     do {
2616       dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme);
2617       if (dpb) {
2618         if (!cp->disk_vols[i]) {
2619           cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
2620         }
2621         size_diff -= dpb->len;
2622         cp->size += dpb->len;
2623         cp->num_vols++;
2624       } else {
2625         Debug("cache_init", "create_volume failed");
2626         break;
2627       }
2628     } while ((size_diff > 0));
2629   }
2630   return diskCount;
2631 }
2632 
2633 int
2634 cplist_reconfigure()
2635 {
2636   int64_t size;
2637   int volume_number;
2638   off_t size_in_blocks;
2639   ConfigVol *config_vol;
2640 
2641   gnvol = 0;
2642   if (config_volumes.num_volumes == 0) {
2643     /* only the http cache */
2644     CacheVol *cp   = new CacheVol();
2645     cp->vol_number = 0;
2646     cp->scheme     = CACHE_HTTP_TYPE;
2647     cp->disk_vols  = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2648     memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2649     cp_list.enqueue(cp);
2650     cp_list_len++;
2651     for (int i = 0; i < gndisks; i++) {
2652       if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) {
2653         /* The user had created several volumes before - clear the disk
2654            and create one volume for http */
2655         Note("Clearing Disk: %s", gdisks[i]->path);
2656         gdisks[i]->delete_all_volumes();
2657       }
2658       if (gdisks[i]->cleared) {
2659         uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
2660         int vols            = (free_space / MAX_VOL_SIZE) + 1;
2661         for (int p = 0; p < vols; p++) {
2662           off_t b = gdisks[i]->free_space / (vols - p);
2663           Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b);
2664           DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE);
2665           ink_assert(dpb && dpb->len == (uint64_t)b);
2666         }
2667         ink_assert(gdisks[i]->free_space == 0);
2668       }
2669 
2670       ink_assert(gdisks[i]->header->num_volumes == 1);
2671       DiskVol **dp = gdisks[i]->disk_vols;
2672       gnvol += dp[0]->num_volblocks;
2673       cp->size += dp[0]->size;
2674       cp->num_vols += dp[0]->num_volblocks;
2675       cp->disk_vols[i] = dp[0];
2676     }
2677 
2678   } else {
2679     for (int i = 0; i < gndisks; i++) {
2680       if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) {
2681         /* The disk was previously set up with only the single default http
2682            volume - clear it so the configured volumes can be created */
2683         Note("Clearing Disk: %s", gdisks[i]->path);
2684         gdisks[i]->delete_all_volumes();
2685       }
2686     }
2687 
2688     /* change percentages in the config partitions to absolute value */
2689     off_t tot_space_in_blks = 0;
2690     off_t blocks_per_vol    = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE;
2691     /* sum up the total space available on all the disks,
2692        rounding each down to a 128 megabyte multiple */
2693     for (int i = 0; i < gndisks; i++) {
2694       tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
2695     }
2696 
2697     double percent_remaining = 100.00;
2698     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2699       if (config_vol->in_percent) {
2700         if (config_vol->percent > percent_remaining) {
2701           Warning("total volume sizes added up to more than 100%%!");
2702           Warning("no volumes created");
2703           return -1;
2704         }
2705         int64_t space_in_blks = static_cast<int64_t>(((config_vol->percent / percent_remaining)) * tot_space_in_blks);
2706 
2707         space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT);
2708         /* round down to 128 megabyte multiple */
2709         space_in_blks    = (space_in_blks >> 7) << 7;
2710         config_vol->size = space_in_blks;
2711         tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT);
2712         percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent;
2713       }
2714       if (config_vol->size < 128) {
2715         Warning("the size of volume %d (%" PRId64 ") is less than the minimum required volume size %d", config_vol->number,
2716                 (int64_t)config_vol->size, 128);
2717         Warning("volume %d is not created", config_vol->number);
2718       }
2719       Debug("cache_hosting", "Volume: %d Size: %" PRId64, config_vol->number, (int64_t)config_vol->size);
2720     }
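    /* Worked example, assuming the usual 8 KB STORE_BLOCK_SIZE (STORE_BLOCK_SHIFT == 13):
       with tot_space_in_blks == 131072 store blocks (1 GB) and a volume configured at 25%
       while percent_remaining is still 100, space_in_blks starts at 32768 store blocks;
       the shift by (20 - STORE_BLOCK_SHIFT) == 7 converts that to 256 MB, the round-down
       to a 128 MB multiple leaves it at 256, so the volume is sized at 256 MB and
       256 << 7 == 32768 store blocks are deducted from tot_space_in_blks. */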
2721     cplist_update();
2722     /* go through volume config and grow and create volumes */
2723 
2724     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2725       // if volume is given exclusive disks, fill here and continue
2726       if (!config_vol->cachep) {
2727         continue;
2728       }
2729       fillExclusiveDisks(config_vol->cachep);
2730     }
2731 
2732     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2733       size = config_vol->size;
2734