bes  Updated for version 3.20.5
GlobalMetadataStore.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of Hyrax, a C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2018 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <fcntl.h> // for posix_fadvise
28 #include <unistd.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <string>
35 #include <fstream>
36 #include <sstream>
37 #include <functional>
38 #include <memory>
39 
40 #include <DapObj.h>
41 #include <DDS.h>
42 #include <DAS.h>
43 #include <DMR.h>
44 #include <D4ParserSax2.h>
45 #include <XMLWriter.h>
46 #include <BaseTypeFactory.h>
47 #include <D4BaseTypeFactory.h>
48 
49 #include "PicoSHA2/picosha2.h"
50 
51 #include "TempFile.h"
52 #include "TheBESKeys.h"
53 #include "BESUtil.h"
54 #include "BESLog.h"
55 #include "BESContextManager.h"
56 #include "BESDebug.h"
57 
58 #include "BESInternalError.h"
59 #include "BESInternalFatalError.h"
60 
61 #include "GlobalMetadataStore.h"
62 
63 #define DEBUG_KEY "metadata_store"
64 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
65 
66 #ifdef HAVE_ATEXIT
67 #define AT_EXIT(x) atexit((x))
68 #else
69 #define AT_EXIT(x)
70 #endif
71 
81 #undef SYMETRIC_ADD_RESPONSES
82 
83 using namespace std;
84 using namespace libdap;
85 using namespace bes;
86 
87 static const unsigned int default_cache_size = 20; // 20 GB
88 static const string default_cache_prefix = "mds";
89 static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
90 static const string default_ledger_name = "mds_ledger.txt";
91 
92 static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
93 static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
94 static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
95 static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
96 static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
97 
98 GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
99 bool GlobalMetadataStore::d_enabled = true;
100 
115 void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
116 {
117  static const int BUFFER_SIZE = 16*1024;
118 
119 #if _POSIX_C_SOURCE >= 200112L
120  /* Advise the kernel of our access pattern. */
121  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
122  if (status != 0)
123  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
124 #endif
125 
126  char buf[BUFFER_SIZE + 1];
127 
128  while(int bytes_read = read(fd, buf, BUFFER_SIZE))
129  {
130  if(bytes_read == -1)
131  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
132  if (!bytes_read)
133  break;
134 
135  os.write(buf, bytes_read);
136  }
137 }
138 
151 void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
152 {
153  static const int BUFFER_SIZE = 1024;
154 
155 #if _POSIX_C_SOURCE >= 200112L
156  /* Advise the kernel of our access pattern. */
157  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
158  if (status != 0)
159  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
160 #endif
161 
162  char buf[BUFFER_SIZE + 1];
163  size_t bytes_read = read(fd, buf, BUFFER_SIZE);
164 
165  if(bytes_read == (size_t)-1)
166  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
167 
168  if (bytes_read == 0)
169  return;
170 
171  // Every valid DMR/++ response in the MDS starts with:
172  // <?xml version="1.0" encoding="ISO‌-8859-1"?>
173  //
174  // and has one of two kinds of <Dataset...> tags
175  // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
176  // 2: <Dataset xmlns="..." ... >
177  //
178  // Assume it is well formed and always includes the prolog,
179  // but might not use <CR> <CRLF> chars
180 
181  // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
182  size_t i = 0;
183  while (buf[i++] != '>')
184  ; // 'i' now points one char past the xml prolog
185  os.write(buf, i);
186 
187  // transfer <Dataset ...> with new value for xml:base
188  size_t s = i; // start of <Dataset ...>
189  size_t j = 0;
190  char xml_base_literal[] = "xml:base";
191  while (i < bytes_read) {
192  if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
193  os.write(buf + s, i - s);
194  os << " xml:base=\"" << xml_base << "\"";
195  break;
196  }
197  else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
198  os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
199  while (buf[i++] != '=')
200  ; // read/discard '="..."'
201  while (buf[i++] != '"')
202  ;
203  while (buf[i++] != '"')
204  ;
205  os << "=\"" << xml_base << "\""; // write the new xml:base value
206  break;
207  }
208  else if (buf[i] == xml_base_literal[j]) {
209  ++j;
210  }
211  else {
212  j = 0;
213  }
214 
215  ++i;
216  }
217 
218  // transfer the rest
219  os.write(buf + i, bytes_read - i);
220 
221  // Now, if the response is more than 1k, use faster code to finish the tx
222  transfer_bytes(fd, os);
223 }
224 
225 unsigned long GlobalMetadataStore::get_cache_size_from_config()
226 {
227  bool found;
228  string size;
229  unsigned long size_in_megabytes = default_cache_size;
230  TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
231  if (found) {
232  BESDEBUG(DEBUG_KEY,
233  "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
234  istringstream iss(size);
235  iss >> size_in_megabytes;
236  }
237 
238  return size_in_megabytes;
239 }
240 
241 string GlobalMetadataStore::get_cache_prefix_from_config()
242 {
243  bool found;
244  string prefix = default_cache_prefix;
245  TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
246  if (found) {
247  BESDEBUG(DEBUG_KEY,
248  "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
249  prefix = BESUtil::lowercase(prefix);
250  }
251 
252  return prefix;
253 }
254 
255 // If the cache prefix is the empty string, the cache is turned off.
256 string GlobalMetadataStore::get_cache_dir_from_config()
257 {
258  bool found;
259 
260  string cacheDir = default_cache_dir;
261  TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
262  if (found) {
263  BESDEBUG(DEBUG_KEY,
264  "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
265  }
266 
267  return cacheDir;
268 }
269 
283 
302 GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
303 {
304  if (d_enabled && d_instance == 0) {
305  d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
306  d_enabled = d_instance->cache_enabled();
307  if (!d_enabled) {
308  delete d_instance;
309  d_instance = 0;
310 
311  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
312  }
313  else {
314  AT_EXIT(delete_instance);
315 
316  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
317  }
318  }
319 
320  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
321 
322  return d_instance;
323 }
324 
332 GlobalMetadataStore::get_instance()
333 {
334  if (d_enabled && d_instance == 0) {
335  d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
336  get_cache_size_from_config());
337  d_enabled = d_instance->cache_enabled();
338  if (!d_enabled) {
339  delete d_instance;
340  d_instance = NULL;
341  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
342  }
343  else {
344  AT_EXIT(delete_instance);
345 
346  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
347  }
348  }
349 
350  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
351 
352  return d_instance;
353 }
355 
359 void
360 GlobalMetadataStore::initialize()
361 {
362  bool found;
363 
364  TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
365  if (found) {
366  BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
367  }
368  else {
369  d_ledger_name = default_ledger_name;
370  }
371 
372  // By default, use UTC in the logs.
373  string local_time = "no";
374  TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
375  d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
376 }
377 
392 GlobalMetadataStore::GlobalMetadataStore()
394  : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
395 {
396  initialize();
397 }
398 
399 GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
400  unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
401 {
402  initialize();
403 }
405 
410 static void dump_time(ostream &os, bool use_local_time)
411 {
412  time_t now;
413  time(&now);
414  char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
415  int status = 0;
416 
417  // From StackOverflow:
418  // This will work too, if your compiler doesn't support %F or %T:
419  // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
420  //
421  // Apologies for the twisted logic - UTC is the default. Override to
422  // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
423  if (!use_local_time)
424  status = strftime(buf, sizeof buf, "%FT%T%Z", gmtime(&now));
425  else
426  status = strftime(buf, sizeof buf, "%FT%T%Z", localtime(&now));
427 
428  if (!status)
429  LOG("Error getting time for Metadata Store ledger.");
430 
431  os << buf;
432 }
433 
437 void
439 {
440  // TODO open just once
441  // FIXME Protect this with an exclusive lock!
442  ofstream of(d_ledger_name.c_str(), ios::app);
443  if (of) {
444  dump_time(of, d_use_local_time);
445  of << " " << d_ledger_entry << endl;
446  VERBOSE("MD Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
447  }
448  else {
449  LOG("Warning: Metadata store could not write to is ledger file.");
450  }
451 }
452 
459 string
460 GlobalMetadataStore::get_hash(const string &name)
461 {
462  if (name.empty())
463  throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
464 
465  return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
466 }
467 
485 {
486  if (d_dds) {
487  D4BaseTypeFactory factory;
488  DMR dmr(&factory, *d_dds);
489  XMLWriter xml;
490  dmr.print_dap4(xml);
491  os << xml.get_doc();
492  }
493  else if (d_dmr) {
494  XMLWriter xml;
495  d_dmr->print_dap4(xml);
496  os << xml.get_doc();
497  }
498  else {
499  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
500  }
501 }
502 
505  if (d_dds)
506  d_dds->print(os);
507  else if (d_dmr)
508  d_dmr->getDDS()->print(os);
509  else
510  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
511 }
512 
515  if (d_dds)
516  d_dds->print_das(os);
517  else if (d_dmr)
518  d_dmr->getDDS()->print_das(os);
519  else
520  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
521 }
522 
537 bool
538 GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
539  const string &response_name)
540 {
541  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
542 
543  string item_name = get_cache_file_name(key, false /*mangle*/);
544 
545  int fd;
546  if (create_and_lock(item_name, fd)) {
547  // If here, the cache_file_name could not be locked for read access;
548  // try to build it. First make an empty files and get an exclusive lock on them.
549  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
550 
551  // Get an output stream directed at the locked cache file
552  ofstream response(item_name.c_str(), ios::out|ios::app);
553  if (!response.is_open())
554  throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
555 
556  try {
557  // for the different writers, look at the StreamDAP struct in the class
558  // definition. jhrg 2.27.18
559  writer(response); // different writers can write the DDS, DAS or DMR
560 
561  // Compute/update/maintain the cache size? This is extra work
562  // that might never be used. It also locks the cache...
563  if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
564  // This enables the call to update_cache_info() below.
566 
567  unsigned long long size = update_cache_info(item_name);
568  if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
569  }
570 
571  unlock_and_close(item_name);
572  }
573  catch (...) {
574  // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
575  response.close();
576  this->purge_file(item_name);
577  unlock_and_close(item_name);
578  throw;
579  }
580 
581  VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
582  d_ledger_entry.append(" ").append(key);
583 
584  return true;
585  }
586  else if (get_read_lock(item_name, fd)) {
587  // We found the key; didn't add this because it was already here
588  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
589  unlock_and_close(item_name);
590 
591  LOG("Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
592 
593  return false;
594  }
595  else {
596  throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
597  }
598 }
599 
613 
626 bool
627 GlobalMetadataStore::add_responses(DDS *dds, const string &name)
628 {
629  // Start the index entry
630  d_ledger_entry = string("add DDS ").append(name);
631 
632  // I'm appending the 'dds r' string to the name before hashing so that
633  // the different hashes for the file's DDS, DAS, ..., are all very different.
634  // This will be useful if we use S3 instead of EFS for the Metadata Store.
635  //
636  // The helper() also updates the ledger string.
637  StreamDDS write_the_dds_response(dds);
638  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
639 
640  StreamDAS write_the_das_response(dds);
641  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
642 
643 #if SYMETRIC_ADD_RESPONSES
644  StreamDMR write_the_dmr_response(dds);
645  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
646 #endif
647 
648  write_ledger(); // write the index line
649 
650 #if SYMETRIC_ADD_RESPONSES
651  return (stored_dds && stored_das && stored_dmr);
652 #else
653  return (stored_dds && stored_das);
654 #endif
655 }
656 
668 bool
669 GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
670 {
671  // Start the index entry
672  d_ledger_entry = string("add DMR ").append(name);
673 
674  // I'm appending the 'dds r' string to the name before hashing so that
675  // the different hashes for the file's DDS, DAS, ..., are all very different.
676  // This will be useful if we use S3 instead of EFS for the Metadata Store.
677  //
678  // The helper() also updates the ledger string.
679 #if SYMETRIC_ADD_RESPONSES
680  StreamDDS write_the_dds_response(dmr);
681  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
682 
683  StreamDAS write_the_das_response(dmr);
684  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
685 #endif
686 
687  StreamDMR write_the_dmr_response(dmr);
688  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
689 
690  write_ledger(); // write the index line
691 
692 #if SYMETRIC_ADD_RESPONSES
693  return (stored_dds && stored_das && stored_dmr);
694 #else
695  return(stored_dmr /* && stored_dmrpp */);
696 #endif
697 }
699 
713 GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
714 {
715  BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
716 
717  if (name.empty())
718  throw BESInternalError("An empty name string was received by "
719  "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
720 
721  string item_name = get_cache_file_name(get_hash(name + suffix), false);
722  int fd;
723  MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
724  BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
725 
726  if (lock())
727  LOG("MDS Cache hit for '" << name << "' and response " << object_name << endl);
728  else
729  LOG("MDS Cache miss for '" << name << "' and response " << object_name << endl);
730 
731  return lock;
732  }
733 
753 {
754  return get_read_lock_helper(name, "dmr_r", "DMR");
755 }
756 
765 {
766  return get_read_lock_helper(name, "dds_r", "DDS");
767 }
768 
777 {
778  return get_read_lock_helper(name, "das_r", "DAS");
779 }
780 
800 {
801  return get_read_lock_helper(name, "dmrpp_r", "DMR++");
802 }
803 
806 
814 void
815 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
816 {
817  string item_name = get_cache_file_name(get_hash(name + suffix), false);
818  int fd; // value-result parameter;
819  if (get_read_lock(item_name, fd)) {
820  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
821  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
822  try {
823  transfer_bytes(fd, os);
824  unlock_and_close(item_name); // closes fd
825  }
826  catch (...) {
827  unlock_and_close(item_name);
828  throw;
829  }
830  }
831  else {
832  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
833  }
834 }
835 
844 void
845 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
846  const string &object_name)
847 {
848  string item_name = get_cache_file_name(get_hash(name + suffix), false);
849  int fd; // value-result parameter;
850  if (get_read_lock(item_name, fd)) {
851  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
852  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
853  try {
854  insert_xml_base(fd, os, xml_base);
855 
856  transfer_bytes(fd, os);
857  unlock_and_close(item_name); // closes fd
858  }
859  catch (...) {
860  unlock_and_close(item_name);
861  throw;
862  }
863  }
864  else {
865  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
866  }
867 }
869 
876 void
877 GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
878 {
879  write_response_helper(name, os, "dds_r", "DDS");
880 }
881 
888 void
889 GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
890 {
891  write_response_helper(name, os, "das_r", "DAS");
892 }
893 
900 void
901 GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
902 {
903  bool found = false;
904  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
905  if (!found) {
906 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
907  write_response_helper(name, os, "dmr_r", "DMR");
908 #else
909  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
910 #endif
911  }
912  else {
913  write_response_helper(name, os, "dmr_r", xml_base, "DMR");
914  }
915 }
916 
923 void
924 GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
925 {
926  bool found = false;
927  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
928  if (!found) {
929 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
930  write_response_helper(name, os, "dmrpp_r", "DMR++");
931 #else
932  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
933 #endif
934  }
935  else {
936  write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
937  }
938 }
939 
947 bool
948 GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
949 {
950  string hash = get_hash(name + suffix);
951  if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
952  VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
953  d_ledger_entry.append(" ").append(hash);
954  return true;
955  }
956  else {
957  LOG("Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
958  }
959 
960  return false;
961 }
962 
969 bool
971 {
972  // Start the index entry
973  d_ledger_entry = string("remove ").append(name);
974 
975  bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
976 
977  bool removed_das = remove_response_helper(name, "das_r", "DAS");
978 
979  bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
980 
981  bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
982 
983  write_ledger(); // write the index line
984 
985 #if SYMETRIC_ADD_RESPONSES
986  return (removed_dds && removed_das && removed_dmr);
987 #else
988  return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
989 #endif
990 }
991 
1004 DMR *
1006 {
1007  stringstream oss;
1008  write_dmr_response(name, oss); // throws BESInternalError if not found
1009 
1010  D4BaseTypeFactory d4_btf;
1011  auto_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1012 
1013  D4ParserSax2 parser;
1014  parser.intern(oss.str(), dmr.get());
1015 
1016  dmr->set_factory(0);
1017 
1018  return dmr.release();
1019 }
1020 
1042 DDS *
1044 {
1045  TempFile dds_tmp(get_cache_directory() + "/opendapXXXXXX");
1046 
1047  fstream dds_fs(dds_tmp.get_name().c_str(), std::fstream::out);
1048  try {
1049  write_dds_response(name, dds_fs); // throws BESInternalError if not found
1050  dds_fs.close();
1051  }
1052  catch (...) {
1053  dds_fs.close();
1054  throw;
1055  }
1056 
1057  BaseTypeFactory btf;
1058  auto_ptr<DDS> dds(new DDS(&btf));
1059  dds->parse(dds_tmp.get_name());
1060 
1061  TempFile das_tmp(get_cache_directory() + "/opendapXXXXXX");
1062  fstream das_fs(das_tmp.get_name().c_str(), std::fstream::out);
1063  try {
1064  write_das_response(name, das_fs); // throws BESInternalError if not found
1065  das_fs.close();
1066  }
1067  catch (...) {
1068  das_fs.close();
1069  throw;
1070  }
1071 
1072  auto_ptr<DAS> das(new DAS());
1073  das->parse(das_tmp.get_name());
1074 
1075  dds->transfer_attributes(das.get());
1076  dds->set_factory(0);
1077 
1078  return dds.release();
1079 }
1080 
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void operator()(ostream &os)
Use an object (DDS or DMR) to write data to the MDS.
Unlock and close the MDS item when the ReadLock goes out of scope.
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
exception thrown if an internal error is found and is fatal to the BES
exception thrown if internal error encountered
std::string get_name() const
Definition: TempFile.h:70
static string lowercase(const string &s)
Definition: BESUtil.cc:197
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual void unlock_and_close(const std::string &target)
Instantiate with a DDS or DMR and use to write the DMR response.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:420
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Implementation of a caching mechanism for compressed data.
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:61
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for.
const std::string get_cache_directory()
static void insert_xml_base(int fd, ostream &os, const string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
MDSReadLock get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
Instantiate with a DDS or DMR and use to write the DAS response.
bool store_dap_response(StreamDAP &writer, const std::string &key, const string &name, const string &response_name)
static void transfer_bytes(int fd, ostream &os)
std::string get_hash(const std::string &name)
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
Get a new temporary file.
Definition: TempFile.h:46
Store the DAP metadata responses.
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.