20 #include <ripple/app/misc/NetworkOPs.h>
21 #include <ripple/app/rdb/ShardArchive.h>
22 #include <ripple/basics/Archive.h>
23 #include <ripple/basics/BasicConfig.h>
24 #include <ripple/core/ConfigSections.h>
25 #include <ripple/nodestore/DatabaseShard.h>
26 #include <ripple/rpc/ShardArchiveHandler.h>
27 #include <ripple/rpc/impl/Handler.h>
34 using namespace boost::filesystem;
35 using namespace std::chrono_literals;
37 boost::filesystem::path
40 return boost::filesystem::path{
52 return std::make_unique<ShardArchiveHandler>(app);
58 auto const downloadDir(getDownloadDirectory(app.
config()));
65 return std::make_unique<RecoveryHandler>(app);
74 , j_(app.journal(
"ShardArchiveHandler"))
75 , downloadDir_(getDownloadDirectory(app.config()))
76 , timer_(app_.getIOService())
77 , verificationScheduler_(
80 "shard_verification_retry_interval")),
84 "shard_verification_max_attempts"))
96 JLOG(
j_.
warn()) <<
"Archives already being processed";
122 <<
"exception: " << e.
what() <<
" in function: " << __func__;
136 using namespace boost::filesystem;
149 JLOG(
j_.
error()) <<
"Failed to parse url: " << url_;
154 add(state, std::move(url), lock);
168 <<
" in function: " << __func__;
206 if (!
add(shardIndex, std::forward<parsedURL>(url.first), lock))
222 JLOG(
j_.
error()) <<
"Download and import already in progress";
226 auto const it{
archives_.find(shardIndex)};
228 return url == it->second;
230 archives_.emplace(shardIndex, std::move(url));
241 JLOG(
j_.
error()) <<
"No shard store available";
246 JLOG(
j_.
warn()) <<
"Archives already being processed";
251 JLOG(
j_.
warn()) <<
"No archives to process";
259 shardIndexes.
begin(),
260 [](
auto const& entry) { return entry.first; });
306 auto const shardIndex{
archives_.begin()->first};
314 bool shouldHaveHash =
false;
326 if (ec != boost::asio::error::operation_aborted)
335 "failed to wrap closure for last ledger confirmation timer", l);
339 JLOG(
j_.
error()) <<
"failed to find last ledger hash for shard "
340 << shardIndex <<
", maximum attempts reached";
352 create_directory(dstDir);
363 auto const& url{
archives_.begin()->second};
365 auto const ssl = (url.scheme ==
"https");
366 auto const defaultPort = ssl ? 443 : 80;
373 dstDir /
"archive.tar.lz4",
374 [
this](path dstPath) { complete(dstPath); },
384 "failed to wrap closure for starting download", l);
401 if (!is_regular_file(dstPath))
405 <<
"Downloading shard id " << ar->first <<
" from URL "
406 << ar->second.domain << ar->second.path;
426 auto const mode{
app_.
getOPs().getOperatingMode()};
436 [=,
this, dstPath = std::move(dstPath)](
437 boost::system::error_code
const& ec)
mutable {
438 if (ec != boost::asio::error::operation_aborted)
444 "failed to wrap closure for operating mode timer",
447 timer_.async_wait(*wrapper);
462 JLOG(
j_.
error()) <<
"failed to wrap closure for process()";
481 auto const shardDir{dstPath.parent_path() /
std::to_string(shardIndex)};
488 if (!is_directory(shardDir))
490 JLOG(
j_.
error()) <<
"Shard " << shardIndex
491 <<
" mismatches archive shard directory";
504 JLOG(
j_.
error()) <<
"Importing shard " << shardIndex;
508 JLOG(
j_.
debug()) <<
"Shard " << shardIndex <<
" downloaded and imported";
516 auto const shardIndex{
archives_.begin()->first};
553 <<
" in function: " << __func__;
std::uint32_t lastLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the last ledger sequence for a given shard index.
static boost::filesystem::path getDownloadDirectory(Config const &config)
std::shared_ptr< DatabaseDownloader > downloader_
const boost::filesystem::path downloadDir_
void remove(std::lock_guard< std::mutex > const &)
TimerOpCounter timerCounter_
ShardVerificationScheduler verificationScheduler_
static std::unique_ptr< ShardArchiveHandler > makeShardArchiveHandler(Application &app)
bool removeAndProceed(std::lock_guard< std::mutex > const &lock)
bool start()
Starts downloading and importing archives.
bool add(std::uint32_t shardIndex, std::pair< parsedURL, std::string > &&url)
LedgerIndex getValidLedgerIndex()
static std::string shardDatabase()
void doRelease(std::lock_guard< std::mutex > const &)
virtual NodeStore::DatabaseShard * getShardStore()=0
RecoveryHandler(Application &app)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
bool initFromDB(std::lock_guard< std::mutex > const &)
bool onClosureFailed(std::string const &errorMsg, std::lock_guard< std::mutex > const &lock)
void extractTarLz4(boost::filesystem::path const &src, boost::filesystem::path const &dst)
Extract a tar archive compressed with lz4.
std::optional< LedgerHash > walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason)
Walk to a ledger's hash using the skip list.
std::atomic_bool stopping_
virtual NetworkOPs & getOPs()=0
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
void process(boost::filesystem::path const &dstPath)
virtual LedgerMaster & getLedgerMaster()=0
void deleteFromArchiveDB(DatabaseCon &db, std::uint32_t shardIndex)
deleteFromArchiveDB Deletes an entry from the shard archive database.
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
void dropArchiveDB(DatabaseCon &db)
dropArchiveDB Removes a table in the shard archive database.
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
void complete(boost::filesystem::path dstPath)
bool retry(Application &app, bool shouldHaveHash, retryFunction f)
virtual bool prepareShards(std::vector< std::uint32_t > const &shardIndexes)=0
Prepare one or more shard indexes to be imported into the database.
virtual boost::asio::io_service & getIOService()=0
static constexpr auto stateDBName
virtual bool importShard(std::uint32_t shardIndex, boost::filesystem::path const &srcDir)=0
Import a shard from the shard archive handler into the shard database.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool next(std::lock_guard< std::mutex > const &l)
void readArchiveDB(DatabaseCon &db, std::function< void(std::string const &, int)> const &func)
readArchiveDB Reads entries from the shard archive database and invokes the given callback for each e...
static std::unique_ptr< ShardArchiveHandler > tryMakeRecoveryHandler(Application &app)
std::map< std::uint32_t, parsedURL > archives_
Handles the download and import of one or more shard archives.
ShardArchiveHandler()=delete
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
void insertArchiveDB(DatabaseCon &db, std::uint32_t shardIndex, std::string const &url)
insertArchiveDB Adds an entry to the shard archive database.
std::unique_ptr< DatabaseCon > makeArchiveDB(boost::filesystem::path const &dir, std::string const &dbName)
makeArchiveDB Opens the shard archive database and returns its descriptor.
virtual void removePreShard(std::uint32_t shardIndex)=0
Remove a previously prepared shard index for import.
std::shared_ptr< DatabaseDownloader > make_DatabaseDownloader(boost::asio::io_service &io_service, Config const &config, beast::Journal j)
std::unique_ptr< DatabaseCon > sqlDB_
T & get(EitherAmount &amt)
Section & section(std::string const &name)
Returns the section with the given name.
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
@ FULL
we have the ledger and can even validate