* [RFC dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup
@ 2024-04-10 13:48 Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
                   ` (8 more replies)
  0 siblings, 9 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC
  To: teigland; +Cc: gfs2, aahringo

Hi,

this is an RFC for a major change in the DLM rsb hashtable logic. It
removes the double lookup across two rsb hashtables and converts the
code to rhashtable instead of its own per-bucket hash implementation.

First, there is a fix for scand that I found while implementing this
patch series: remove messages can still be sent while the lockspace is
releasing its resources, which can result in a use-after-free.

There is a conversion to use lists (keep/toss lists) instead of
iterating over the hash buckets. As we transition to rhashtable,
iterating over the table is problematic: rhashtable resizes its buckets
dynamically in the background, and a rehash can force a whole new
iteration of the table. We introduce the lists purely for iteration; as
a side benefit we get a huge reduction of code in the debugfs dump
functionality, because we can use the debugfs list dump helpers. There
is also a potential refcount bug when holding references to rsbs in
toss state, as receiving a remove message requires that no rsb
references are held. A related issue is holding references to rsbs in
keep state, which prevents them from moving to toss state when they
should.
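
The toss/keep transition itself stays cheap; sketched with the list and
flag names this series introduces, moving an rsb into toss state is
roughly:

  spin_lock_bh(&ls->ls_rsbtbl_lock);
  /* keep list membership in sync with the RSB_TOSS flag */
  list_move(&r->res_rsbs_list, &ls->ls_toss);
  rsb_set_flag(r, RSB_TOSS);
  r->res_toss_time = jiffies;
  spin_unlock_bh(&ls->ls_rsbtbl_lock);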

It is now forbidden to hold references to rsbs in toss state; the
refcounter is only functional while an rsb is in keep state. This should
expose more invalid usage of the rsb refcounter on rsbs in toss state.
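
A sketch of how that rule can be made visible (illustrative only, not
the exact hunk from this series):

  /* holding an rsb is only valid while it is in keep state */
  static inline void dlm_hold_rsb(struct dlm_rsb *r)
  {
  	/* catch invalid refcounting on rsbs in toss state */
  	WARN_ON_ONCE(rsb_flag(r, RSB_TOSS));
  	kref_get(&r->res_ref);
  }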

The scand kthread was fixed first, but later in the series it is removed
entirely. The scand process held the hashtable/hash bucket lock for a
long time because it iterated over the whole hash. We now use timers to
reduce the hold time of the hashtable lock, taking advantage of the
timer data structure: the rsb timer deletes the rsb itself from the
hash. No lookup is required because the timer already carries the rsb.
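
A minimal sketch of that pattern, assuming a per-rsb timer field
res_toss_timer (all names here are illustrative, not the final code):

  static void rsb_toss_timer_fn(struct timer_list *t)
  {
  	struct dlm_rsb *r = from_timer(r, t, res_toss_timer);
  	struct dlm_ls *ls = r->res_ls;

  	spin_lock_bh(&ls->ls_rsbtbl_lock);
  	/* the timer carries the rsb, no hash lookup is needed */
  	list_del(&r->res_rsbs_list);
  	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
  			       dlm_rhash_rsb_params);
  	send_remove(r);
  	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  	dlm_free_rsb(r);
  }

  /* armed when the rsb enters toss state */
  mod_timer(&r->res_toss_timer,
  	  jiffies + dlm_config.ci_toss_secs * HZ);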

Finally, we move the likely lookup hotpath to mostly take a read lock,
which should avoid contention in most cases. The unlikely path still
needs to hold the write lock, do an extra relookup, and check whether
the state of the rsb changed in between. However, I think we hit the
likely path over 90% of the time, holding only the read lock and
avoiding contention between processing DLM messages and new DLM
requests triggered by the user.
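
A rough sketch of the pattern, assuming ls_rsbtbl_lock becomes an
rwlock (function and field names are illustrative):

  static int find_rsb_read_mostly(struct dlm_ls *ls, const void *name,
  				  int len, struct dlm_rsb **r_ret)
  {
  	struct dlm_rsb *r;
  	int error;

  	/* likely path: the rsb is active, the read lock is enough */
  	read_lock_bh(&ls->ls_rsbtbl_lock);
  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
  	if (!error && !rsb_flag(r, RSB_TOSS)) {
  		kref_get(&r->res_ref);
  		read_unlock_bh(&ls->ls_rsbtbl_lock);
  		*r_ret = r;
  		return 0;
  	}
  	read_unlock_bh(&ls->ls_rsbtbl_lock);

  	/* unlikely path: a state change is needed, so relookup under
  	 * the write lock and re-check whether the state changed while
  	 * no lock was held
  	 */
  	write_lock_bh(&ls->ls_rsbtbl_lock);
  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
  	if (!error) {
  		if (rsb_flag(r, RSB_TOSS)) {
  			/* revive: the reference set up on toss becomes
  			 * the first keep reference
  			 */
  			list_move(&r->res_rsbs_list, &ls->ls_keep);
  			rsb_clear_flag(r, RSB_TOSS);
  		} else {
  			/* another thread revived it meanwhile */
  			kref_get(&r->res_ref);
  		}
  	}
  	write_unlock_bh(&ls->ls_rsbtbl_lock);

  	*r_ret = error ? NULL : r;
  	return error;
  }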

- Alex

Alexander Aring (9):
  dlm: increment ls_count on find_ls_to_scan()
  dlm: change to non per bucket hashtable lock
  dlm: merge toss and keep hash into one
  dlm: fix avoid rsb hold during debugfs dump
  dlm: switch to use rhashtable for rsbs
  dlm: remove refcounting if rsb is on toss
  dlm: drop scand kthread and use timers
  dlm: likely read lock path for rsb lookup
  dlm: convert lkbidr to rwlock

 fs/dlm/config.c       |   8 +
 fs/dlm/config.h       |   2 +
 fs/dlm/debug_fs.c     | 212 ++----------
 fs/dlm/dir.c          |  14 +-
 fs/dlm/dlm_internal.h |  39 +--
 fs/dlm/lock.c         | 774 ++++++++++++++++++++++--------------------
 fs/dlm/lock.h         |   3 +-
 fs/dlm/lockspace.c    | 151 ++------
 fs/dlm/recover.c      |  38 ++-
 fs/dlm/recoverd.c     |  40 +--
 10 files changed, 541 insertions(+), 740 deletions(-)

-- 
2.43.0



* [RFC dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan()
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC
  To: teigland; +Cc: gfs2, aahringo

This patch increments the ls_count atomic value that is used as a
reference counter in find_ls_to_scan(). An additional
dlm_put_lockspace() is necessary in dlm_scand() to drop this reference
again.

This handling is necessary because release_lockspace() calls
remove_lockspace(), which removes the lockspace from the global
lockspace list. It only does that when the lockspace is no longer in
use, as signaled by the mentioned ls_count atomic value. Currently
dlm_scand() can run while release_lockspace() is freeing all lockspace
resources. For example:

1. scand timeout occurs
2. find_ls_to_scan() returns ls A - lslist_lock is not held anymore
3. dlm_scan_rsbs(A) does its job
4. release_lockspace(A, 2) is called by the user. User has no control of
   dlm_scan_rsbs(A)
5. lockspace_busy(A, 2) returns 0, it's not busy
6. dlm_scan_rsbs(A) iterates over the whole rsb hash bucket, sometimes
   triggering send_remove()
7. release_lockspace(A, 2) does not stop scand because ls_count was >= 2
8. remove_lockspace() ignores the ongoing scand because scand does not
   hold a reference, although ls_count is intended for exactly that
9. release_lockspace(A, 2) cleans up everything while step 6 may still
   be running

Step 8 is the problem: both contexts access the same resources while
release_lockspace() frees them.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lockspace.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c3681a50decb..731c48371a27 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -255,6 +255,7 @@ static struct dlm_ls *find_ls_to_scan(void)
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (time_after_eq(jiffies, ls->ls_scan_time +
 					    dlm_config.ci_scan_secs * HZ)) {
+			atomic_inc(&ls->ls_count);
 			spin_unlock_bh(&lslist_lock);
 			return ls;
 		}
@@ -277,6 +278,8 @@ static int dlm_scand(void *data)
 			} else {
 				ls->ls_scan_time += HZ;
 			}
+
+			dlm_put_lockspace(ls);
 			continue;
 		}
 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
-- 
2.43.0



* [RFC dlm/next 2/9] dlm: change to non per bucket hashtable lock
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC
  To: teigland; +Cc: gfs2, aahringo

This patch prepares the switch to the resizable, scalable, concurrent
hash table (rhashtable) implementation instead of having our own per
bucket implementation. The rhashtable implementation deals with the
buckets internally; for that reason we use a per-lockspace lock to
protect rhashtable access. This might reduce the performance of the
hash, depending on how rhashtable lookup compares to the current DLM
hash bucket implementation. However, once we have switched to the
rhashtable implementation, potential lockless access can be part of
future changes.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     | 24 +++++++-------
 fs/dlm/dir.c          |  4 +--
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 77 +++++++++++++++++++++----------------------
 fs/dlm/lockspace.c    |  2 +-
 fs/dlm/recover.c      |  4 +--
 fs/dlm/recoverd.c     |  8 ++---
 7 files changed, 60 insertions(+), 61 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index cba5514688ee..b8234eba5e34 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 
 	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				dlm_hold_rsb(r);
 				ri->rsb = r;
 				ri->bucket = bucket;
-				spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+				spin_unlock_bh(&ls->ls_rsbtbl_lock);
 				return ri;
 			}
 		}
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
 	 * move to the first rsb in the next non-empty bucket
@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 	 * move to the next rsb in the same bucket
 	 */
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	rp = ri->rsb;
 	next = rb_next(&rp->res_hashnode);
 
@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_put_rsb(rp);
 		++*pos;
 		return ri;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	dlm_put_rsb(rp);
 
 	/*
@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index ff3a51c759b5..5315f4f46cc7 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	hash = jhash(name, len, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 	if (rv)
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 					 name, len, &r);
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (!rv)
 		return r;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 269c12e0824f..2c961db53b27 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -105,7 +105,6 @@ do { \
 struct dlm_rsbtable {
 	struct rb_root		keep;
 	struct rb_root		toss;
-	spinlock_t		lock;
 	unsigned long		flags;
 };
 
@@ -593,6 +592,7 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct dlm_rsbtable	*ls_rsbtbl;
+	spinlock_t		ls_rsbtbl_lock;
 	uint32_t		ls_rsbtbl_size;
 
 	spinlock_t		ls_waiters_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 4ff4ef2a5f87..af57d9d12434 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -369,13 +369,12 @@ static inline int dlm_kref_put_lock_bh(struct kref *kref,
 static void put_rsb(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
-	uint32_t bucket = r->res_bucket;
 	int rv;
 
 	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
-				  &ls->ls_rsbtbl[bucket].lock);
+				  &ls->ls_rsbtbl_lock);
 	if (rv)
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -615,7 +614,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -685,7 +684,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -734,7 +733,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -759,7 +758,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -817,7 +816,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -832,7 +831,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -1049,7 +1048,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error) {
 		/* because the rsb is active, we need to lock_rsb before
@@ -1057,7 +1056,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1083,14 +1082,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 	r->res_toss_time = jiffies;
 	/* the rsb was inactive (on toss list) */
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -1108,7 +1107,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 
@@ -1116,7 +1115,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	return error;
 }
 
@@ -1126,15 +1125,15 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 	struct dlm_rsb *r;
 	int i;
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1146,7 +1145,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error)
 		goto out_dump;
@@ -1157,7 +1156,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
  out_dump:
 	dlm_dump_rsb(r);
  out:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1621,10 +1620,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -1681,7 +1680,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
 	else
 		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
 	 * While searching for rsb's to free, we found some that require
@@ -1696,16 +1695,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		name = ls->ls_remove_names[i];
 		len = ls->ls_remove_lens[i];
 
-		spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 		if (rv) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
 		}
 
 		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name master %d dir %d our %d %s",
 				  r->res_master_nodeid, r->res_dir_nodeid,
 				  our_nodeid, name);
@@ -1714,7 +1713,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (r->res_dir_nodeid == our_nodeid) {
 			/* should never happen */
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name dir %d master %d our %d %s",
 				  r->res_dir_nodeid, r->res_master_nodeid,
 				  our_nodeid, name);
@@ -1723,21 +1722,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (!time_after_eq(jiffies, r->res_toss_time +
 				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name toss_time %lu now %lu %s",
 				  r->res_toss_time, jiffies, name);
 			continue;
 		}
 
 		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name in use %s", name);
 			continue;
 		}
 
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 		send_remove(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 		dlm_free_rsb(r);
 	}
@@ -4201,7 +4200,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 	if (rv) {
@@ -4211,7 +4210,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			/* should not happen */
 			log_error(ls, "receive_remove from %d not found %s",
 				  from_nodeid, name);
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 		if (r->res_master_nodeid != from_nodeid) {
@@ -4219,14 +4218,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4234,19 +4233,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
 		log_error(ls, "receive_remove from %d rsb ref error",
 			  from_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
@@ -5314,7 +5313,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct rb_node *n;
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
@@ -5325,10 +5324,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 731c48371a27..d33dbcd5f4a1 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -495,6 +495,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	 */
 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
 
+	spin_lock_init(&ls->ls_rsbtbl_lock);
 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
 	ls->ls_rsbtbl_size = size;
 
@@ -504,7 +505,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	for (i = 0; i < size; i++) {
 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
-		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 13bc845fa305..9a4c8e4b2442 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -886,8 +886,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	unsigned int count = 0;
 	int i;
 
+	spin_lock(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -895,8 +895,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 			dlm_free_rsb(r);
 			count++;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (count)
 		log_rinfo(ls, "dlm_clear_toss %u done", count);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index c82cc48988c6..fa6608363302 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -33,8 +33,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		goto out;
 	}
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_nodeid)
@@ -43,8 +43,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 			list_add(&r->res_masters_list, &ls->ls_masters_list);
 			dlm_hold_rsb(r);
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
@@ -68,8 +68,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 	struct dlm_rsb *r;
 	int i;
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, root_list);
@@ -78,8 +78,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 
 		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
 			log_error(ls, "%s toss not empty", __func__);
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void dlm_release_root_list(struct list_head *root_list)
-- 
2.43.0



* [RFC dlm/next 3/9] dlm: merge toss and keep hash into one
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC
  To: teigland; +Cc: gfs2, aahringo

There are several places in current DLM that can end up doing two
lookups, e.g. first into the keep hash, then into the toss hash if the
rsb is not in the keep hash. However, the same rsb can never be in both
of these hashes at once. This patch introduces a new rsb state flag,
RSB_TOSS, that signals whether the rsb is on the toss hash; otherwise
it is on the keep hash. Instead of those multiple lookups there is only
one now, which reduces the number of hash lookups in cases where the
rsb wasn't in the keep hash. The move of an rsb between toss and keep
state needs to be done atomically anyway.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  29 ++++++++++--
 fs/dlm/dir.c          |   6 +--
 fs/dlm/dlm_internal.h |   4 +-
 fs/dlm/lock.c         | 103 ++++++++++++++++++++++--------------------
 fs/dlm/lockspace.c    |  13 ++----
 fs/dlm/recover.c      |   7 ++-
 fs/dlm/recoverd.c     |  12 ++---
 7 files changed, 98 insertions(+), 76 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index b8234eba5e34..37f4dfca5e44 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -450,12 +450,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 	if (seq->op == &format4_seq_ops)
 		ri->format = 4;
 
-	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+	tree = &ls->ls_rsbtbl[bucket].r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
+
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -482,12 +490,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 			kfree(ri);
 			return NULL;
 		}
-		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+		tree = &ls->ls_rsbtbl[bucket].r;
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
+
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -548,12 +564,19 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 			++*pos;
 			return NULL;
 		}
-		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+		tree = &ls->ls_rsbtbl[bucket].r;
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 5315f4f46cc7..f8039f3ee2d1 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -205,12 +205,8 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
-	if (rv)
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
-					 name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
 	if (!rv)
 		return r;
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 2c961db53b27..af88fc2f978c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,7 @@ do { \
 #define DLM_RTF_SHRINK_BIT	0
 
 struct dlm_rsbtable {
-	struct rb_root		keep;
-	struct rb_root		toss;
+	struct rb_root          r;
 	unsigned long		flags;
 };
 
@@ -376,6 +375,7 @@ enum rsb_flags {
 	RSB_RECOVER_CONVERT,
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
+	RSB_TOSS,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index af57d9d12434..08ec1a04476a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -616,23 +616,22 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (error)
-		goto do_toss;
+		goto do_new;
 	
 	/*
 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
 	 */
 
+	if (rsb_flag(r, RSB_TOSS))
+		goto do_toss;
+
 	kref_get(&r->res_ref);
 	goto out_unlock;
 
 
  do_toss:
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
-		goto do_new;
-
 	/*
 	 * rsb found inactive (master_nodeid may be out of date unless
 	 * we are the dir_nodeid or were the master)  No other thread
@@ -669,8 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
 
@@ -731,7 +729,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -760,8 +758,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (error)
+		goto do_new;
+
+	if (rsb_flag(r, RSB_TOSS))
 		goto do_toss;
 
 	/*
@@ -773,10 +774,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 
  do_toss:
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
-		goto do_new;
-
 	/*
 	 * rsb found inactive. No other thread is using this rsb because
 	 * it's on the toss list, so we can look at or update
@@ -804,8 +801,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
 
@@ -829,7 +825,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -1049,8 +1045,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		return error;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (!error) {
+		if (rsb_flag(r, RSB_TOSS))
+			goto do_toss;
+
 		/* because the rsb is active, we need to lock_rsb before
 		 * checking/changing re_master_nodeid
 		 */
@@ -1067,12 +1066,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		put_rsb(r);
 
 		return 0;
-	}
-
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
+	} else {
 		goto not_found;
+	}
 
+ do_toss:
 	/* because the rsb is inactive (on toss list), it's not refcounted
 	 * and lock_rsb is not used, but is protected by the rsbtbl lock
 	 */
@@ -1102,8 +1100,9 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_nodeid = from_nodeid;
 	kref_init(&r->res_ref);
 	r->res_toss_time = jiffies;
+	rsb_set_flag(r, RSB_TOSS);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
@@ -1127,8 +1126,11 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (rsb_flag(r, RSB_TOSS))
+				continue;
+
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
@@ -1146,14 +1148,10 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (!error)
-		goto out_dump;
-
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
 		goto out;
- out_dump:
+
 	dlm_dump_rsb(r);
  out:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1166,8 +1164,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
-	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
+	WARN_ON(rsb_flag(r, RSB_TOSS));
+	rsb_set_flag(r, RSB_TOSS);
 	r->res_toss_time = jiffies;
 	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
 	if (r->res_lvbptr) {
@@ -1627,9 +1625,11 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		return;
 	}
 
-	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+	for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
 		next = rb_next(n);
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (!rsb_flag(r, RSB_TOSS))
+			continue;
 
 		/* If we're the directory record for this rsb, and
 		   we're not the master of it, then we need to wait
@@ -1672,7 +1672,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		dlm_free_rsb(r);
 	}
 
@@ -1696,8 +1696,14 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		len = ls->ls_remove_lens[i];
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 		if (rv) {
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			log_error(ls, "remove_name not found %s", name);
+			continue;
+		}
+
+		if (!rsb_flag(r, RSB_TOSS)) {
 			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
@@ -1734,7 +1740,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -4202,17 +4208,16 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (rv) {
-		/* verify the rsb is on keep list per comment above */
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
-		if (rv) {
-			/* should not happen */
-			log_error(ls, "receive_remove from %d not found %s",
-				  from_nodeid, name);
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			return;
-		}
+		/* should not happen */
+		log_error(ls, "%s from %d not found %s", __func__,
+			  from_nodeid, name);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		return;
+	}
+
+	if (!rsb_flag(r, RSB_TOSS)) {
 		if (r->res_master_nodeid != from_nodeid) {
 			/* should not happen */
 			log_error(ls, "receive_remove keep from %d master %d",
@@ -4238,7 +4243,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
@@ -5314,8 +5319,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct dlm_rsb *r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (rsb_flag(r, RSB_TOSS))
+			continue;
 
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d33dbcd5f4a1..b5184ad550fa 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -503,8 +503,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		ls->ls_rsbtbl[i].keep.rb_node = NULL;
-		ls->ls_rsbtbl[i].toss.rb_node = NULL;
+		ls->ls_rsbtbl[i].r.rb_node = NULL;
 	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
@@ -837,15 +836,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].keep);
-			dlm_free_rsb(rsb);
-		}
-
-		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(rsb);
 		}
 	}
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 9a4c8e4b2442..e53d88e4ec93 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -888,10 +888,13 @@ void dlm_clear_toss(struct dlm_ls *ls)
 
 	spin_lock(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			if (!rsb_flag(r, RSB_TOSS))
+				continue;
+
+			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(r);
 			count++;
 		}
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index fa6608363302..ad696528ebe7 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -35,9 +35,9 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (r->res_nodeid)
+			if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
 				continue;
 
 			list_add(&r->res_masters_list, &ls->ls_masters_list);
@@ -70,14 +70,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
+				continue;
+
 			list_add(&r->res_root_list, root_list);
 			dlm_hold_rsb(r);
 		}
-
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-			log_error(ls, "%s toss not empty", __func__);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
-- 
2.43.0



* [RFC dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (2 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC
  To: teigland; +Cc: gfs2, aahringo

Currently debugfs calls dlm_hold_rsb() while iterating the ls_rsbtbl
hash and releases the ls_rsbtbl_lock during dumping. We can't do that
in every situation because some rsbs then don't get into toss state
when they should, e.g. when receive_remove() is being called. To avoid
this problem we stop using the hash as an iterable data structure. With
the move to the rhashtable implementation, iterating over the table
should be avoided anyway: dynamic rehash operations running in the
background can force a whole new iteration of the rhashtable data
structure. Instead of iterating over the hash we introduce two lists,
ls_toss and ls_keep, since DLM cares about the RSB_TOSS state whenever
it iterates over all rsbs. ls_toss holds only rsbs in RSB_TOSS state
and ls_keep the rsbs that are not. There are only a few places that
bring an rsb into toss state and back, and each becomes just a
list_move() between those data structures, protected by the
ls_rsbtbl_lock to stay in sync with the hashtable and the RSB_TOSS
flag.

The advantage in debugfs is that we can simply use the predefined
debugfs dump functionality for Linux lists. Instead of calling
dlm_hold_rsb() while dumping rsbs, we hold the ls_rsbtbl_lock for the
whole debugfs dump.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     | 233 +++++++-----------------------------------
 fs/dlm/dlm_internal.h |   4 +
 fs/dlm/lock.c         |  47 ++++-----
 fs/dlm/lockspace.c    |   3 +
 fs/dlm/recover.c      |  24 ++---
 fs/dlm/recoverd.c     |  34 +++---
 6 files changed, 84 insertions(+), 261 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 37f4dfca5e44..70567919f1b7 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -366,12 +366,10 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
 	unlock_rsb(r);
 }
 
-struct rsbtbl_iter {
-	struct dlm_rsb *rsb;
-	unsigned bucket;
-	int format;
-	int header;
-};
+static const struct seq_operations format1_seq_ops;
+static const struct seq_operations format2_seq_ops;
+static const struct seq_operations format3_seq_ops;
+static const struct seq_operations format4_seq_ops;
 
 /*
  * If the buffer is full, seq_printf can be called again, but it
@@ -382,220 +380,61 @@ struct rsbtbl_iter {
 
 static int table_seq_show(struct seq_file *seq, void *iter_ptr)
 {
-	struct rsbtbl_iter *ri = iter_ptr;
-
-	switch (ri->format) {
-	case 1:
-		print_format1(ri->rsb, seq);
-		break;
-	case 2:
-		if (ri->header) {
-			seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n");
-			ri->header = 0;
-		}
-		print_format2(ri->rsb, seq);
-		break;
-	case 3:
-		if (ri->header) {
-			seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n");
-			ri->header = 0;
-		}
-		print_format3(ri->rsb, seq);
-		break;
-	case 4:
-		if (ri->header) {
-			seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n");
-			ri->header = 0;
-		}
-		print_format4(ri->rsb, seq);
-		break;
-	}
+	struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list);
+
+	if (seq->op == &format1_seq_ops)
+		print_format1(rsb, seq);
+	else if (seq->op == &format2_seq_ops)
+		print_format2(rsb, seq);
+	else if (seq->op == &format3_seq_ops)
+		print_format3(rsb, seq);
+	else if (seq->op == &format4_seq_ops)
+		print_format4(rsb, seq);
 
 	return 0;
 }
 
-static const struct seq_operations format1_seq_ops;
-static const struct seq_operations format2_seq_ops;
-static const struct seq_operations format3_seq_ops;
-static const struct seq_operations format4_seq_ops;
-
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
-	struct rb_root *tree;
-	struct rb_node *node;
 	struct dlm_ls *ls = seq->private;
-	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
-	loff_t n = *pos;
-	unsigned bucket, entry;
-	int toss = (seq->op == &format4_seq_ops);
-
-	bucket = n >> 32;
-	entry = n & ((1LL << 32) - 1);
-
-	if (bucket >= ls->ls_rsbtbl_size)
-		return NULL;
-
-	ri = kzalloc(sizeof(*ri), GFP_NOFS);
-	if (!ri)
-		return NULL;
-	if (n == 0)
-		ri->header = 1;
-	if (seq->op == &format1_seq_ops)
-		ri->format = 1;
-	if (seq->op == &format2_seq_ops)
-		ri->format = 2;
-	if (seq->op == &format3_seq_ops)
-		ri->format = 3;
-	if (seq->op == &format4_seq_ops)
-		ri->format = 4;
-
-	tree = &ls->ls_rsbtbl[bucket].r;
+	struct list_head *list;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	if (!RB_EMPTY_ROOT(tree)) {
-		for (node = rb_first(tree); node; node = rb_next(node)) {
-			r = rb_entry(node, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-
-			if (!entry--) {
-				dlm_hold_rsb(r);
-				ri->rsb = r;
-				ri->bucket = bucket;
-				spin_unlock_bh(&ls->ls_rsbtbl_lock);
-				return ri;
-			}
-		}
+	if (!*pos) {
+		if (seq->op == &format2_seq_ops)
+			seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n");
+		else if (seq->op == &format3_seq_ops)
+			seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n");
+		else if (seq->op == &format4_seq_ops)
+			seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n");
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-	/*
-	 * move to the first rsb in the next non-empty bucket
-	 */
-
-	/* zero the entry */
-	n &= ~((1LL << 32) - 1);
 
-	while (1) {
-		bucket++;
-		n += 1LL << 32;
+	if (seq->op == &format4_seq_ops)
+		list = &ls->ls_toss;
+	else
+		list = &ls->ls_keep;
 
-		if (bucket >= ls->ls_rsbtbl_size) {
-			kfree(ri);
-			return NULL;
-		}
-		tree = &ls->ls_rsbtbl[bucket].r;
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		if (!RB_EMPTY_ROOT(tree)) {
-			node = rb_first(tree);
-			r = rb_entry(node, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-
-			dlm_hold_rsb(r);
-			ri->rsb = r;
-			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			*pos = n;
-			return ri;
-		}
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	return seq_list_start(list, *pos);
 }
 
 static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
-	struct rsbtbl_iter *ri = iter_ptr;
-	struct rb_root *tree;
-	struct rb_node *next;
-	struct dlm_rsb *r, *rp;
-	loff_t n = *pos;
-	unsigned bucket;
-	int toss = (seq->op == &format4_seq_ops);
-
-	bucket = n >> 32;
-
-	/*
-	 * move to the next rsb in the same bucket
-	 */
-
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rp = ri->rsb;
-	next = rb_next(&rp->res_hashnode);
-
-	if (next) {
-		r = rb_entry(next, struct dlm_rsb, res_hashnode);
-		dlm_hold_rsb(r);
-		ri->rsb = r;
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		dlm_put_rsb(rp);
-		++*pos;
-		return ri;
-	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	dlm_put_rsb(rp);
-
-	/*
-	 * move to the first rsb in the next non-empty bucket
-	 */
+	struct list_head *list;
 
-	/* zero the entry */
-	n &= ~((1LL << 32) - 1);
-
-	while (1) {
-		bucket++;
-		n += 1LL << 32;
+	if (seq->op == &format4_seq_ops)
+		list = &ls->ls_toss;
+	else
+		list = &ls->ls_keep;
 
-		if (bucket >= ls->ls_rsbtbl_size) {
-			kfree(ri);
-			++*pos;
-			return NULL;
-		}
-		tree = &ls->ls_rsbtbl[bucket].r;
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		if (!RB_EMPTY_ROOT(tree)) {
-			next = rb_first(tree);
-			r = rb_entry(next, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-			dlm_hold_rsb(r);
-			ri->rsb = r;
-			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			*pos = n;
-			return ri;
-		}
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	return seq_list_next(iter_ptr, list, pos);
 }
 
 static void table_seq_stop(struct seq_file *seq, void *iter_ptr)
 {
-	struct rsbtbl_iter *ri = iter_ptr;
+	struct dlm_ls *ls = seq->private;
 
-	if (ri) {
-		dlm_put_rsb(ri->rsb);
-		kfree(ri);
-	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static const struct seq_operations format1_seq_ops = {
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index af88fc2f978c..6d06840029c3 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -339,6 +339,7 @@ struct dlm_rsb {
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
 
+	struct list_head	res_rsbs_list;
 	struct list_head	res_root_list;	    /* used for recovery */
 	struct list_head	res_masters_list;   /* used for recovery */
 	struct list_head	res_recover_list;   /* used for recovery */
@@ -595,6 +596,9 @@ struct dlm_ls {
 	spinlock_t		ls_rsbtbl_lock;
 	uint32_t		ls_rsbtbl_size;
 
+	struct list_head	ls_toss;
+	struct list_head	ls_keep;
+
 	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 08ec1a04476a..a70b8edb5d3f 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -668,6 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
+	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
@@ -730,6 +731,8 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	if (!error)
+		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -801,6 +804,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
+	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
@@ -826,6 +830,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	kref_init(&r->res_ref);
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	if (!error)
+		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -1110,6 +1116,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		goto retry;
 	}
 
+	list_add(&r->res_rsbs_list, &ls->ls_toss);
+
 	if (result)
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
@@ -1120,20 +1128,12 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (rsb_flag(r, RSB_TOSS))
-				continue;
-
-			if (r->res_hash == hash)
-				dlm_dump_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		if (r->res_hash == hash)
+			dlm_dump_rsb(r);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
@@ -1166,6 +1166,7 @@ static void toss_rsb(struct kref *kref)
 	kref_init(&r->res_ref);
 	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rsb_set_flag(r, RSB_TOSS);
+	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
 	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
 	if (r->res_lvbptr) {
@@ -1672,6 +1673,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		dlm_free_rsb(r);
 	}
@@ -1740,6 +1742,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -4243,6 +4246,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
@@ -5313,17 +5317,12 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
 			  lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
-		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-		if (rsb_flag(r, RSB_TOSS))
-			continue;
-
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
 		if (!is_master(r)) {
@@ -5358,19 +5357,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 void dlm_recover_grant(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int bucket = 0;
 	unsigned int count = 0;
 	unsigned int rsb_count = 0;
 	unsigned int lkb_count = 0;
 
 	while (1) {
-		r = find_grant_rsb(ls, bucket);
-		if (!r) {
-			if (bucket == ls->ls_rsbtbl_size - 1)
-				break;
-			bucket++;
-			continue;
-		}
+		r = find_grant_rsb(ls);
+		if (!r)
+			break;
+
 		rsb_count++;
 		count = 0;
 		lock_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index b5184ad550fa..2b5771a7bf31 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -495,6 +495,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	 */
 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
 
+	INIT_LIST_HEAD(&ls->ls_toss);
+	INIT_LIST_HEAD(&ls->ls_keep);
 	spin_lock_init(&ls->ls_rsbtbl_lock);
 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
 	ls->ls_rsbtbl_size = size;
@@ -838,6 +840,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			list_del(&rsb->res_rsbs_list);
 			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(rsb);
 		}
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index e53d88e4ec93..512c1ae81a96 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -881,23 +881,15 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
 
 void dlm_clear_toss(struct dlm_ls *ls)
 {
-	struct rb_node *n, *next;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r, *safe;
 	unsigned int count = 0;
-	int i;
-
-	spin_lock(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
-			next = rb_next(n);
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (!rsb_flag(r, RSB_TOSS))
-				continue;
-
-			rb_erase(n, &ls->ls_rsbtbl[i].r);
-			dlm_free_rsb(r);
-			count++;
-		}
+
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
+		list_del(&r->res_rsbs_list);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].r);
+		dlm_free_rsb(r);
+		count++;
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index ad696528ebe7..5e8e10030b74 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -22,9 +22,8 @@
 
 static int dlm_create_masters_list(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i, error = 0;
+	int error = 0;
 
 	write_lock_bh(&ls->ls_masters_lock);
 	if (!list_empty(&ls->ls_masters_list)) {
@@ -34,15 +33,12 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 	}
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
-				continue;
-
-			list_add(&r->res_masters_list, &ls->ls_masters_list);
-			dlm_hold_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		if (r->res_nodeid)
+			continue;
+
+		list_add(&r->res_masters_list, &ls->ls_masters_list);
+		dlm_hold_rsb(r);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -64,21 +60,15 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 
 static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
-				continue;
-
-			list_add(&r->res_root_list, root_list);
-			dlm_hold_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		list_add(&r->res_root_list, root_list);
+		dlm_hold_rsb(r);
 	}
+
+	WARN_ON_ONCE(!list_empty(&ls->ls_toss));
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [RFC dlm/next 5/9] dlm: switch to use rhashtable for rsbs
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (3 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch switches the rsb lookup to the kernel's rhashtable
implementation instead of using our own hash bucket implementation. The
rhashtable implementation handles the hash buckets internally, sizing
them according to the number of entries it stores. The current
ls_rsbtbl_size used to set the bucket count is only a guess; in reality
we don't know how the user is going to use DLM. We move this decision
to the rhashtable implementation, which makes it dynamically at
runtime.
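
For illustration only, here is a minimal sketch of the fixed-length-key
rhashtable pattern this patch adopts; struct demo_entry, demo_params and
demo_lookup() are invented names and a 16-byte key is assumed, none of
this is taken from the patch itself:

#include <linux/rhashtable.h>
#include <linux/stddef.h>
#include <linux/string.h>

struct demo_entry {
	char name[16];			/* fixed-length, zero-padded key */
	struct rhash_head node;		/* links the entry into the table */
};

static const struct rhashtable_params demo_params = {
	.key_len = 16,
	.key_offset = offsetof(struct demo_entry, name),
	.head_offset = offsetof(struct demo_entry, node),
	.automatic_shrinking = true,	/* shrink as entries are removed */
};

static struct demo_entry *demo_lookup(struct rhashtable *ht,
				      const void *name, int len)
{
	char key[16] = {};

	/* shorter names are zero padded to the full key length,
	 * matching how the entries were inserted
	 */
	memcpy(key, name, len);
	return rhashtable_lookup_fast(ht, &key, demo_params);
}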

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/config.c       |   8 ++
 fs/dlm/config.h       |   2 +
 fs/dlm/dir.c          |   6 +-
 fs/dlm/dlm_internal.h |  18 ++---
 fs/dlm/lock.c         | 172 +++++++++++++-----------------------------
 fs/dlm/lock.h         |   2 +-
 fs/dlm/lockspace.c    |  35 ++++-----
 fs/dlm/recover.c      |   3 +-
 8 files changed, 86 insertions(+), 160 deletions(-)

diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index e55e0a2cd2e8..517fa975dc5a 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -63,6 +63,14 @@ static void release_node(struct config_item *);
 static struct configfs_attribute *comm_attrs[];
 static struct configfs_attribute *node_attrs[];
 
+const struct rhashtable_params dlm_rhash_rsb_params = {
+	.nelem_hint = 3, /* start small */
+	.key_len = DLM_RESNAME_MAXLEN,
+	.key_offset = offsetof(struct dlm_rsb, res_name),
+	.head_offset = offsetof(struct dlm_rsb, res_node),
+	.automatic_shrinking = true,
+};
+
 struct dlm_cluster {
 	struct config_group group;
 	unsigned int cl_tcp_port;
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 4c91fcca0fd4..ed237d910208 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -21,6 +21,8 @@ struct dlm_config_node {
 	uint32_t comm_seq;
 };
 
+extern const struct rhashtable_params dlm_rhash_rsb_params;
+
 #define DLM_MAX_ADDR_COUNT 3
 
 #define DLM_PROTO_TCP	0
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f8039f3ee2d1..9687f908476b 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -198,14 +198,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 				     int len)
 {
 	struct dlm_rsb *r;
-	uint32_t hash, bucket;
 	int rv;
 
-	hash = jhash(name, len, 0);
-	bucket = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	if (!rv)
 		return r;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6d06840029c3..cf43b97cf3e5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -34,6 +34,7 @@
 #include <linux/kernel.h>
 #include <linux/jhash.h>
 #include <linux/miscdevice.h>
+#include <linux/rhashtable.h>
 #include <linux/mutex.h>
 #include <linux/idr.h>
 #include <linux/ratelimit.h>
@@ -99,15 +100,6 @@ do { \
   } \
 }
 
-
-#define DLM_RTF_SHRINK_BIT	0
-
-struct dlm_rsbtable {
-	struct rb_root          r;
-	unsigned long		flags;
-};
-
-
 /*
  * Lockspace member (per node in a ls)
  */
@@ -327,13 +319,12 @@ struct dlm_rsb {
 	int			res_id;		/* for ls_recover_idr */
 	uint32_t                res_lvbseq;
 	uint32_t		res_hash;
-	uint32_t		res_bucket;	/* rsbtbl */
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
 	union {
 		struct list_head	res_hashchain;
-		struct rb_node		res_hashnode;	/* rsbtbl */
+		struct rhash_head	res_node; /* rsbtbl */
 	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
@@ -592,9 +583,10 @@ struct dlm_ls {
 	struct idr		ls_lkbidr;
 	spinlock_t		ls_lkbidr_spin;
 
-	struct dlm_rsbtable	*ls_rsbtbl;
+	struct rhashtable	ls_rsbtbl;
+#define DLM_RTF_SHRINK_BIT	0
+	unsigned long		ls_rsbtbl_flags;
 	spinlock_t		ls_rsbtbl_lock;
-	uint32_t		ls_rsbtbl_size;
 
 	struct list_head	ls_toss;
 	struct list_head	ls_keep;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a70b8edb5d3f..defb90b56b72 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -436,8 +436,6 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
-	/* Convert the empty list_head to a NULL rb_node for tree usage: */
-	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
@@ -458,67 +456,31 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	return 0;
 }
 
-static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
+			struct dlm_rsb **r_ret)
 {
-	char maxname[DLM_RESNAME_MAXLEN];
+	char key[DLM_RESNAME_MAXLEN] = {};
 
-	memset(maxname, 0, DLM_RESNAME_MAXLEN);
-	memcpy(maxname, name, nlen);
-	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
-}
+	memcpy(key, name, len);
+	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
+	if (*r_ret)
+		return 0;
 
-int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
-			struct dlm_rsb **r_ret)
-{
-	struct rb_node *node = tree->rb_node;
-	struct dlm_rsb *r;
-	int rc;
-
-	while (node) {
-		r = rb_entry(node, struct dlm_rsb, res_hashnode);
-		rc = rsb_cmp(r, name, len);
-		if (rc < 0)
-			node = node->rb_left;
-		else if (rc > 0)
-			node = node->rb_right;
-		else
-			goto found;
-	}
-	*r_ret = NULL;
 	return -EBADR;
-
- found:
-	*r_ret = r;
-	return 0;
 }
 
-static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	struct rb_node **newn = &tree->rb_node;
-	struct rb_node *parent = NULL;
-	int rc;
-
-	while (*newn) {
-		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
-					       res_hashnode);
+	int rv;
 
-		parent = *newn;
-		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
-		if (rc < 0)
-			newn = &parent->rb_left;
-		else if (rc > 0)
-			newn = &parent->rb_right;
-		else {
-			log_print("rsb_insert match");
-			dlm_dump_rsb(rsb);
-			dlm_dump_rsb(cur);
-			return -EEXIST;
-		}
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (rv == -EEXIST) {
+		log_print("%s match", __func__);
+		dlm_dump_rsb(rsb);
 	}
 
-	rb_link_node(&rsb->res_hashnode, parent, newn);
-	rb_insert_color(&rsb->res_hashnode, tree);
-	return 0;
+	return rv;
 }
 
 /*
@@ -566,8 +528,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
  */
 
 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
-			uint32_t hash, uint32_t b,
-			int dir_nodeid, int from_nodeid,
+			uint32_t hash, int dir_nodeid, int from_nodeid,
 			unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
@@ -616,7 +577,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (error)
 		goto do_new;
 	
@@ -690,7 +651,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = dir_nodeid;
 	kref_init(&r->res_ref);
 
@@ -730,7 +690,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (!error)
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
@@ -745,8 +705,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
    dlm_recover_masters). */
 
 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
-			  uint32_t hash, uint32_t b,
-			  int dir_nodeid, int from_nodeid,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
 			  unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
@@ -761,7 +720,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (error)
 		goto do_new;
 
@@ -823,13 +782,12 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = dir_nodeid;
 	r->res_master_nodeid = dir_nodeid;
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (!error)
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
@@ -843,23 +801,21 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 		    int from_nodeid, unsigned int flags,
 		    struct dlm_rsb **r_ret)
 {
-	uint32_t hash, b;
 	int dir_nodeid;
+	uint32_t hash;
 
 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;
 
 	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
+		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				      from_nodeid, flags, r_ret);
 	else
-		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
-				      from_nodeid, flags, r_ret);
+		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+				    from_nodeid, flags, r_ret);
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1020,7 +976,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		      int len, unsigned int flags, int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash, b;
+	uint32_t hash;
 	int our_nodeid = dlm_our_nodeid();
 	int dir_nodeid, error;
 
@@ -1034,8 +990,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
 	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 	if (dir_nodeid != our_nodeid) {
 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
@@ -1051,7 +1005,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		return error;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error) {
 		if (rsb_flag(r, RSB_TOSS))
 			goto do_toss;
@@ -1100,7 +1054,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = our_nodeid;
 	r->res_master_nodeid = from_nodeid;
 	r->res_nodeid = from_nodeid;
@@ -1108,7 +1061,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_toss_time = jiffies;
 	rsb_set_flag(r, RSB_TOSS);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
@@ -1141,14 +1094,10 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash, b;
 	int error;
 
-	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error)
 		goto out;
 
@@ -1168,7 +1117,7 @@ static void toss_rsb(struct kref *kref)
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
-	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
+	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
 		r->res_lvbptr = NULL;
@@ -1607,10 +1556,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	return error;
 }
 
-static void shrink_bucket(struct dlm_ls *ls, int b)
+static void shrink_bucket(struct dlm_ls *ls)
 {
-	struct rb_node *n, *next;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r, *safe;
 	char *name;
 	int our_nodeid = dlm_our_nodeid();
 	int remote_count = 0;
@@ -1621,17 +1569,12 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
+	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
-	for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
-		next = rb_next(n);
-		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-		if (!rsb_flag(r, RSB_TOSS))
-			continue;
-
+	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		/* If we're the directory record for this rsb, and
 		   we're not the master of it, then we need to wait
 		   for the master node to send us a dir remove for
@@ -1674,14 +1617,15 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		dlm_free_rsb(r);
 	}
 
 	if (need_shrink)
-		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
+		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	else
-		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
+		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
@@ -1698,7 +1642,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		len = ls->ls_remove_lens[i];
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 		if (rv) {
 			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name not found %s", name);
@@ -1743,7 +1687,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -1753,14 +1698,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 void dlm_scan_rsbs(struct dlm_ls *ls)
 {
-	int i;
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		shrink_bucket(ls, i);
-		if (dlm_locking_stopped(ls))
-			break;
-		cond_resched();
-	}
+	shrink_bucket(ls);
 }
 
 /* lkb is master or local copy */
@@ -4174,7 +4112,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 {
 	char name[DLM_RESNAME_MAXLEN+1];
 	struct dlm_rsb *r;
-	uint32_t hash, b;
 	int rv, len, dir_nodeid, from_nodeid;
 
 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -4194,24 +4131,22 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		return;
 	}
 
-	/* Look for name on rsbtbl.toss, if it's there, kill it.
-	   If it's on rsbtbl.keep, it's being used, and we should ignore this
-	   message.  This is an expected race between the dir node sending a
-	   request to the master node at the same time as the master node sends
-	   a remove to the dir node.  The resolution to that race is for the
-	   dir node to ignore the remove message, and the master node to
-	   recreate the master rsb when it gets a request from the dir node for
-	   an rsb it doesn't have. */
+	/* Look for name in rsb toss state, if it's there, kill it.
+	 * If it's in non toss state, it's being used, and we should ignore this
+	 * message.  This is an expected race between the dir node sending a
+	 * request to the master node at the same time as the master node sends
+	 * a remove to the dir node.  The resolution to that race is for the
+	 * dir node to ignore the remove message, and the master node to
+	 * recreate the master rsb when it gets a request from the dir node for
+	 * an rsb it doesn't have.
+	 */
 
 	memset(name, 0, sizeof(name));
 	memcpy(name, ms->m_extra, len);
 
-	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (rv) {
 		/* should not happen */
 		log_error(ls, "%s from %d not found %s", __func__,
@@ -4247,7 +4182,8 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 45a74869810a..33616d4b0cdb 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -29,7 +29,7 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		      int len, unsigned int flags, int *r_nodeid, int *result);
 
-int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 			struct dlm_rsb **r_ret);
 
 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2b5771a7bf31..890e1a4cf787 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -410,9 +410,9 @@ static int new_lockspace(const char *name, const char *cluster,
 			 int *ops_result, dlm_lockspace_t **lockspace)
 {
 	struct dlm_ls *ls;
-	int i, size, error;
 	int do_unreg = 0;
 	int namelen = strlen(name);
+	int i, error;
 
 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 		return -EINVAL;
@@ -498,15 +498,10 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_toss);
 	INIT_LIST_HEAD(&ls->ls_keep);
 	spin_lock_init(&ls->ls_rsbtbl_lock);
-	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
-	ls->ls_rsbtbl_size = size;
 
-	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
-	if (!ls->ls_rsbtbl)
+	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
+	if (error)
 		goto out_lsfree;
-	for (i = 0; i < size; i++) {
-		ls->ls_rsbtbl[i].r.rb_node = NULL;
-	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
@@ -669,7 +664,7 @@ static int new_lockspace(const char *name, const char *cluster,
  out_rsbtbl:
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 		kfree(ls->ls_remove_names[i]);
-	vfree(ls->ls_rsbtbl);
+	rhashtable_destroy(&ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
 		kobject_put(&ls->ls_kobj);
@@ -772,10 +767,16 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	return rv;
 }
 
+static void rhash_free_rsb(void *ptr, void *arg)
+{
+	struct dlm_rsb *rsb = ptr;
+
+	dlm_free_rsb(rsb);
+}
+
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct rb_node *n;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -834,19 +835,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	idr_destroy(&ls->ls_lkbidr);
 
 	/*
-	 * Free all rsb's on rsbtbl[] lists
+	 * Free all rsb's on rsbtbl
 	 */
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_del(&rsb->res_rsbs_list);
-			rb_erase(n, &ls->ls_rsbtbl[i].r);
-			dlm_free_rsb(rsb);
-		}
-	}
-
-	vfree(ls->ls_rsbtbl);
+	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 		kfree(ls->ls_remove_names[i]);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 512c1ae81a96..c21ef115123b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -887,7 +887,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		dlm_free_rsb(r);
 		count++;
 	}
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [RFC dlm/next 6/9] dlm: remove refcounting if rsb is on toss
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (4 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

In the past we had problems with rsbs whose reference counter is
greater than one while they are in toss state. An rsb in toss state is
not "alive" anymore and should not hold any references other than the
last one keeping it on the rsb hash. Toss state is a delayed free, a
kind of garbage collection, with additional handling to move the rsb
back into keep state, where it is alive again.

The reference counter should only be used while the rsb is in keep
state (not in toss state); when an rsb is no longer used in keep state
it is moved to toss state, where it can be freed at the right time. The
release() function of a keep rsb therefore sets RSB_TOSS and does not
call kref_init(), as toss rsbs should never deal with any reference
counting. If the rsb becomes alive again, the RSB_TOSS flag is cleared
and kref_init() is called so that reference counting is used again
while the rsb is in keep state.

Warnings are now introduced that trigger if we ever use any reference
counting while an rsb is in toss state. Previously, the kref_put() in
toss state was only a sanity check that the counter was 1. That check
is removed and we directly free the rsb when it should be freed in toss
state. If there are still references around, it is a bug.

These WARN_ON_ONCE() checks should hopefully find more cases where we
hold references to a toss rsb that we should not have, as
receive_remove() in most cases requires the rsb to be in toss state.
Otherwise we can run into problems, e.g. when the directory rsb wasn't
removed and the directory node reports it to the master node in a
lookup message.
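
As a rough sketch of the rule being enforced (demo_rsb, demo_hold() and
demo_revive() are made-up names, not DLM code): the refcount is only
live in keep state and is re-armed with kref_init() when the rsb leaves
toss state.

#include <linux/kref.h>
#include <linux/bug.h>

struct demo_rsb {
	struct kref ref;	/* only valid while not tossed */
	bool tossed;
};

static void demo_hold(struct demo_rsb *r)
{
	/* tossed rsbs must never be referenced */
	WARN_ON(r->tossed);
	kref_get(&r->ref);
}

static void demo_revive(struct demo_rsb *r)
{
	/* back to keep state: the counter becomes valid again at 1 */
	r->tossed = false;
	kref_init(&r->ref);
}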

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c    | 61 ++++++++++++++++++++++++------------------------
 fs/dlm/lock.h    |  1 +
 fs/dlm/recover.c |  2 +-
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index defb90b56b72..fee1a4164fc1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -325,6 +325,8 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 
 static inline void hold_rsb(struct dlm_rsb *r)
 {
+	/* rsbs in toss state never get referenced */
+	WARN_ON(rsb_flag(r, RSB_TOSS));
 	kref_get(&r->res_ref);
 }
 
@@ -631,6 +633,11 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
+	/* rsb got out of toss state, it becomes alive again
+	 * and we reinit the reference counter that is only
+	 * valid for keep state rsbs
+	 */
+	kref_init(&r->res_ref);
 	goto out_unlock;
 
 
@@ -765,6 +772,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
+	/* rsb got out of toss state, it becomes alive again
+	 * and we reinit the reference counter that is only
+	 * valid for keep state rsbs
+	 */
+	kref_init(&r->res_ref);
 	goto out_unlock;
 
 
@@ -1112,8 +1124,6 @@ static void toss_rsb(struct kref *kref)
 	struct dlm_ls *ls = r->res_ls;
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
-	kref_init(&r->res_ref);
-	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
@@ -1129,16 +1139,20 @@ static void toss_rsb(struct kref *kref)
 static void unhold_rsb(struct dlm_rsb *r)
 {
 	int rv;
+
+	/* rsbs in toss state never get referenced */
+	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rv = kref_put(&r->res_ref, toss_rsb);
 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
 }
 
-static void kill_rsb(struct kref *kref)
+void free_toss_rsb(struct dlm_rsb *r)
 {
-	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
+	WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
 
-	/* All work is done after the return from kref_put() so we
-	   can release the write_lock before the remove and free. */
+	/* check that all work is done once the rsb is on the toss list
+	 * and it can be freed.
+	 */
 
 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
@@ -1147,6 +1161,8 @@ static void kill_rsb(struct kref *kref)
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
+
+	dlm_free_rsb(r);
 }
 
 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
@@ -1611,15 +1627,10 @@ static void shrink_bucket(struct dlm_ls *ls)
 			continue;
 		}
 
-		if (!kref_put(&r->res_ref, kill_rsb)) {
-			log_error(ls, "tossed rsb in use %s", r->res_name);
-			continue;
-		}
-
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 	}
 
 	if (need_shrink)
@@ -1680,19 +1691,13 @@ static void shrink_bucket(struct dlm_ls *ls)
 			continue;
 		}
 
-		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name in use %s", name);
-			continue;
-		}
-
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 	}
 }
 
@@ -4180,18 +4185,12 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		return;
 	}
 
-	if (kref_put(&r->res_ref, kill_rsb)) {
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		dlm_free_rsb(r);
-	} else {
-		log_error(ls, "receive_remove from %d rsb ref error",
-			  from_nodeid);
-		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	list_del(&r->res_rsbs_list);
+	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+			       dlm_rhash_rsb_params);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	free_toss_rsb(r);
 }
 
 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 33616d4b0cdb..b56a34802762 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -18,6 +18,7 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
 			       uint32_t saved_seq);
 void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
+void free_toss_rsb(struct dlm_rsb *r);
 void dlm_put_rsb(struct dlm_rsb *r);
 void dlm_hold_rsb(struct dlm_rsb *r);
 int dlm_put_lkb(struct dlm_lkb *lkb);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index c21ef115123b..d43189532b14 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -889,7 +889,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 		count++;
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [RFC dlm/next 7/9] dlm: drop scand kthread and use timers
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (5 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-11 14:00   ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring
  8 siblings, 1 reply; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

Currently the scand kthread acts as a garbage collector for expired
rsbs on the toss list, cleaning them up after a certain timeout. It
triggers every couple of seconds and iterates over the toss list while
holding ls_rsbtbl_lock for the whole iteration. To reduce the time
ls_rsbtbl_lock is held, we now free cached (tossed) rsbs with a per-rsb
timer. When the timer expires, the rsb deletes itself from the rsb data
structures. Additionally, the timers can run on any cpu, whereas scand
only ran on one cpu, so there should be a performance speedup in
freeing these resources.
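
For illustration, a minimal sketch of the per-object timer pattern,
assuming invented names (demo_rsb, demo_expire(), demo_arm()) and a
fixed 10 second timeout; TIMER_DEFERRABLE means an idle cpu is not
woken up just to run the cleanup:

#include <linux/timer.h>
#include <linux/jiffies.h>
#include <linux/container_of.h>

struct demo_rsb {
	struct timer_list toss_timer;
};

static void demo_expire(struct timer_list *t)
{
	struct demo_rsb *r = container_of(t, struct demo_rsb, toss_timer);

	/* runs in timer context on whichever cpu fires the timer;
	 * free r here, or rearm with mod_timer() if freeing is not
	 * possible yet
	 */
}

static void demo_arm(struct demo_rsb *r)
{
	timer_setup(&r->toss_timer, demo_expire, TIMER_DEFERRABLE);
	mod_timer(&r->toss_timer, jiffies + 10 * HZ);
}

On teardown, timer_shutdown_sync(&r->toss_timer) waits for a running
callback to finish and prevents rearming before the object is freed.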

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  15 ++-
 fs/dlm/lock.c         | 272 +++++++++++++++++++++---------------------
 fs/dlm/lockspace.c    | 105 +++-------------
 fs/dlm/recover.c      |  24 +++-
 4 files changed, 178 insertions(+), 238 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index cf43b97cf3e5..0ab76ed8685e 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -320,6 +320,7 @@ struct dlm_rsb {
 	uint32_t                res_lvbseq;
 	uint32_t		res_hash;
 	unsigned long		res_toss_time;
+	struct timer_list	res_toss_timer;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
 	union {
@@ -368,6 +369,7 @@ enum rsb_flags {
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
 	RSB_TOSS,
+	RSB_TIMER_KILLED,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -584,8 +586,6 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct rhashtable	ls_rsbtbl;
-#define DLM_RTF_SHRINK_BIT	0
-	unsigned long		ls_rsbtbl_flags;
 	spinlock_t		ls_rsbtbl_lock;
 
 	struct list_head	ls_toss;
@@ -601,9 +601,6 @@ struct dlm_ls {
 	int			ls_new_rsb_count;
 	struct list_head	ls_new_rsb;	/* new rsb structs */
 
-	char			*ls_remove_names[DLM_REMOVE_NAMES_MAX];
-	int			ls_remove_lens[DLM_REMOVE_NAMES_MAX];
-
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
@@ -698,6 +695,13 @@ struct dlm_ls {
  * dlm_ls_stop() clears this to tell dlm locking routines that they should
  * quit what they are doing so recovery can run.  dlm_recoverd sets
  * this after recovery is finished.
+ *
+ * LSFL_TIMER_KILLED - a per-lockspace switch that kills all timers in the
+ * sense that they don't do anything anymore. To set this flag the caller
+ * needs to hold the ls_in_recovery write lock. It lets us avoid iterating
+ * over all timers and calling timer_shutdown_sync() on every res_toss_timer
+ * that is currently running or queued, e.g. when release_lockspace() is
+ * called and we need to force everything to stop.
  */
 
 #define LSFL_RECOVER_STOP	0
@@ -712,6 +716,7 @@ struct dlm_ls {
 #define LSFL_CB_DELAY		9
 #define LSFL_NODIR		10
 #define LSFL_RECV_MSG_BLOCKED	11
+#define LSFL_TIMER_KILLED	12
 
 #define DLM_PROC_FLAGS_CLOSING 1
 #define DLM_PROC_FLAGS_COMPAT  2
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index fee1a4164fc1..b3f0b0445812 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -320,6 +320,11 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
+static inline unsigned long rsb_toss_jiffies(const struct dlm_rsb *r)
+{
+	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
+}
+
 /* This is only called to add a reference when the code already holds
    a valid reference to the rsb, so there's no need for locking. */
 
@@ -416,6 +421,108 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 	return 0;
 }
 
+/* Caller must hold ls_rsbtbl_lock; this must be called every time
+ * the rsb enters toss state or the dir/master nodeid of a tossed
+ * rsb changes.
+ */
+static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+	int our_nodeid = dlm_our_nodeid();
+
+	/* If we're the directory record for this rsb, and
+	 * we're not the master of it, then we need to wait
+	 * for the master node to send us a dir remove
+	 * before removing the dir record.
+	 */
+	if (!dlm_no_directory(ls) &&
+	    (r->res_master_nodeid != our_nodeid) &&
+	    (dlm_dir_nodeid(r) == our_nodeid)) {
+		/* indicates we are waiting for a send_remove message */
+		r->res_toss_time = 0;
+		rsb_set_flag(r, RSB_TIMER_KILLED);
+		timer_delete(&r->res_toss_timer);
+		return;
+	}
+
+	rsb_clear_flag(r, RSB_TIMER_KILLED);
+	r->res_toss_time = jiffies;
+	mod_timer(&r->res_toss_timer, rsb_toss_jiffies(r));
+}
+
+static void dlm_rsb_toss_timer(struct timer_list *timer)
+{
+	struct dlm_rsb *r = container_of(timer, struct dlm_rsb, res_toss_timer);
+	int our_nodeid = dlm_our_nodeid();
+	struct dlm_ls *ls = r->res_ls;
+	int rv;
+
+	rv = dlm_lock_recovery_try(ls);
+	if (!rv) {
+		/* rearm timer */
+		mod_timer(&r->res_toss_timer, rsb_toss_jiffies(r));
+		return;
+	}
+
+	/* check if the timer was killed on a per-lockspace basis;
+	 * this flag is set while the ls_in_recovery lock is held
+	 * as a write lock.
+	 */
+	if (test_bit(LSFL_TIMER_KILLED, &ls->ls_flags)) {
+		dlm_unlock_recovery(ls);
+		return;
+	}
+
+	rv = spin_trylock(&ls->ls_rsbtbl_lock);
+	if (!rv) {
+		dlm_unlock_recovery(ls);
+		/* rearm timer */
+		mod_timer(&r->res_toss_timer, rsb_toss_jiffies(r));
+		return;
+	}
+
+	/* if the timer was killed we do nothing.  The timer may
+	 * instead have been rearmed: an rsb can switch from toss
+	 * to keep and back to toss again while the timer of the
+	 * first toss state is still executing.  A rearmed timer
+	 * means a new toss timeout is in effect, so we also
+	 * check whether the timer is currently pending and do
+	 * nothing in that case either.
+	 */
+	if (rsb_flag(r, RSB_TIMER_KILLED) ||
+	    timer_pending(&r->res_toss_timer)) {
+		spin_unlock(&ls->ls_rsbtbl_lock);
+		dlm_unlock_recovery(ls);
+		return;
+	}
+
+	list_del(&r->res_rsbs_list);
+	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+			       dlm_rhash_rsb_params);
+
+	/* not necessary to hold the ls_rsbtbl_lock when
+	 * calling send_remove(), it's gone from DLM now
+	 */
+	spin_unlock(&ls->ls_rsbtbl_lock);
+
+	/* no rsb in this state should ever run a timer */
+	WARN_ON(!dlm_no_directory(ls) &&
+		(r->res_master_nodeid != our_nodeid) &&
+		(dlm_dir_nodeid(r) == our_nodeid));
+
+	/* We're the master of this rsb but we're not
+	 * the directory record, so we need to tell the
+	 * dir node to remove the dir record
+	 */
+	if (!dlm_no_directory(ls) &&
+	    (r->res_master_nodeid == our_nodeid) &&
+	    (dlm_dir_nodeid(r) != our_nodeid))
+		send_remove(r);
+
+	dlm_unlock_recovery(ls);
+
+	free_toss_rsb(r);
+}
+
 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
    unlock any spinlocks, go back and call pre_rsb_struct again.
    Otherwise, take an rsb off the list and return it. */
@@ -454,6 +561,9 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	INIT_LIST_HEAD(&r->res_recover_list);
 	INIT_LIST_HEAD(&r->res_masters_list);
 
+	timer_setup(&r->res_toss_timer, dlm_rsb_toss_timer,
+		    TIMER_DEFERRABLE);
+
 	*r_ret = r;
 	return 0;
 }
@@ -638,6 +748,15 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 * valid for keep state rsbs
 	 */
 	kref_init(&r->res_ref);
+	/* force kill the timer; we can't wait here for the timer
+	 * to finish executing, but we hold the ls_rsbtbl_lock to
+	 * make sure the timer callback does not do anything anymore.
+	 * The timer_delete() is just an optimization: if the timer
+	 * is pending, it keeps the callback from running on expiry.
+	 */
+	rsb_set_flag(r, RSB_TIMER_KILLED);
+	timer_delete(&r->res_toss_timer);
+
 	goto out_unlock;
 
 
@@ -777,6 +896,15 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 * valid for keep state rsbs
 	 */
 	kref_init(&r->res_ref);
+	/* force kill the timer; we can't wait here for the timer
+	 * to finish executing, but we hold the ls_rsbtbl_lock to
+	 * make sure the timer callback does not do anything anymore.
+	 * The timer_delete() is just an optimization: if the timer
+	 * is pending, it keeps the callback from running on expiry.
+	 */
+	rsb_set_flag(r, RSB_TIMER_KILLED);
+	timer_delete(&r->res_toss_timer);
+
 	goto out_unlock;
 
 
@@ -1050,7 +1178,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
 			    r_nodeid, result);
 
-	r->res_toss_time = jiffies;
+	rsb_mod_timer(ls, r);
 	/* the rsb was inactive (on toss list) */
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -1070,7 +1198,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_master_nodeid = from_nodeid;
 	r->res_nodeid = from_nodeid;
 	kref_init(&r->res_ref);
-	r->res_toss_time = jiffies;
 	rsb_set_flag(r, RSB_TOSS);
 
 	error = rsb_insert(r, &ls->ls_rsbtbl);
@@ -1082,6 +1209,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
 	list_add(&r->res_rsbs_list, &ls->ls_toss);
+	rsb_mod_timer(ls, r);
 
 	if (result)
 		*result = DLM_LU_ADD;
@@ -1126,8 +1254,8 @@ static void toss_rsb(struct kref *kref)
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
-	r->res_toss_time = jiffies;
-	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
+	rsb_mod_timer(ls, r);
+
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
 		r->res_lvbptr = NULL;
@@ -1154,6 +1282,8 @@ void free_toss_rsb(struct dlm_rsb *r)
 	 * and it can be freed.
 	 */
 
+	WARN_ON_ONCE(timer_pending(&r->res_toss_timer));
+
 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
@@ -1572,140 +1702,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	return error;
 }
 
-static void shrink_bucket(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r, *safe;
-	char *name;
-	int our_nodeid = dlm_our_nodeid();
-	int remote_count = 0;
-	int need_shrink = 0;
-	int i, len, rv;
-
-	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
-
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-
-	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		return;
-	}
-
-	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
-		/* If we're the directory record for this rsb, and
-		   we're not the master of it, then we need to wait
-		   for the master node to send us a dir remove for
-		   before removing the dir record. */
-
-		if (!dlm_no_directory(ls) &&
-		    (r->res_master_nodeid != our_nodeid) &&
-		    (dlm_dir_nodeid(r) == our_nodeid)) {
-			continue;
-		}
-
-		need_shrink = 1;
-
-		if (!time_after_eq(jiffies, r->res_toss_time +
-				   dlm_config.ci_toss_secs * HZ)) {
-			continue;
-		}
-
-		if (!dlm_no_directory(ls) &&
-		    (r->res_master_nodeid == our_nodeid) &&
-		    (dlm_dir_nodeid(r) != our_nodeid)) {
-
-			/* We're the master of this rsb but we're not
-			   the directory record, so we need to tell the
-			   dir node to remove the dir record. */
-
-			ls->ls_remove_lens[remote_count] = r->res_length;
-			memcpy(ls->ls_remove_names[remote_count], r->res_name,
-			       DLM_RESNAME_MAXLEN);
-			remote_count++;
-
-			if (remote_count >= DLM_REMOVE_NAMES_MAX)
-				break;
-			continue;
-		}
-
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		free_toss_rsb(r);
-	}
-
-	if (need_shrink)
-		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
-	else
-		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-	/*
-	 * While searching for rsb's to free, we found some that require
-	 * remote removal.  We leave them in place and find them again here
-	 * so there is a very small gap between removing them from the toss
-	 * list and sending the removal.  Keeping this gap small is
-	 * important to keep us (the master node) from being out of sync
-	 * with the remote dir node for very long.
-	 */
-
-	for (i = 0; i < remote_count; i++) {
-		name = ls->ls_remove_names[i];
-		len = ls->ls_remove_lens[i];
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-		if (rv) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name not found %s", name);
-			continue;
-		}
-
-		if (!rsb_flag(r, RSB_TOSS)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name not toss %s", name);
-			continue;
-		}
-
-		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name master %d dir %d our %d %s",
-				  r->res_master_nodeid, r->res_dir_nodeid,
-				  our_nodeid, name);
-			continue;
-		}
-
-		if (r->res_dir_nodeid == our_nodeid) {
-			/* should never happen */
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name dir %d master %d our %d %s",
-				  r->res_dir_nodeid, r->res_master_nodeid,
-				  our_nodeid, name);
-			continue;
-		}
-
-		if (!time_after_eq(jiffies, r->res_toss_time +
-				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name toss_time %lu now %lu %s",
-				  r->res_toss_time, jiffies, name);
-			continue;
-		}
-
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		send_remove(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-		free_toss_rsb(r);
-	}
-}
-
-void dlm_scan_rsbs(struct dlm_ls *ls)
-{
-	shrink_bucket(ls);
-}
-
 /* lkb is master or local copy */
 
 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 890e1a4cf787..df7ff5824830 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -29,8 +29,6 @@ static int			ls_count;
 static struct mutex		ls_lock;
 static struct list_head		lslist;
 static spinlock_t		lslist_lock;
-static struct task_struct *	scand_task;
-
 
 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 {
@@ -247,64 +245,6 @@ void dlm_lockspace_exit(void)
 	kset_unregister(dlm_kset);
 }
 
-static struct dlm_ls *find_ls_to_scan(void)
-{
-	struct dlm_ls *ls;
-
-	spin_lock_bh(&lslist_lock);
-	list_for_each_entry(ls, &lslist, ls_list) {
-		if (time_after_eq(jiffies, ls->ls_scan_time +
-					    dlm_config.ci_scan_secs * HZ)) {
-			atomic_inc(&ls->ls_count);
-			spin_unlock_bh(&lslist_lock);
-			return ls;
-		}
-	}
-	spin_unlock_bh(&lslist_lock);
-	return NULL;
-}
-
-static int dlm_scand(void *data)
-{
-	struct dlm_ls *ls;
-
-	while (!kthread_should_stop()) {
-		ls = find_ls_to_scan();
-		if (ls) {
-			if (dlm_lock_recovery_try(ls)) {
-				ls->ls_scan_time = jiffies;
-				dlm_scan_rsbs(ls);
-				dlm_unlock_recovery(ls);
-			} else {
-				ls->ls_scan_time += HZ;
-			}
-
-			dlm_put_lockspace(ls);
-			continue;
-		}
-		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
-	}
-	return 0;
-}
-
-static int dlm_scand_start(void)
-{
-	struct task_struct *p;
-	int error = 0;
-
-	p = kthread_run(dlm_scand, NULL, "dlm_scand");
-	if (IS_ERR(p))
-		error = PTR_ERR(p);
-	else
-		scand_task = p;
-	return error;
-}
-
-static void dlm_scand_stop(void)
-{
-	kthread_stop(scand_task);
-}
-
 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 {
 	struct dlm_ls *ls;
@@ -385,22 +325,9 @@ static int threads_start(void)
 
 	/* Thread for sending/receiving messages for all lockspace's */
 	error = dlm_midcomms_start();
-	if (error) {
+	if (error)
 		log_print("cannot start dlm midcomms %d", error);
-		goto fail;
-	}
 
-	error = dlm_scand_start();
-	if (error) {
-		log_print("cannot start dlm_scand thread %d", error);
-		goto midcomms_fail;
-	}
-
-	return 0;
-
- midcomms_fail:
-	dlm_midcomms_stop();
- fail:
 	return error;
 }
 
@@ -412,7 +339,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	struct dlm_ls *ls;
 	int do_unreg = 0;
 	int namelen = strlen(name);
-	int i, error;
+	int error;
 
 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 		return -EINVAL;
@@ -503,13 +430,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	if (error)
 		goto out_lsfree;
 
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
-		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
-						 GFP_KERNEL);
-		if (!ls->ls_remove_names[i])
-			goto out_rsbtbl;
-	}
-
 	idr_init(&ls->ls_lkbidr);
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
@@ -661,9 +581,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	kfree(ls->ls_recover_buf);
  out_lkbidr:
 	idr_destroy(&ls->ls_lkbidr);
- out_rsbtbl:
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
-		kfree(ls->ls_remove_names[i]);
 	rhashtable_destroy(&ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
@@ -696,7 +613,6 @@ static int __dlm_new_lockspace(const char *name, const char *cluster,
 	if (error > 0)
 		error = 0;
 	if (!ls_count) {
-		dlm_scand_stop();
 		dlm_midcomms_shutdown();
 		dlm_midcomms_stop();
 	}
@@ -771,13 +687,19 @@ static void rhash_free_rsb(void *ptr, void *arg)
 {
 	struct dlm_rsb *rsb = ptr;
 
+	/* be sure no timer callback is still
+	 * running before freeing the rsb; it was killed
+	 * before, but killing does not guarantee that the
+	 * callback is no longer queued or executing.
+	 */
+	timer_shutdown_sync(&rsb->res_toss_timer);
 	dlm_free_rsb(rsb);
 }
 
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	int i, busy, rv;
+	int busy, rv;
 
 	busy = lockspace_busy(ls, force);
 
@@ -812,8 +734,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_recoverd_stop(ls);
 
+	/* make all res_toss_timer handling a no-op */
+	down_write(&ls->ls_in_recovery);
+	set_bit(LSFL_TIMER_KILLED, &ls->ls_flags);
+	up_write(&ls->ls_in_recovery);
+
 	if (ls_count == 1) {
-		dlm_scand_stop();
 		dlm_clear_members(ls);
 		dlm_midcomms_shutdown();
 	}
@@ -839,9 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
 
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
-		kfree(ls->ls_remove_names[i]);
-
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index d43189532b14..fb726dad3c53 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -881,18 +881,34 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
 
 void dlm_clear_toss(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
 	unsigned int count = 0;
+	struct dlm_rsb *r;
+
+	while (1) {
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
+		r = list_first_entry_or_null(&ls->ls_toss, struct dlm_rsb,
+					     res_rsbs_list);
+		if (!r) {
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			break;
+		}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+		rsb_set_flag(r, RSB_TIMER_KILLED);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
+		/* Wait until any currently running timer
+		 * callback is done. It is killed because
+		 * RSB_TIMER_KILLED is set, so the callback does
+		 * nothing anymore and dlm_clear_toss() takes over.
+		 */
+		timer_shutdown_sync(&r->res_toss_timer);
+
 		free_toss_rsb(r);
 		count++;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (count)
 		log_rinfo(ls, "dlm_clear_toss %u done", count);
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (6 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  2024-04-11 13:58   ` Alexander Aring
  2024-04-10 13:48 ` [RFC dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring
  8 siblings, 1 reply; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

As the conversion to rhashtable introduced a per-lockspace hash lock
instead of per-bucket locks, this patch changes the hash lock handling
to hold only a read lock in a very likely hot path in DLM.

This hot path is the lookup of rsbs that are in keep state. Keep state
means that the RSB_TOSS flag isn't set. In this likely read-lock path
we don't need to hold ls_rsbtbl_lock as a write lock at all. If the rsb
is in toss state, holding ls_rsbtbl_lock as a write lock is required.
When we see that the rsb is in toss state, we take ls_rsbtbl_lock as a
write lock and look the rsb up again (because it could have been
removed during the transition from read to write lock), then check
whether it is still in toss state. However, this is an unlikely path,
as toss rsbs are cached rsbs; having the rsb in keep state is very
likely.
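
A simplified sketch of the read-mostly lookup described above, with
invented names in the spirit of the sketches in the earlier patches;
since a read lock cannot be upgraded, the rsb may disappear after the
read lock is dropped and must be looked up again under the write lock
before its state can be trusted:

#include <linux/spinlock.h>
#include <linux/rhashtable.h>
#include <linux/kref.h>

struct demo_rsb {
	struct rhash_head node;
	struct kref ref;
	bool tossed;
};

struct demo_table {
	rwlock_t lock;
	struct rhashtable hash;
};

/* assumed helpers, as sketched in the earlier patches */
static struct demo_rsb *demo_lookup(struct rhashtable *ht,
				    const void *name, int len);
static void demo_hold(struct demo_rsb *r);
static void demo_revive(struct demo_rsb *r);

static struct demo_rsb *demo_find(struct demo_table *tbl,
				  const void *name, int len)
{
	struct demo_rsb *r;

retry:
	read_lock_bh(&tbl->lock);
	r = demo_lookup(&tbl->hash, name, len);
	if (r && !r->tossed) {
		/* likely hot path: keep state, a read lock is enough */
		demo_hold(r);
		read_unlock_bh(&tbl->lock);
		return r;
	}
	read_unlock_bh(&tbl->lock);

	/* unlikely path: reviving a tossed rsb needs the write lock */
	write_lock_bh(&tbl->lock);
	r = demo_lookup(&tbl->hash, name, len);
	if (r && !r->tossed) {
		/* raced back to keep state, take the fast path again */
		write_unlock_bh(&tbl->lock);
		goto retry;
	}
	if (r)
		demo_revive(r);	/* move it from toss back to keep state */
	write_unlock_bh(&tbl->lock);
	return r;
}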

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |   4 +-
 fs/dlm/dir.c          |   4 +-
 fs/dlm/dlm_internal.h |   2 +-
 fs/dlm/lock.c         | 260 ++++++++++++++++++++++++++++++------------
 fs/dlm/lockspace.c    |   2 +-
 fs/dlm/recover.c      |   6 +-
 fs/dlm/recoverd.c     |   8 +-
 7 files changed, 202 insertions(+), 84 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 70567919f1b7..6ab3ed4074c6 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -413,7 +413,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 	else
 		list = &ls->ls_keep;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	return seq_list_start(list, *pos);
 }
 
@@ -434,7 +434,7 @@ static void table_seq_stop(struct seq_file *seq, void *iter_ptr)
 {
 	struct dlm_ls *ls = seq->private;
 
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static const struct seq_operations format1_seq_ops = {
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 9687f908476b..b1ab0adbd9d0 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -200,9 +200,9 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	struct dlm_rsb *r;
 	int rv;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	if (!rv)
 		return r;
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 0ab76ed8685e..5714c073887d 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -586,7 +586,7 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct rhashtable	ls_rsbtbl;
-	spinlock_t		ls_rsbtbl_lock;
+	rwlock_t		ls_rsbtbl_lock;
 
 	struct list_head	ls_toss;
 	struct list_head	ls_keep;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index b3f0b0445812..5cf94203b2db 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -342,15 +342,15 @@ void dlm_hold_rsb(struct dlm_rsb *r)
 
 /* TODO move this to lib/refcount.c */
 static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
 __cond_acquires(lock)
 {
 	if (refcount_dec_not_one(r))
 		return false;
 
-	spin_lock_bh(lock);
+	write_lock_bh(lock);
 	if (!refcount_dec_and_test(r)) {
-		spin_unlock_bh(lock);
+		write_unlock_bh(lock);
 		return false;
 	}
 
@@ -358,11 +358,11 @@ __cond_acquires(lock)
 }
 
 /* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
-				       void (*release)(struct kref *kref),
-				       spinlock_t *lock)
+static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
+					     void (*release)(struct kref *kref),
+					     rwlock_t *lock)
 {
-	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
 		release(kref);
 		return 1;
 	}
@@ -378,10 +378,10 @@ static void put_rsb(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
-				  &ls->ls_rsbtbl_lock);
+	rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
+					&ls->ls_rsbtbl_lock);
 	if (rv)
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -472,7 +472,7 @@ static void dlm_rsb_toss_timer(struct timer_list *timer)
 		return;
 	}
 
-	rv = spin_trylock(&ls->ls_rsbtbl_lock);
+	rv = write_trylock(&ls->ls_rsbtbl_lock);
 	if (!rv) {
 		dlm_unlock_recovery(ls);
 		/* rearm timer */
@@ -490,7 +490,7 @@ static void dlm_rsb_toss_timer(struct timer_list *timer)
 	 */
 	if (rsb_flag(r, RSB_TIMER_KILLED) ||
 	    timer_pending(&r->res_toss_timer)) {
-		spin_unlock(&ls->ls_rsbtbl_lock);
+		write_unlock(&ls->ls_rsbtbl_lock);
 		dlm_unlock_recovery(ls);
 		return;
 	}
@@ -502,7 +502,7 @@ static void dlm_rsb_toss_timer(struct timer_list *timer)
 	/* not necessary to held the ls_rsbtbl_lock when
 	 * calling send_remove() its gone from DLM now
 	 */
-	spin_unlock(&ls->ls_rsbtbl_lock);
+	write_unlock(&ls->ls_rsbtbl_lock);
 
 	/* no rsb in this state should ever run a timer */
 	WARN_ON(!dlm_no_directory(ls) &&
@@ -583,16 +583,8 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	int rv;
-
-	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
-				    dlm_rhash_rsb_params);
-	if (rv == -EEXIST) {
-		log_print("%s match", __func__);
-		dlm_dump_rsb(rsb);
-	}
-
-	return rv;
+	return rhashtable_insert_fast(rhash, &rsb->res_node,
+				      dlm_rhash_rsb_params);
 }
 
 /*
@@ -687,11 +679,15 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 	
 	/*
 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
@@ -701,7 +697,8 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		goto do_toss;
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	goto out;
 
 
  do_toss:
@@ -718,8 +715,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
 			  r->res_name);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
@@ -741,6 +739,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state;
+	 * if not it's in keep state and we relookup - unlikely path.
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	/* rsb got out of toss state, it becomes alive again
@@ -756,8 +771,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 */
 	rsb_set_flag(r, RSB_TIMER_KILLED);
 	timer_delete(&r->res_toss_timer);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
 
  do_new:
@@ -766,15 +782,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 */
 
 	if (error == -EBADR && !create)
-		goto out_unlock;
+		goto out;
 
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -796,7 +810,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		dlm_free_rsb(r);
 		r = NULL;
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if (from_other) {
@@ -816,11 +830,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
+
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+	 * rsb exists now, do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -844,11 +867,15 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 
 	if (rsb_flag(r, RSB_TOSS))
 		goto do_toss;
@@ -858,7 +885,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 */
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	goto out;
 
 
  do_toss:
@@ -874,8 +903,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
 		dlm_print_rsb(r);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
@@ -889,6 +919,24 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state;
+	 * if not it's in keep state and we relookup - unlikely path.
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
+
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	/* rsb got out of toss state, it becomes alive again
@@ -904,8 +952,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 */
 	rsb_set_flag(r, RSB_TIMER_KILLED);
 	timer_delete(&r->res_toss_timer);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
 
  do_new:
@@ -915,11 +964,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -927,11 +975,20 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+	 * rsb exists now, do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+
  out:
 	*r_ret = r;
 	return error;
@@ -1144,7 +1201,10 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error) {
 		if (rsb_flag(r, RSB_TOSS))
@@ -1155,7 +1215,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1167,6 +1227,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 		return 0;
 	} else {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto not_found;
 	}
 
@@ -1178,20 +1239,39 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
 			    r_nodeid, result);
 
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	/* unlikely path - relookup under write lock */
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* rsb_mod_timer() requires holding ls_rsbtbl_lock in write mode;
+	 * check if the rsb is still in toss state, if not relookup
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			/* something has changed, very unlikely but
+			 * try again
+			 */
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto not_found;
+	}
+
 	rsb_mod_timer(ls, r);
 	/* the rsb was inactive (on toss list) */
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = our_nodeid;
@@ -1200,22 +1280,30 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (error) {
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+	 * rsb exists now, do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (error) {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 
 	list_add(&r->res_rsbs_list, &ls->ls_toss);
 	rsb_mod_timer(ls, r);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (result)
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
 	return error;
 }
 
@@ -1223,12 +1311,12 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (r->res_hash == hash)
 			dlm_dump_rsb(r);
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1236,14 +1324,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	struct dlm_rsb *r = NULL;
 	int error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error)
 		goto out;
 
 	dlm_dump_rsb(r);
  out:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1376,6 +1464,36 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+__cond_acquires(lock)
+{
+	if (refcount_dec_not_one(r))
+		return false;
+
+	spin_lock_bh(lock);
+	if (!refcount_dec_and_test(r)) {
+		spin_unlock_bh(lock);
+		return false;
+	}
+
+	return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+				       void (*release)(struct kref *kref),
+				       spinlock_t *lock)
+{
+	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+		release(kref);
+		return 1;
+	}
+
+	return 0;
+}
+
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -4145,14 +4263,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	memset(name, 0, sizeof(name));
 	memcpy(name, ms->m_extra, len);
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (rv) {
 		/* should not happen */
 		log_error(ls, "%s from %d not found %s", __func__,
 			  from_nodeid, name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4162,14 +4280,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4177,14 +4295,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
 	list_del(&r->res_rsbs_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_toss_rsb(r);
 }
@@ -5252,7 +5370,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
@@ -5261,10 +5379,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index df7ff5824830..07d47f3427bf 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -424,7 +424,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	INIT_LIST_HEAD(&ls->ls_toss);
 	INIT_LIST_HEAD(&ls->ls_keep);
-	spin_lock_init(&ls->ls_rsbtbl_lock);
+	rwlock_init(&ls->ls_rsbtbl_lock);
 
 	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
 	if (error)
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index fb726dad3c53..d976c9eeed84 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -885,11 +885,11 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 
 	while (1) {
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
+		write_lock_bh(&ls->ls_rsbtbl_lock);
 		r = list_first_entry_or_null(&ls->ls_toss, struct dlm_rsb,
 					     res_rsbs_list);
 		if (!r) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			break;
 		}
 
@@ -897,7 +897,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
 		rsb_set_flag(r, RSB_TIMER_KILLED);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 		/* Wait until all killed currently running timer
 		 * callbacks are done. Killed as RSB_TIMER_KILLED
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 5e8e10030b74..2a17570cae70 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -32,7 +32,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (r->res_nodeid)
 			continue;
@@ -40,7 +40,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
 		dlm_hold_rsb(r);
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
@@ -62,14 +62,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		list_add(&r->res_root_list, root_list);
 		dlm_hold_rsb(r);
 	}
 
 	WARN_ON_ONCE(!list_empty(&ls->ls_toss));
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void dlm_release_root_list(struct list_head *root_list)
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [RFC dlm/next 9/9] dlm: convert lkbidr to rwlock
  2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (7 preceding siblings ...)
  2024-04-10 13:48 ` [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
@ 2024-04-10 13:48 ` Alexander Aring
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-10 13:48 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the lkbidr lock to a rwlock. Most of the time we
only do idr lookups, and those can be done while holding the read lock
only.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 44 +++++++------------------------------------
 fs/dlm/lockspace.c    |  6 +++---
 3 files changed, 11 insertions(+), 41 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 5714c073887d..04b5e6b7b8ce 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -583,7 +583,7 @@ struct dlm_ls {
 	struct kobject		ls_kobj;
 
 	struct idr		ls_lkbidr;
-	spinlock_t		ls_lkbidr_spin;
+	rwlock_t		ls_lkbidr_lock;
 
 	struct rhashtable	ls_rsbtbl;
 	rwlock_t		ls_rsbtbl_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5cf94203b2db..93302c5be4a7 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1419,11 +1419,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	write_lock_bh(&ls->ls_lkbidr_lock);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	write_unlock_bh(&ls->ls_lkbidr_lock);
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
@@ -1444,11 +1444,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
 	struct dlm_lkb *lkb;
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	read_lock_bh(&ls->ls_lkbidr_lock);
 	lkb = idr_find(&ls->ls_lkbidr, lkid);
 	if (lkb)
 		kref_get(&lkb->lkb_ref);
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	read_unlock_bh(&ls->ls_lkbidr_lock);
 
 	*lkb_ret = lkb;
 	return lkb ? 0 : -ENOENT;
@@ -1464,36 +1464,6 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
-/* TODO move this to lib/refcount.c */
-static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
-__cond_acquires(lock)
-{
-	if (refcount_dec_not_one(r))
-		return false;
-
-	spin_lock_bh(lock);
-	if (!refcount_dec_and_test(r)) {
-		spin_unlock_bh(lock);
-		return false;
-	}
-
-	return true;
-}
-
-/* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
-				       void (*release)(struct kref *kref),
-				       spinlock_t *lock)
-{
-	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
-		release(kref);
-		return 1;
-	}
-
-	return 0;
-}
-
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -1502,11 +1472,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	uint32_t lkid = lkb->lkb_id;
 	int rv;
 
-	rv = dlm_kref_put_lock_bh(&lkb->lkb_ref, kill_lkb,
-				  &ls->ls_lkbidr_spin);
+	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
+					&ls->ls_lkbidr_lock);
 	if (rv) {
 		idr_remove(&ls->ls_lkbidr, lkid);
-		spin_unlock_bh(&ls->ls_lkbidr_spin);
+		write_unlock_bh(&ls->ls_lkbidr_lock);
 
 		detach_lkb(lkb);
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 07d47f3427bf..58f9320858a2 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -431,7 +431,7 @@ static int new_lockspace(const char *name, const char *cluster,
 		goto out_lsfree;
 
 	idr_init(&ls->ls_lkbidr);
-	spin_lock_init(&ls->ls_lkbidr_spin);
+	rwlock_init(&ls->ls_lkbidr_lock);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	spin_lock_init(&ls->ls_waiters_lock);
@@ -671,7 +671,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 {
 	int rv;
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	read_lock_bh(&ls->ls_lkbidr_lock);
 	if (force == 0) {
 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 	} else if (force == 1) {
@@ -679,7 +679,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	} else {
 		rv = 0;
 	}
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	read_unlock_bh(&ls->ls_lkbidr_lock);
 	return rv;
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* Re: [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup
  2024-04-10 13:48 ` [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
@ 2024-04-11 13:58   ` Alexander Aring
  0 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-11 13:58 UTC (permalink / raw
  To: teigland; +Cc: gfs2

Hi,

On Wed, Apr 10, 2024 at 9:49 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> As the conversion to rhashtable introduced one hash lock per lockspace
> instead of a per bucket lock, this patch changes the hash lock
> handling to hold only a read lock in a very likely hot path in DLM.
>
> This hot path is the lookup of rsbs that are in keep state. Keep state
> means that the RSB_TOSS flag isn't set. On this likely read lock path
> we don't need to hold the ls_rsbtbl_lock in write mode at all. If the
> rsb is in toss state, holding the ls_rsbtbl_lock in write mode is
> required. When we see that the rsb is in toss state, we take the
> ls_rsbtbl_lock in write mode and relookup (because the rsb could get
> removed during the transition from read to write lock) and check
> whether the rsb is still in toss state. However this is an unlikely
> path, as tossed rsbs are cached rsbs; having the rsb in keep state is
> very likely.
>
...
> @@ -858,7 +885,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
>          */
>
>         kref_get(&r->res_ref);
> -       goto out_unlock;
> +       read_unlock_bh(&ls->ls_rsbtbl_lock);
> +
> +       goto out;
>
>
>   do_toss:

we need to take the write lock here first and then do the relookup
and test whether the state has changed (again, do_toss is an unlikely
path, as the first lookup can be done under the read lock only).
The reason is that the rsb can be removed and added again, and we
check some fields in the toss path that can be different by then; the
whole do_toss path needs to run under the write lock.

It makes more sense that when the first lookup finds the rsb in toss
state we drop the read lock; in do_toss we take the write lock and
check whether the lookup result and the state are still the same, and
if so we go ahead with the toss lookup path under the write lock.
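
Roughly, the do_toss entry would then become something like this (a
sketch reusing the names from the patch above, not tested):

 do_toss:
	/* the first lookup found toss state under the read lock only,
	 * drop it and redo the check under the write lock
	 */
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	write_lock_bh(&ls->ls_rsbtbl_lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	if (!rsb_flag(r, RSB_TOSS)) {
		/* got removed and added again in keep state,
		 * start over with the likely read lock path
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto retry_lookup;
	}

	/* ... the whole toss handling here, under the write lock ... */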

The same applies to the other lookup functions that find the rsb in
toss state and bring it back to keep state (the unlikely case); the
rsb is mostly on the keep list, where the lookup can be done under the
read lock only.

- Alex


^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [RFC dlm/next 7/9] dlm: drop scand kthread and use timers
  2024-04-10 13:48 ` [RFC dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
@ 2024-04-11 14:00   ` Alexander Aring
  0 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-11 14:00 UTC (permalink / raw
  To: teigland; +Cc: gfs2

Hi,

On Wed, Apr 10, 2024 at 9:49 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> Currently the scand kthread acts as a garbage collector for expired
> rsbs on the toss list, cleaning them up after a certain timeout. It
> triggers every couple of seconds and iterates over the toss list while
> holding ls_rsbtbl_lock for the whole hash bucket iteration. To reduce
> the time ls_rsbtbl_lock is held, we free cached rsbs (tossed rsbs) on
> a per rsb timer. When it expires, the rsb deletes itself out of the
> rsb data structures. Additionally the timer can run on any cpu,
> instead of scand which was running on only one cpu, so there should be
> a performance speedup when freeing such resources.
>

I am currently working on a per lockspace timer that gets rearmed
when the next rsb expires, instead of spamming the timer subsystem
with a lot of timers.

That also has the advantage that we can simply shut down the per
lockspace timer, instead of iterating over all timers, which this
version does with some flags.
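
A rough sketch of the direction (ls_scan_timer and res_toss_time are
made-up names here, nothing final; res_toss_time would be the per rsb
expiry in jiffies, with 0 used as "none" for simplicity):

static void dlm_toss_scan(struct timer_list *timer)
{
	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
	unsigned long next = 0;
	struct dlm_rsb *r, *safe;

	write_lock_bh(&ls->ls_rsbtbl_lock);
	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
		if (time_after_eq(jiffies, r->res_toss_time)) {
			/* expired: remove it like dlm_rsb_toss_timer()
			 * does today
			 */
			list_del(&r->res_rsbs_list);
			rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
					       dlm_rhash_rsb_params);
			/* send_remove()/free_toss_rsb() handling here */
		} else if (!next || time_before(r->res_toss_time, next)) {
			/* remember the earliest remaining expiry */
			next = r->res_toss_time;
		}
	}
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	/* rearm only if there is still something left to expire */
	if (next)
		mod_timer(&ls->ls_scan_timer, next);
}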

- Alex


^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2024-04-11 14:00 UTC | newest]

Thread overview: 12+ messages
2024-04-10 13:48 [RFC dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
2024-04-11 14:00   ` Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
2024-04-11 13:58   ` Alexander Aring
2024-04-10 13:48 ` [RFC dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring
