Merge pull request #1506

3ff54bdd Check for correct thread before ending batch transaction (Howard Chu)
eaf8470b Must wait for previous batch to finish before starting new one (Howard Chu)
c903c554 Don't cache block height, always get from DB (Howard Chu)
eb1fb601 Tweak default db-sync-mode to fast:async:1 (Howard Chu)
0693cff9 Use batch transactions when syncing (Howard Chu)
This commit is contained in:
Riccardo Spagni 2017-01-15 14:43:12 -05:00
commit 65e33b1bc5
No known key found for this signature in database
GPG Key ID: 55432DF31CCD4FCD
9 changed files with 71 additions and 33 deletions

View File

@ -1813,9 +1813,10 @@ bool BlockchainBDB::has_key_image(const crypto::key_image& img) const
// Ostensibly BerkeleyDB has batch transaction support built-in, // Ostensibly BerkeleyDB has batch transaction support built-in,
// so the following few functions will be NOP. // so the following few functions will be NOP.
void BlockchainBDB::batch_start(uint64_t batch_num_blocks) bool BlockchainBDB::batch_start(uint64_t batch_num_blocks)
{ {
LOG_PRINT_L3("BlockchainBDB::" << __func__); LOG_PRINT_L3("BlockchainBDB::" << __func__);
return false;
} }
void BlockchainBDB::batch_commit() void BlockchainBDB::batch_commit()

View File

@ -324,7 +324,7 @@ public:
); );
virtual void set_batch_transactions(bool batch_transactions); virtual void set_batch_transactions(bool batch_transactions);
virtual void batch_start(uint64_t batch_num_blocks=0); virtual bool batch_start(uint64_t batch_num_blocks=0);
virtual void batch_commit(); virtual void batch_commit();
virtual void batch_stop(); virtual void batch_stop();
virtual void batch_abort(); virtual void batch_abort();

View File

@ -655,16 +655,17 @@ public:
* been called. In either case, it should end the batch and write to its * been called. In either case, it should end the batch and write to its
* backing store. * backing store.
* *
* If a batch is already in-progress, this function should throw a DB_ERROR. * If a batch is already in-progress, this function must return false.
* This exception may change in the future if it is deemed necessary to * If a batch was started by this call, it must return true.
* have a more granular exception type for this scenario.
* *
* If any of this cannot be done, the subclass should throw the corresponding * If any of this cannot be done, the subclass should throw the corresponding
* subclass of DB_EXCEPTION * subclass of DB_EXCEPTION
* *
* @param batch_num_blocks number of blocks to batch together * @param batch_num_blocks number of blocks to batch together
*
* @return true if we started the batch, false if already started
*/ */
virtual void batch_start(uint64_t batch_num_blocks=0) = 0; virtual bool batch_start(uint64_t batch_num_blocks=0) = 0;
/** /**
* @brief ends a batch transaction * @brief ends a batch transaction

View File

@ -543,6 +543,7 @@ uint64_t BlockchainLMDB::get_estimated_batch_size(uint64_t batch_num_blocks) con
uint64_t min_block_size = 4 * 1024; uint64_t min_block_size = 4 * 1024;
uint64_t block_stop = 0; uint64_t block_stop = 0;
uint64_t m_height = height();
if (m_height > 1) if (m_height > 1)
block_stop = m_height - 1; block_stop = m_height - 1;
uint64_t block_start = 0; uint64_t block_start = 0;
@ -593,6 +594,7 @@ void BlockchainLMDB::add_block(const block& blk, const size_t& block_size, const
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
mdb_txn_cursors *m_cursors = &m_wcursors; mdb_txn_cursors *m_cursors = &m_wcursors;
uint64_t m_height = height();
CURSOR(block_heights) CURSOR(block_heights)
blk_height bh = {blk_hash, m_height}; blk_height bh = {blk_hash, m_height};
@ -654,6 +656,7 @@ void BlockchainLMDB::remove_block()
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
uint64_t m_height = height();
if (m_height == 0) if (m_height == 0)
throw0(BLOCK_DNE ("Attempting to remove block from an empty blockchain")); throw0(BLOCK_DNE ("Attempting to remove block from an empty blockchain"));
@ -691,6 +694,7 @@ uint64_t BlockchainLMDB::add_transaction_data(const crypto::hash& blk_hash, cons
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
mdb_txn_cursors *m_cursors = &m_wcursors; mdb_txn_cursors *m_cursors = &m_wcursors;
uint64_t m_height = height();
int result; int result;
uint64_t tx_id = m_num_txs; uint64_t tx_id = m_num_txs;
@ -787,6 +791,7 @@ uint64_t BlockchainLMDB::add_output(const crypto::hash& tx_hash,
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
mdb_txn_cursors *m_cursors = &m_wcursors; mdb_txn_cursors *m_cursors = &m_wcursors;
uint64_t m_height = height();
int result = 0; int result = 0;
@ -1018,7 +1023,6 @@ BlockchainLMDB::BlockchainLMDB(bool batch_transactions)
m_write_txn = nullptr; m_write_txn = nullptr;
m_write_batch_txn = nullptr; m_write_batch_txn = nullptr;
m_batch_active = false; m_batch_active = false;
m_height = 0;
m_cum_size = 0; m_cum_size = 0;
m_cum_count = 0; m_cum_count = 0;
@ -1143,7 +1147,7 @@ void BlockchainLMDB::open(const std::string& filename, const int mdb_flags)
if ((result = mdb_stat(txn, m_blocks, &db_stats))) if ((result = mdb_stat(txn, m_blocks, &db_stats)))
throw0(DB_ERROR(lmdb_error("Failed to query m_blocks: ", result).c_str())); throw0(DB_ERROR(lmdb_error("Failed to query m_blocks: ", result).c_str()));
LOG_PRINT_L2("Setting m_height to: " << db_stats.ms_entries); LOG_PRINT_L2("Setting m_height to: " << db_stats.ms_entries);
m_height = db_stats.ms_entries; uint64_t m_height = db_stats.ms_entries;
// get and keep current number of txs // get and keep current number of txs
if ((result = mdb_stat(txn, m_txs, &db_stats))) if ((result = mdb_stat(txn, m_txs, &db_stats)))
@ -1294,7 +1298,6 @@ void BlockchainLMDB::reset()
throw0(DB_ERROR(lmdb_error("Failed to write version to database: ", result).c_str())); throw0(DB_ERROR(lmdb_error("Failed to write version to database: ", result).c_str()));
txn.commit(); txn.commit();
m_height = 0;
m_num_outputs = 0; m_num_outputs = 0;
m_cum_size = 0; m_cum_size = 0;
m_cum_count = 0; m_cum_count = 0;
@ -1515,6 +1518,7 @@ uint64_t BlockchainLMDB::get_top_block_timestamp() const
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
uint64_t m_height = height();
// if no blocks, return 0 // if no blocks, return 0
if (m_height == 0) if (m_height == 0)
@ -1666,6 +1670,7 @@ crypto::hash BlockchainLMDB::top_block_hash() const
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
uint64_t m_height = height();
if (m_height != 0) if (m_height != 0)
{ {
return get_block_hash_from_height(m_height - 1); return get_block_hash_from_height(m_height - 1);
@ -1678,6 +1683,7 @@ block BlockchainLMDB::get_top_block() const
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
uint64_t m_height = height();
if (m_height != 0) if (m_height != 0)
{ {
@ -1692,8 +1698,14 @@ uint64_t BlockchainLMDB::height() const
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
TXN_PREFIX_RDONLY();
int result;
return m_height; // get current height
MDB_stat db_stats;
if ((result = mdb_stat(m_txn, m_blocks, &db_stats)))
throw0(DB_ERROR(lmdb_error("Failed to query m_blocks: ", result).c_str()));
return db_stats.ms_entries;
} }
bool BlockchainLMDB::tx_exists(const crypto::hash& h) const bool BlockchainLMDB::tx_exists(const crypto::hash& h) const
@ -2242,15 +2254,15 @@ bool BlockchainLMDB::for_all_outputs(std::function<bool(uint64_t amount, const c
} }
// batch_num_blocks: (optional) Used to check if resize needed before batch transaction starts. // batch_num_blocks: (optional) Used to check if resize needed before batch transaction starts.
void BlockchainLMDB::batch_start(uint64_t batch_num_blocks) bool BlockchainLMDB::batch_start(uint64_t batch_num_blocks)
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (! m_batch_transactions) if (! m_batch_transactions)
throw0(DB_ERROR("batch transactions not enabled")); throw0(DB_ERROR("batch transactions not enabled"));
if (m_batch_active) if (m_batch_active)
throw0(DB_ERROR("batch transaction already in progress")); return false;
if (m_write_batch_txn != nullptr) if (m_write_batch_txn != nullptr)
throw0(DB_ERROR("batch transaction already in progress")); return false;
if (m_write_txn) if (m_write_txn)
throw0(DB_ERROR("batch transaction attempted, but m_write_txn already in use")); throw0(DB_ERROR("batch transaction attempted, but m_write_txn already in use"));
check_open(); check_open();
@ -2276,6 +2288,7 @@ void BlockchainLMDB::batch_start(uint64_t batch_num_blocks)
memset(&m_wcursors, 0, sizeof(m_wcursors)); memset(&m_wcursors, 0, sizeof(m_wcursors));
LOG_PRINT_L3("batch transaction: begin"); LOG_PRINT_L3("batch transaction: begin");
return true;
} }
void BlockchainLMDB::batch_commit() void BlockchainLMDB::batch_commit()
@ -2287,6 +2300,9 @@ void BlockchainLMDB::batch_commit()
throw0(DB_ERROR("batch transaction not in progress")); throw0(DB_ERROR("batch transaction not in progress"));
if (m_write_batch_txn == nullptr) if (m_write_batch_txn == nullptr)
throw0(DB_ERROR("batch transaction not in progress")); throw0(DB_ERROR("batch transaction not in progress"));
if (m_writer != boost::this_thread::get_id())
return; // batch txn owned by other thread
check_open(); check_open();
LOG_PRINT_L3("batch transaction: committing..."); LOG_PRINT_L3("batch transaction: committing...");
@ -2311,6 +2327,8 @@ void BlockchainLMDB::batch_stop()
throw0(DB_ERROR("batch transaction not in progress")); throw0(DB_ERROR("batch transaction not in progress"));
if (m_write_batch_txn == nullptr) if (m_write_batch_txn == nullptr)
throw0(DB_ERROR("batch transaction not in progress")); throw0(DB_ERROR("batch transaction not in progress"));
if (m_writer != boost::this_thread::get_id())
return; // batch txn owned by other thread
check_open(); check_open();
LOG_PRINT_L3("batch transaction: committing..."); LOG_PRINT_L3("batch transaction: committing...");
TIME_MEASURE_START(time1); TIME_MEASURE_START(time1);
@ -2333,6 +2351,8 @@ void BlockchainLMDB::batch_abort()
throw0(DB_ERROR("batch transactions not enabled")); throw0(DB_ERROR("batch transactions not enabled"));
if (! m_batch_active) if (! m_batch_active)
throw0(DB_ERROR("batch transaction not in progress")); throw0(DB_ERROR("batch transaction not in progress"));
if (m_writer != boost::this_thread::get_id())
return; // batch txn owned by other thread
check_open(); check_open();
// for destruction of batch transaction // for destruction of batch transaction
m_write_txn = nullptr; m_write_txn = nullptr;
@ -2505,6 +2525,7 @@ uint64_t BlockchainLMDB::add_block(const block& blk, const size_t& block_size, c
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open(); check_open();
uint64_t m_height = height();
if (m_height % 1000 == 0) if (m_height % 1000 == 0)
{ {
@ -2558,8 +2579,6 @@ void BlockchainLMDB::pop_block(block& blk, std::vector<transaction>& txs)
block_txn_abort(); block_txn_abort();
throw; throw;
} }
--m_height;
} }
void BlockchainLMDB::get_output_tx_and_index_from_global(const std::vector<uint64_t> &global_indices, void BlockchainLMDB::get_output_tx_and_index_from_global(const std::vector<uint64_t> &global_indices,
@ -2850,7 +2869,7 @@ void BlockchainLMDB::fixup()
void BlockchainLMDB::migrate_0_1() void BlockchainLMDB::migrate_0_1()
{ {
LOG_PRINT_L3("BlockchainLMDB::" << __func__); LOG_PRINT_L3("BlockchainLMDB::" << __func__);
uint64_t i, z; uint64_t i, z, m_height;
int result; int result;
mdb_txn_safe txn(false); mdb_txn_safe txn(false);
MDB_val k, v; MDB_val k, v;
@ -2859,17 +2878,22 @@ void BlockchainLMDB::migrate_0_1()
LOG_PRINT_YELLOW("Migrating blockchain from DB version 0 to 1 - this may take a while:", LOG_LEVEL_0); LOG_PRINT_YELLOW("Migrating blockchain from DB version 0 to 1 - this may take a while:", LOG_LEVEL_0);
LOG_PRINT_L0("updating blocks, hf_versions, outputs, txs, and spent_keys tables..."); LOG_PRINT_L0("updating blocks, hf_versions, outputs, txs, and spent_keys tables...");
LOG_PRINT_L0("Total number of blocks: " << m_height);
LOG_PRINT_L1("block migration will update block_heights, block_info, and hf_versions...");
do { do {
result = mdb_txn_begin(m_env, NULL, 0, txn);
if (result)
throw0(DB_ERROR(lmdb_error("Failed to create a transaction for the db: ", result).c_str()));
MDB_stat db_stats;
if ((result = mdb_stat(txn, m_blocks, &db_stats)))
throw0(DB_ERROR(lmdb_error("Failed to query m_blocks: ", result).c_str()));
m_height = db_stats.ms_entries;
LOG_PRINT_L0("Total number of blocks: " << m_height);
LOG_PRINT_L1("block migration will update block_heights, block_info, and hf_versions...");
LOG_PRINT_L1("migrating block_heights:"); LOG_PRINT_L1("migrating block_heights:");
MDB_dbi o_heights; MDB_dbi o_heights;
unsigned int flags; unsigned int flags;
result = mdb_txn_begin(m_env, NULL, 0, txn);
if (result)
throw0(DB_ERROR(lmdb_error("Failed to create a transaction for the db: ", result).c_str()));
result = mdb_dbi_flags(txn, m_block_heights, &flags); result = mdb_dbi_flags(txn, m_block_heights, &flags);
if (result) if (result)
throw0(DB_ERROR(lmdb_error("Failed to retrieve block_heights flags: ", result).c_str())); throw0(DB_ERROR(lmdb_error("Failed to retrieve block_heights flags: ", result).c_str()));

View File

@ -247,7 +247,7 @@ public:
); );
virtual void set_batch_transactions(bool batch_transactions); virtual void set_batch_transactions(bool batch_transactions);
virtual void batch_start(uint64_t batch_num_blocks=0); virtual bool batch_start(uint64_t batch_num_blocks=0);
virtual void batch_commit(); virtual void batch_commit();
virtual void batch_stop(); virtual void batch_stop();
virtual void batch_abort(); virtual void batch_abort();
@ -369,7 +369,6 @@ private:
MDB_dbi m_properties; MDB_dbi m_properties;
uint64_t m_height;
uint64_t m_num_txs; uint64_t m_num_txs;
uint64_t m_num_outputs; uint64_t m_num_outputs;
mutable uint64_t m_cum_size; // used in batch size estimation mutable uint64_t m_cum_size; // used in batch size estimation

View File

@ -119,9 +119,9 @@ struct fake_core_db
return m_storage.get_db().add_block(blk, block_size, cumulative_difficulty, coins_generated, txs); return m_storage.get_db().add_block(blk, block_size, cumulative_difficulty, coins_generated, txs);
} }
void batch_start(uint64_t batch_num_blocks = 0) bool batch_start(uint64_t batch_num_blocks = 0)
{ {
m_storage.get_db().batch_start(batch_num_blocks); return m_storage.get_db().batch_start(batch_num_blocks);
} }
void batch_stop() void batch_stop()

View File

@ -3381,9 +3381,10 @@ bool Blockchain::add_new_block(const block& bl_, block_verification_context& bvc
void Blockchain::check_against_checkpoints(const checkpoints& points, bool enforce) void Blockchain::check_against_checkpoints(const checkpoints& points, bool enforce)
{ {
const auto& pts = points.get_points(); const auto& pts = points.get_points();
bool stop_batch;
CRITICAL_REGION_LOCAL(m_blockchain_lock); CRITICAL_REGION_LOCAL(m_blockchain_lock);
m_db->batch_start(); stop_batch = m_db->batch_start();
for (const auto& pt : pts) for (const auto& pt : pts)
{ {
// if the checkpoint is for a block we don't have yet, move on // if the checkpoint is for a block we don't have yet, move on
@ -3407,7 +3408,8 @@ void Blockchain::check_against_checkpoints(const checkpoints& points, bool enfor
} }
} }
} }
m_db->batch_stop(); if (stop_batch)
m_db->batch_stop();
} }
//------------------------------------------------------------------ //------------------------------------------------------------------
// returns false if any of the checkpoints loading returns false. // returns false if any of the checkpoints loading returns false.
@ -3481,6 +3483,7 @@ bool Blockchain::cleanup_handle_incoming_blocks(bool force_sync)
CRITICAL_REGION_LOCAL(m_blockchain_lock); CRITICAL_REGION_LOCAL(m_blockchain_lock);
TIME_MEASURE_START(t1); TIME_MEASURE_START(t1);
m_db->batch_stop();
if (m_sync_counter > 0) if (m_sync_counter > 0)
{ {
if (force_sync) if (force_sync)
@ -3545,11 +3548,18 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
{ {
LOG_PRINT_YELLOW("Blockchain::" << __func__, LOG_LEVEL_3); LOG_PRINT_YELLOW("Blockchain::" << __func__, LOG_LEVEL_3);
TIME_MEASURE_START(prepare); TIME_MEASURE_START(prepare);
bool stop_batch;
CRITICAL_REGION_LOCAL(m_blockchain_lock); CRITICAL_REGION_LOCAL(m_blockchain_lock);
if(blocks_entry.size() == 0) if(blocks_entry.size() == 0)
return false; return false;
while (!(stop_batch = m_db->batch_start(blocks_entry.size()))) {
m_blockchain_lock.unlock();
epee::misc_utils::sleep_no_w(1000);
m_blockchain_lock.lock();
}
if ((m_db->height() + blocks_entry.size()) < m_blocks_hash_check.size()) if ((m_db->height() + blocks_entry.size()) < m_blocks_hash_check.size())
return true; return true;

View File

@ -312,9 +312,9 @@ namespace cryptonote
LOG_PRINT_L0("Loading blockchain from folder " << folder.string() << " ..."); LOG_PRINT_L0("Loading blockchain from folder " << folder.string() << " ...");
const std::string filename = folder.string(); const std::string filename = folder.string();
// temporarily default to fastest:async:1000 // default to fast:async:1
blockchain_db_sync_mode sync_mode = db_async; blockchain_db_sync_mode sync_mode = db_async;
uint64_t blocks_per_sync = 1000; uint64_t blocks_per_sync = 1;
try try
{ {
@ -327,12 +327,12 @@ namespace cryptonote
for(const auto &option : options) for(const auto &option : options)
LOG_PRINT_L0("option: " << option); LOG_PRINT_L0("option: " << option);
// default to fast:async:1000 // default to fast:async:1
uint64_t DEFAULT_FLAGS = DBS_FAST_MODE; uint64_t DEFAULT_FLAGS = DBS_FAST_MODE;
if(options.size() == 0) if(options.size() == 0)
{ {
// temporarily default to fastest:async:1000 // default to fast:async:1
db_flags = DEFAULT_FLAGS; db_flags = DEFAULT_FLAGS;
} }
@ -348,7 +348,10 @@ namespace cryptonote
else if(options[0] == "fast") else if(options[0] == "fast")
db_flags = DBS_FAST_MODE; db_flags = DBS_FAST_MODE;
else if(options[0] == "fastest") else if(options[0] == "fastest")
{
db_flags = DBS_FASTEST_MODE; db_flags = DBS_FASTEST_MODE;
blocks_per_sync = 1000; // default to fastest:async:1000
}
else else
db_flags = DEFAULT_FLAGS; db_flags = DEFAULT_FLAGS;
} }

View File

@ -51,7 +51,7 @@ public:
virtual std::string get_db_name() const { return std::string(); } virtual std::string get_db_name() const { return std::string(); }
virtual bool lock() { return true; } virtual bool lock() { return true; }
virtual void unlock() { } virtual void unlock() { }
virtual void batch_start(uint64_t batch_num_blocks=0) {} virtual bool batch_start(uint64_t batch_num_blocks=0) {}
virtual void batch_stop() {} virtual void batch_stop() {}
virtual void set_batch_transactions(bool) {} virtual void set_batch_transactions(bool) {}
virtual void block_txn_start(bool readonly=false) {} virtual void block_txn_start(bool readonly=false) {}