2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-01 23:25:38 +00:00

Clarify reference counting in QP databases

Change the names of the node reference counting functions
and add comments to make the mechanism easier to understand:

- newref() and decref() are now called qpcnode_acquire()/
  qpznode_acquire() and qpcnode_release()/qpznode_release()
  respectively; this reflects the fact that they modify both
  the internal and external reference counters for a node.

- qpcnode_newref() and qpznode_newref() are now called
  qpcnode_erefs_increment() and qpznode_erefs_increment(), and
  qpcnode_decref() and qpznode_decref() are now called
  qpcnode_erefs_decrement() and qpznode_erefs_decrement(),
  to reflect that they only increase and decrease the node's
  external reference counters, not internal.
This commit is contained in:
Evan Hunt
2025-01-30 14:42:57 -08:00
parent 431513d8b3
commit d4f791793e
2 changed files with 238 additions and 126 deletions

View File

@@ -171,6 +171,28 @@ struct qpcnode {
uint16_t locknum;
/*
* 'erefs' counts external references held by a caller: for
* example, it could be incremented by dns_db_findnode(),
* and decremented by dns_db_detachnode().
*
* 'references' counts internal references to the node object,
* including the one held by the QP trie so the node won't be
* deleted while it's quiescently stored in the database - even
* though 'erefs' may be zero because no external caller is
* using it at the time.
*
* Generally when 'erefs' is incremented or decremented,
* 'references' is too. When both go to zero (meaning callers
* and the database have both released the object) the object
* is freed.
*
* Whenever 'erefs' is incremented from zero, we also aquire a
* node use reference (see 'qpcache->references' below), and
* release it when 'erefs' goes back to zero. This prevents the
* database from being shut down until every caller has released
* all nodes.
*/
isc_refcount_t references;
isc_refcount_t erefs;
void *data;
@@ -204,15 +226,31 @@ struct qpcache {
isc_rwlock_t lock;
/* Locks the tree structure (prevents nodes appearing/disappearing) */
isc_rwlock_t tree_lock;
/*
* NOTE: 'references' is NOT the global reference counter for
* the database object handled by dns_db_attach() and _detach();
* that one is 'common.references'.
*
* Instead, 'references' counts the number of nodes being used by
* at least one external caller. (It's called 'references' to
* leverage the ISC_REFCOUNT_STATIC macros, but 'nodes_in_use'
* might be a clearer name.)
*
* One additional reference to this counter is held by the database
* object itself. When 'common.references' goes to zero, that
* reference is released. When in turn 'references' goes to zero,
* the database is shut down and freed.
*/
isc_refcount_t references;
/* Locks for individual tree nodes */
unsigned int node_lock_count;
dns_stats_t *rrsetstats; /* cache DB only */
isc_stats_t *cachestats; /* cache DB only */
isc_stats_t *gluecachestats; /* zone DB only */
isc_refcount_t references;
isc_rwlock_t *node_locks;
dns_stats_t *rrsetstats;
isc_stats_t *cachestats;
uint32_t maxrrperset; /* Maximum RRs per RRset */
uint32_t maxtypepername; /* Maximum number of RR types per owner */
@@ -624,10 +662,18 @@ delete_node(qpcache_t *qpdb, qpcnode_t *node) {
* It's okay for neither lock to be held if there are existing external
* references to the node, but if this is the first external reference,
* then the caller must be holding at least one lock.
*
* If incrementing erefs from zero, we also increment the node use counter
* in the qpcache object.
*
* This function is called from qpcnode_acquire(), so that internal
* and external references are acquired at the same time, and from
* qpcnode_release() when we only need to increase the internal references.
*/
static void
qpcnode_newref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t nlocktype,
isc_rwlocktype_t tlocktype DNS__DB_FLARG) {
qpcnode_erefs_increment(qpcache_t *qpdb, qpcnode_t *node,
isc_rwlocktype_t nlocktype,
isc_rwlocktype_t tlocktype DNS__DB_FLARG) {
uint_fast32_t refs = isc_refcount_increment0(&node->erefs);
#if DNS_DB_NODETRACE
@@ -654,17 +700,23 @@ qpcnode_newref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t nlocktype,
}
static void
newref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t nlocktype,
isc_rwlocktype_t tlocktype DNS__DB_FLARG) {
qpcnode_acquire(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t nlocktype,
isc_rwlocktype_t tlocktype DNS__DB_FLARG) {
qpcnode_ref(node);
qpcnode_newref(qpdb, node, nlocktype, tlocktype DNS__DB_FLARG_PASS);
qpcnode_erefs_increment(qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
}
static void
cleanup_deadnodes(void *arg);
/*
* Decrement the external references to a node. If the counter
* goes to zero, decrement the node use counter in the qpcache object
* as well, and return true. Otherwise return false.
*/
static bool
qpcnode_decref(qpcache_t *qpdb, qpcnode_t *node DNS__DB_FLARG) {
qpcnode_erefs_decrement(qpcache_t *qpdb, qpcnode_t *node DNS__DB_FLARG) {
uint_fast32_t refs = isc_refcount_decrement(&node->erefs);
#if DNS_DB_NODETRACE
@@ -676,37 +728,32 @@ qpcnode_decref(qpcache_t *qpdb, qpcnode_t *node DNS__DB_FLARG) {
}
qpcache_unref(qpdb);
return true;
}
/*
* Caller must be holding the node lock; either the read or write lock.
* Caller must be holding a node lock, either read or write.
*
* Note that the lock must be held even when node references are
* atomically modified; in that case the decrement operation itself does not
* have to be protected, but we must avoid a race condition where multiple
* threads are decreasing the reference to zero simultaneously and at least
* one of them is going to free the node.
*
* This decrements both the internal and external node reference counters.
* If the external reference count drops to zero, then the node lock
* reference count is also decremented.
*
* This function returns true if and only if the node reference decreases
* to zero. (NOTE: Decrementing the reference count of a node to zero does
* not mean it will be immediately freed.)
* This calls dec_erefs() to decrement the external node reference counter,
* (and possibly the node use counter), cleans up and deletes the node
* if necessary, then decrements the internal reference counter as well.
*/
static void
decref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t *nlocktypep,
isc_rwlocktype_t *tlocktypep, bool tryupgrade DNS__DB_FLARG) {
qpcnode_release(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t *nlocktypep,
isc_rwlocktype_t *tlocktypep, bool tryupgrade DNS__DB_FLARG) {
REQUIRE(*nlocktypep != isc_rwlocktype_none);
isc_result_t result;
bool locked = *tlocktypep != isc_rwlocktype_none;
bool write_locked = false;
isc_rwlock_t *nlock = &qpdb->node_locks[node->locknum];
REQUIRE(*nlocktypep != isc_rwlocktype_none);
if (!qpcnode_decref(qpdb, node DNS__DB_FLARG_PASS)) {
if (!qpcnode_erefs_decrement(qpdb, node DNS__DB_FLARG_PASS)) {
goto unref;
}
@@ -715,20 +762,23 @@ decref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t *nlocktypep,
goto unref;
}
/*
* Node lock ref has decremented to 0 and we may need to clean up the
* node. To clean it up, the node ref needs to decrement to 0 under the
* node write lock, so we regain the ref and try again.
*/
qpcnode_newref(qpdb, node, *nlocktypep, *tlocktypep DNS__DB_FLARG_PASS);
/* Upgrade the lock? */
if (*nlocktypep == isc_rwlocktype_read) {
/*
* The external reference count went to zero and the node
* is dirty or has no data, so we might want to delete it.
* To do that, we'll need a write lock. If we don't already
* have one, we have to make sure nobody else has
* acquired a reference in the meantime, so we increment
* erefs (but NOT references!), upgrade the node lock,
* decrement erefs again, and see if it's still zero.
*/
isc_rwlock_t *nlock = &qpdb->node_locks[node->locknum];
qpcnode_erefs_increment(qpdb, node, *nlocktypep,
*tlocktypep DNS__DB_FLARG_PASS);
NODE_FORCEUPGRADE(nlock, nlocktypep);
}
if (!qpcnode_decref(qpdb, node DNS__DB_FLARG_PASS)) {
goto unref;
if (!qpcnode_erefs_decrement(qpdb, node DNS__DB_FLARG_PASS)) {
goto unref;
}
}
if (node->dirty) {
@@ -778,7 +828,8 @@ decref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t *nlocktypep,
*/
delete_node(qpdb, node);
} else {
newref(qpdb, node, *nlocktypep, *tlocktypep DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, node, *nlocktypep,
*tlocktypep DNS__DB_FLARG_PASS);
isc_queue_node_init(&node->deadlink);
if (!isc_queue_enqueue_entry(&qpdb->deadnodes[node->locknum],
@@ -794,7 +845,7 @@ decref(qpcache_t *qpdb, qpcnode_t *node, isc_rwlocktype_t *nlocktypep,
restore_locks:
/*
* Relock a read lock, or unlock the write lock if no lock was held.
* Unlock the tree lock if it wasn't held previously.
*/
if (!locked && write_locked) {
TREE_UNLOCK(&qpdb->tree_lock, tlocktypep);
@@ -918,12 +969,12 @@ expireheader(dns_slabheader_t *header, isc_rwlocktype_t *nlocktypep,
/*
* If no one else is using the node, we can clean it up now.
* We first need to gain a new reference to the node to meet a
* requirement of decref().
* requirement of qpcnode_release().
*/
newref(qpdb, HEADERNODE(header), *nlocktypep,
*tlocktypep DNS__DB_FLARG_PASS);
decref(qpdb, HEADERNODE(header), nlocktypep, tlocktypep,
true DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, HEADERNODE(header), *nlocktypep,
*tlocktypep DNS__DB_FLARG_PASS);
qpcnode_release(qpdb, HEADERNODE(header), nlocktypep,
tlocktypep, true DNS__DB_FLARG_PASS);
if (qpdb->cachestats == NULL) {
return;
@@ -990,7 +1041,7 @@ bindrdataset(qpcache_t *qpdb, qpcnode_t *node, dns_slabheader_t *header,
return;
}
newref(qpdb, node, nlocktype, tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, node, nlocktype, tlocktype DNS__DB_FLARG_PASS);
INSIST(rdataset->methods == NULL); /* We must be disassociated. */
@@ -1205,7 +1256,7 @@ check_stale_header(qpcnode_t *node, dns_slabheader_t *header,
/*
* header->down can be non-NULL if the
* refcount has just decremented to 0
* but decref() has not
* but qpcnode_release() has not
* performed clean_cache_node(), in
* which case we need to purge the stale
* headers first.
@@ -1277,8 +1328,8 @@ check_zonecut(qpcnode_t *node, void *arg DNS__DB_FLARG) {
* We increment the reference count on node to ensure that
* search->zonecut_header will still be valid later.
*/
newref(search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
qpcnode_acquire(search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
search->zonecut = node;
search->zonecut_header = dname_header;
search->zonecut_sigheader = sigdname_header;
@@ -1363,8 +1414,9 @@ find_deepest_zonecut(qpc_search_t *search, qpcnode_t *node,
}
result = DNS_R_DELEGATION;
if (nodep != NULL) {
newref(search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
qpcnode_acquire(
search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)node;
}
bindrdataset(search->qpdb, node, found, search->now,
@@ -1499,8 +1551,8 @@ find_coveringnsec(qpc_search_t *search, const dns_name_t *name,
nlocktype, isc_rwlocktype_none,
sigrdataset DNS__DB_FLARG_PASS);
}
newref(search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
qpcnode_acquire(search->qpdb, node, nlocktype,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
dns_name_copy(fname, foundname);
@@ -1794,8 +1846,8 @@ qpcache_find(dns_db_t *db, const dns_name_t *name, dns_dbversion_t *version,
nsecheader != NULL)
{
if (nodep != NULL) {
newref(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)node;
}
bindrdataset(search.qpdb, node, nsecheader, search.now,
@@ -1838,8 +1890,8 @@ qpcache_find(dns_db_t *db, const dns_name_t *name, dns_dbversion_t *version,
*/
if (nsheader != NULL) {
if (nodep != NULL) {
newref(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)node;
}
bindrdataset(search.qpdb, node, nsheader, search.now,
@@ -1872,8 +1924,8 @@ qpcache_find(dns_db_t *db, const dns_name_t *name, dns_dbversion_t *version,
*/
if (nodep != NULL) {
newref(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)node;
}
@@ -1949,8 +2001,8 @@ tree_exit:
nlock = &search.qpdb->node_locks[node->locknum];
NODE_RDLOCK(nlock, &nlocktype);
decref(search.qpdb, node, &nlocktype, &tlocktype,
true DNS__DB_FLARG_PASS);
qpcnode_release(search.qpdb, node, &nlocktype, &tlocktype,
true DNS__DB_FLARG_PASS);
NODE_UNLOCK(nlock, &nlocktype);
INSIST(tlocktype == isc_rwlocktype_none);
}
@@ -2093,8 +2145,8 @@ qpcache_findzonecut(dns_db_t *db, const dns_name_t *name, unsigned int options,
}
if (nodep != NULL) {
newref(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(search.qpdb, node, nlocktype,
tlocktype DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)node;
}
@@ -2520,9 +2572,6 @@ qpcache__destroy(qpcache_t *qpdb) {
if (qpdb->cachestats != NULL) {
isc_stats_detach(&qpdb->cachestats);
}
if (qpdb->gluecachestats != NULL) {
isc_stats_detach(&qpdb->gluecachestats);
}
isc_mem_cput(qpdb->common.mctx, qpdb->node_locks, qpdb->node_lock_count,
sizeof(qpdb->node_locks[0]));
@@ -2577,7 +2626,7 @@ cleanup_deadnodes(void *arg) {
RUNTIME_CHECK(isc_queue_splice(&deadnodes, &qpdb->deadnodes[locknum]));
isc_queue_for_each_entry_safe(&deadnodes, qpnode, qpnext, deadlink) {
decref(qpdb, qpnode, &nlocktype, &tlocktype, false);
qpcnode_release(qpdb, qpnode, &nlocktype, &tlocktype, false);
}
NODE_UNLOCK(nlock, &nlocktype);
@@ -2600,7 +2649,7 @@ reactivate_node(qpcache_t *qpdb, qpcnode_t *node,
isc_rwlock_t *nlock = &qpdb->node_locks[node->locknum];
NODE_RDLOCK(nlock, &nlocktype);
newref(qpdb, node, nlocktype, tlocktype DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, node, nlocktype, tlocktype DNS__DB_FLARG_PASS);
NODE_UNLOCK(nlock, &nlocktype);
}
@@ -2670,8 +2719,8 @@ qpcache_attachnode(dns_db_t *db, dns_dbnode_t *source,
qpcache_t *qpdb = (qpcache_t *)db;
qpcnode_t *node = (qpcnode_t *)source;
newref(qpdb, node, isc_rwlocktype_none,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, node, isc_rwlocktype_none,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
*targetp = source;
}
@@ -2692,13 +2741,15 @@ qpcache_detachnode(dns_db_t *db, dns_dbnode_t **nodep DNS__DB_FLARG) {
nlock = &qpdb->node_locks[node->locknum];
/*
* We can't destroy qpcache under nodelock, so we need
* to reference it before acquiring the lock.
* We can't destroy qpcache while holding a nodelock, so
* we need to reference it before acquiring the lock
* and release it afterward.
*/
qpcache_ref(qpdb);
NODE_RDLOCK(nlock, &nlocktype);
decref(qpdb, node, &nlocktype, &tlocktype, true DNS__DB_FLARG_PASS);
qpcnode_release(qpdb, node, &nlocktype, &tlocktype,
true DNS__DB_FLARG_PASS);
NODE_UNLOCK(nlock, &nlocktype);
qpcache_detach(&qpdb);
@@ -2754,8 +2805,8 @@ qpcache_allrdatasets(dns_db_t *db, dns_dbnode_t *node, dns_dbversion_t *version,
iterator->common.now = now;
iterator->current = NULL;
newref(qpdb, qpnode, isc_rwlocktype_none,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, qpnode, isc_rwlocktype_none,
isc_rwlocktype_none DNS__DB_FLARG_PASS);
*iteratorp = (dns_rdatasetiter_t *)iterator;
@@ -3896,8 +3947,8 @@ dereference_iter_node(qpc_dbit_t *qpdbiter DNS__DB_FLARG) {
nlock = &qpdb->node_locks[node->locknum];
NODE_RDLOCK(nlock, &nlocktype);
decref(qpdb, node, &nlocktype, &qpdbiter->tree_locked,
false DNS__DB_FLARG_PASS);
qpcnode_release(qpdb, node, &nlocktype, &qpdbiter->tree_locked,
false DNS__DB_FLARG_PASS);
NODE_UNLOCK(nlock, &nlocktype);
INSIST(qpdbiter->tree_locked == tlocktype);
@@ -4151,8 +4202,8 @@ dbiterator_current(dns_dbiterator_t *iterator, dns_dbnode_t **nodep,
dns_name_copy(&node->name, name);
}
newref(qpdb, node, isc_rwlocktype_none,
qpdbiter->tree_locked DNS__DB_FLARG_PASS);
qpcnode_acquire(qpdb, node, isc_rwlocktype_none,
qpdbiter->tree_locked DNS__DB_FLARG_PASS);
*nodep = (dns_dbnode_t *)qpdbiter->node;
return ISC_R_SUCCESS;