gfs2.lists.linux.dev archive mirror
* [PATCH dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup
@ 2024-04-12 13:43 Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
                   ` (8 more replies)
  0 siblings, 9 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC
  To: teigland; +Cc: gfs2, aahringo

Hi,

this is a patch series making a substantial change to the DLM rsb
hashtable logic. It removes the double lookup into the keep and toss
hashes and converts the hashtable from DLM's own per-bucket
implementation to the kernel's rhashtable.

First there is a fix for scand that I found while implementing this
patch series: remove messages could still be sent while the lockspace
is releasing its resources, which can result in a use after free.

There is a conversion to iterate over lists (keep/toss lists) instead
of iterating over the hash buckets. As we transition to rhashtable,
iterating over the table itself is problematic because of the resizing
and rehashing that the rhashtable implementation does internally. We
introduce the lists purely for iteration; as a bonus this removes a lot
of code from the debugfs dump functionality, because we can use the
seq_file list dump helpers. There is also a potential refcount bug when
holding references on rsbs in toss state, since receiving a remove
message requires that no rsb references are held. Another issue is
holding rsbs in keep state so that they do not go into toss state when
they should.

It is now forbidden to hold references on rsbs in toss state; the
refcounter is only functional while the rsb is in keep state. That will
hopefully expose more invalid uses of the rsb refcounter while an rsb
is in toss state.
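
As a minimal sketch of that rule (illustrative only, not the exact code
from the series; hold_rsb(), rsb_flag() and RSB_TOSS are the names the
series uses elsewhere):

static void hold_rsb(struct dlm_rsb *r)
{
	/* references are only valid while the rsb is in keep state,
	 * so taking one on a tossed rsb is a bug we want to catch
	 */
	WARN_ON_ONCE(rsb_flag(r, RSB_TOSS));
	kref_get(&r->res_ref);
}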

The scand kthread, fixed above, is now also removed. The scand process
held the hashtable/hash bucket lock for a long time because it iterated
over the whole hash. We now use timers to reduce how long the hashtable
lock is held. We introduce a per-lockspace toss queue of tossed rsbs,
ordered by expiration: the first item is the rsb that the timer will
expire earliest, and the last item the latest. This makes it easy to
re-arm the timer for the next entry in the queue.
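
A rough sketch of that ordering invariant (the field and helper names
ls_toss_q, res_toss_q_list and ls_toss_timer are illustrative here, not
necessarily what the patches use):

static void rsb_queue_toss(struct dlm_ls *ls, struct dlm_rsb *r)
{
	r->res_toss_time = jiffies;

	/* expiration times grow monotonically, so appending keeps the
	 * queue sorted: head = earliest to expire, tail = latest
	 */
	list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);

	/* only the head entry determines the timer; (re)arm it if this
	 * rsb just became the head
	 */
	if (list_is_first(&r->res_toss_q_list, &ls->ls_toss_q))
		mod_timer(&ls->ls_toss_timer,
			  r->res_toss_time + dlm_config.ci_toss_secs * HZ);
}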

The last two patches move the likely lookup hotpath to a mostly
read-locked path, which should avoid contention in most cases. The
unlikely paths still need to take the write lock, redo the lookup and
check whether the state of the rsb changed in the meantime. I expect we
hit the likely path, where only the read lock is needed, well over 90%
of the time, avoiding contention between processing DLM messages and
new DLM requests triggered by the user.
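
A minimal sketch of that fast path, assuming ls_rsbtbl_lock becomes an
rwlock and with dlm_lookup_rsb()/find_rsb_slow() standing in for the
actual lookup helpers:

static int find_rsb_fast(struct dlm_ls *ls, const void *name, int len,
			 struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;

	/* likely path: the rsb exists and is active, so the read lock
	 * is enough because nothing about its hash/toss state changes
	 */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	r = dlm_lookup_rsb(ls, name, len);
	if (r && !rsb_flag(r, RSB_TOSS)) {
		kref_get(&r->res_ref);
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		*r_ret = r;
		return 0;
	}
	read_unlock_bh(&ls->ls_rsbtbl_lock);

	/* unlikely path: creation or a toss->keep transition needs the
	 * write lock and must redo the lookup, because the rsb state
	 * may have changed while no lock was held
	 */
	return find_rsb_slow(ls, name, len, r_ret);
}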

- Alex

changes since v2:
 - hold the write_lock in find_rsb_dir/nodir when hitting the do_toss
   path and then do the lookup and check on the do_toss rsb fields
 - move from a per-rsb timer to a per-lockspace timer and introduce
   a per-lockspace toss queue.

Alexander Aring (9):
  dlm: increment ls_count on find_ls_to_scan()
  dlm: change to non per bucket hashtable lock
  dlm: merge toss and keep hash into one
  dlm: fix avoid rsb hold during debugfs dump
  dlm: switch to use rhashtable for rsbs
  dlm: remove refcounting if rsb is on toss
  dlm: drop scand kthread and use timers
  dlm: likely read lock path for rsb lookup
  dlm: convert lkbidr to rwlock

 fs/dlm/config.c       |   8 +
 fs/dlm/config.h       |   2 +
 fs/dlm/debug_fs.c     | 212 ++---------
 fs/dlm/dir.c          |  14 +-
 fs/dlm/dlm_internal.h |  40 +-
 fs/dlm/lock.c         | 836 +++++++++++++++++++++++-------------------
 fs/dlm/lock.h         |   4 +-
 fs/dlm/lockspace.c    | 151 ++------
 fs/dlm/member.c       |   2 +
 fs/dlm/recover.c      |  29 +-
 fs/dlm/recoverd.c     |  54 +--
 11 files changed, 615 insertions(+), 737 deletions(-)

-- 
2.43.0



* [PATCH dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan()
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC
  To: teigland; +Cc: gfs2, aahringo

This patch increments the ls_count atomic value, used as a reference
counter, in find_ls_to_scan(). An additional dlm_put_lockspace() is
then necessary in dlm_scand() to drop this reference again.

This handling is necessary because release_lockspace() calls
remove_lockspace(), which removes the lockspace from the global
lockspace list. It only does so when the lockspace is no longer in use,
as signaled by the mentioned ls_count atomic value. Currently
dlm_scand() can still run while release_lockspace() is freeing all
lockspace resources. For example:

1. scand timeout occurs
2. find_ls_to_scan() returns ls A - lslist_lock is not held anymore
3. dlm_scan_rsbs(A) does its job
4. release_lockspace(A, 2) is called by the user. User has no control of
   dlm_scan_rsbs(A)
5. lockspace_busy(A, 2) returns 0, it's not busy
6. dlm_scan_rsbs(A) iterates over the whole rsb hash bucket, sometimes
   triggers send_remove()
7. release_lockspace(A, 2) does not stop scand because ls_count was >= 2
8. remove_lockspace() ignores the ongoing scand because scand does not
   hold a reference, which is exactly what ls_count is meant to track
9. release_lockspace(A, 2) cleans up everything while step 6 can still run

Point 8 is the problem: both access the same resources while
release_lockspace() frees them.
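
In other words, the fix makes the scanning side follow the usual
acquire/release pattern. A sketch (scan_is_due() is just shorthand for
the ls_scan_time check done under lslist_lock):

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (scan_is_due(ls)) {
			/* take the reference before dropping lslist_lock
			 * so remove_lockspace()/release_lockspace() see
			 * the lockspace as in use while dlm_scan_rsbs()
			 * is still running
			 */
			atomic_inc(&ls->ls_count);
			spin_unlock_bh(&lslist_lock);
			return ls;
		}
	}
	spin_unlock_bh(&lslist_lock);
	return NULL;
}

dlm_scand() then drops the reference with dlm_put_lockspace(ls) once it
is done with that lockspace, which is exactly what the hunks below add.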

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lockspace.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c3681a50decb..731c48371a27 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -255,6 +255,7 @@ static struct dlm_ls *find_ls_to_scan(void)
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (time_after_eq(jiffies, ls->ls_scan_time +
 					    dlm_config.ci_scan_secs * HZ)) {
+			atomic_inc(&ls->ls_count);
 			spin_unlock_bh(&lslist_lock);
 			return ls;
 		}
@@ -277,6 +278,8 @@ static int dlm_scand(void *data)
 			} else {
 				ls->ls_scan_time += HZ;
 			}
+
+			dlm_put_lockspace(ls);
 			continue;
 		}
 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
-- 
2.43.0



* [PATCH dlm/next 2/9] dlm: change to non per bucket hashtable lock
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC
  To: teigland; +Cc: gfs2, aahringo

This patch prepares for switching to the resizable, scalable,
concurrent hash table (rhashtable) implementation instead of DLM's own
per-bucket implementation. The rhashtable implementation handles the
buckets internally, so we use one per-lockspace lock to protect all
access to the table. This might reduce hash performance, depending on
how rhashtable lookups compare to the current DLM per-bucket
implementation. However, we switch to the rhashtable implementation
anyway, and potential lockless access can be part of future changes.
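
For illustration, keying the rhashtable on the resource name could look
roughly like this; the parameter values and the res_node field name are
assumptions, not taken from the series:

static const struct rhashtable_params rsb_hash_params = {
	.nelem_hint = 3,		/* assumed initial sizing hint */
	.key_len = DLM_RESNAME_MAXLEN,
	.key_offset = offsetof(struct dlm_rsb, res_name),
	.head_offset = offsetof(struct dlm_rsb, res_node),
	.automatic_shrinking = true,
};

A lookup under the new per-lockspace lock would then be a single
rhashtable_lookup_fast(&ls->ls_rsbtbl, name, rsb_hash_params) call.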

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     | 24 +++++++-------
 fs/dlm/dir.c          |  4 +--
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 77 +++++++++++++++++++++----------------------
 fs/dlm/lockspace.c    |  2 +-
 fs/dlm/recover.c      |  4 +--
 fs/dlm/recoverd.c     |  8 ++---
 7 files changed, 60 insertions(+), 61 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index cba5514688ee..b8234eba5e34 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 
 	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				dlm_hold_rsb(r);
 				ri->rsb = r;
 				ri->bucket = bucket;
-				spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+				spin_unlock_bh(&ls->ls_rsbtbl_lock);
 				return ri;
 			}
 		}
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
 	 * move to the first rsb in the next non-empty bucket
@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 	 * move to the next rsb in the same bucket
 	 */
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	rp = ri->rsb;
 	next = rb_next(&rp->res_hashnode);
 
@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_put_rsb(rp);
 		++*pos;
 		return ri;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	dlm_put_rsb(rp);
 
 	/*
@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index ff3a51c759b5..5315f4f46cc7 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	hash = jhash(name, len, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 	if (rv)
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 					 name, len, &r);
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (!rv)
 		return r;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 269c12e0824f..2c961db53b27 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -105,7 +105,6 @@ do { \
 struct dlm_rsbtable {
 	struct rb_root		keep;
 	struct rb_root		toss;
-	spinlock_t		lock;
 	unsigned long		flags;
 };
 
@@ -593,6 +592,7 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct dlm_rsbtable	*ls_rsbtbl;
+	spinlock_t		ls_rsbtbl_lock;
 	uint32_t		ls_rsbtbl_size;
 
 	spinlock_t		ls_waiters_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 4ff4ef2a5f87..af57d9d12434 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -369,13 +369,12 @@ static inline int dlm_kref_put_lock_bh(struct kref *kref,
 static void put_rsb(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
-	uint32_t bucket = r->res_bucket;
 	int rv;
 
 	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
-				  &ls->ls_rsbtbl[bucket].lock);
+				  &ls->ls_rsbtbl_lock);
 	if (rv)
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -615,7 +614,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -685,7 +684,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -734,7 +733,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -759,7 +758,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -817,7 +816,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -832,7 +831,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -1049,7 +1048,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error) {
 		/* because the rsb is active, we need to lock_rsb before
@@ -1057,7 +1056,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1083,14 +1082,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 	r->res_toss_time = jiffies;
 	/* the rsb was inactive (on toss list) */
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
@@ -1108,7 +1107,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 
@@ -1116,7 +1115,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
  out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	return error;
 }
 
@@ -1126,15 +1125,15 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 	struct dlm_rsb *r;
 	int i;
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1146,7 +1145,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error)
 		goto out_dump;
@@ -1157,7 +1156,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
  out_dump:
 	dlm_dump_rsb(r);
  out:
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1621,10 +1620,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -1681,7 +1680,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
 	else
 		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
-	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
 	 * While searching for rsb's to free, we found some that require
@@ -1696,16 +1695,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		name = ls->ls_remove_names[i];
 		len = ls->ls_remove_lens[i];
 
-		spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 		if (rv) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
 		}
 
 		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name master %d dir %d our %d %s",
 				  r->res_master_nodeid, r->res_dir_nodeid,
 				  our_nodeid, name);
@@ -1714,7 +1713,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (r->res_dir_nodeid == our_nodeid) {
 			/* should never happen */
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name dir %d master %d our %d %s",
 				  r->res_dir_nodeid, r->res_master_nodeid,
 				  our_nodeid, name);
@@ -1723,21 +1722,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (!time_after_eq(jiffies, r->res_toss_time +
 				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name toss_time %lu now %lu %s",
 				  r->res_toss_time, jiffies, name);
 			continue;
 		}
 
 		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name in use %s", name);
 			continue;
 		}
 
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 		send_remove(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 		dlm_free_rsb(r);
 	}
@@ -4201,7 +4200,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 	if (rv) {
@@ -4211,7 +4210,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			/* should not happen */
 			log_error(ls, "receive_remove from %d not found %s",
 				  from_nodeid, name);
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 		if (r->res_master_nodeid != from_nodeid) {
@@ -4219,14 +4218,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4234,19 +4233,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
 		log_error(ls, "receive_remove from %d rsb ref error",
 			  from_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	}
 }
 
@@ -5314,7 +5313,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct rb_node *n;
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
@@ -5325,10 +5324,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 731c48371a27..d33dbcd5f4a1 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -495,6 +495,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	 */
 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
 
+	spin_lock_init(&ls->ls_rsbtbl_lock);
 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
 	ls->ls_rsbtbl_size = size;
 
@@ -504,7 +505,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	for (i = 0; i < size; i++) {
 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
-		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 13bc845fa305..9a4c8e4b2442 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -886,8 +886,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	unsigned int count = 0;
 	int i;
 
+	spin_lock(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -895,8 +895,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 			dlm_free_rsb(r);
 			count++;
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (count)
 		log_rinfo(ls, "dlm_clear_toss %u done", count);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index c82cc48988c6..fa6608363302 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -33,8 +33,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		goto out;
 	}
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_nodeid)
@@ -43,8 +43,8 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 			list_add(&r->res_masters_list, &ls->ls_masters_list);
 			dlm_hold_rsb(r);
 		}
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
@@ -68,8 +68,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 	struct dlm_rsb *r;
 	int i;
 
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, root_list);
@@ -78,8 +78,8 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 
 		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
 			log_error(ls, "%s toss not empty", __func__);
-		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void dlm_release_root_list(struct list_head *root_list)
-- 
2.43.0



* [PATCH dlm/next 3/9] dlm: merge toss and keep hash into one
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC
  To: teigland; +Cc: gfs2, aahringo

There are several places in current DLM code that can end up doing two
lookups: first a lookup in the keep hash, then one in the toss hash if
the rsb is not in the keep hash. The same rsb can never be in both of
these hashes at once. This patch introduces a new rsb state flag
"RSB_TOSS" that signals whether the rsb is in toss state; if it is not
set, the rsb is in keep state. Instead of those multiple lookups there
is now only one, which reduces the number of hash lookups in cases
where the rsb was not in the keep hash. Moving an rsb between toss and
keep state is done under the ls_rsbtbl_lock anyway.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  29 ++++++++++--
 fs/dlm/dir.c          |   6 +--
 fs/dlm/dlm_internal.h |   4 +-
 fs/dlm/lock.c         | 103 ++++++++++++++++++++++--------------------
 fs/dlm/lockspace.c    |  13 ++----
 fs/dlm/recover.c      |   7 ++-
 fs/dlm/recoverd.c     |  12 ++---
 7 files changed, 98 insertions(+), 76 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index b8234eba5e34..37f4dfca5e44 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -450,12 +450,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 	if (seq->op == &format4_seq_ops)
 		ri->format = 4;
 
-	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+	tree = &ls->ls_rsbtbl[bucket].r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
+
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -482,12 +490,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 			kfree(ri);
 			return NULL;
 		}
-		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+		tree = &ls->ls_rsbtbl[bucket].r;
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
+
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -548,12 +564,19 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 			++*pos;
 			return NULL;
 		}
-		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+		tree = &ls->ls_rsbtbl[bucket].r;
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
+			if (toss) {
+				if (!rsb_flag(r, RSB_TOSS))
+					continue;
+			} else {
+				if (rsb_flag(r, RSB_TOSS))
+					continue;
+			}
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 5315f4f46cc7..f8039f3ee2d1 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -205,12 +205,8 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
-	if (rv)
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
-					 name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
 	if (!rv)
 		return r;
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 2c961db53b27..af88fc2f978c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,7 @@ do { \
 #define DLM_RTF_SHRINK_BIT	0
 
 struct dlm_rsbtable {
-	struct rb_root		keep;
-	struct rb_root		toss;
+	struct rb_root          r;
 	unsigned long		flags;
 };
 
@@ -376,6 +375,7 @@ enum rsb_flags {
 	RSB_RECOVER_CONVERT,
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
+	RSB_TOSS,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index af57d9d12434..08ec1a04476a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -616,23 +616,22 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (error)
-		goto do_toss;
+		goto do_new;
 	
 	/*
 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
 	 */
 
+	if (rsb_flag(r, RSB_TOSS))
+		goto do_toss;
+
 	kref_get(&r->res_ref);
 	goto out_unlock;
 
 
  do_toss:
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
-		goto do_new;
-
 	/*
 	 * rsb found inactive (master_nodeid may be out of date unless
 	 * we are the dir_nodeid or were the master)  No other thread
@@ -669,8 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
 
@@ -731,7 +729,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -760,8 +758,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (error)
+		goto do_new;
+
+	if (rsb_flag(r, RSB_TOSS))
 		goto do_toss;
 
 	/*
@@ -773,10 +774,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 
  do_toss:
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
-		goto do_new;
-
 	/*
 	 * rsb found inactive. No other thread is using this rsb because
 	 * it's on the toss list, so we can look at or update
@@ -804,8 +801,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
 
@@ -829,7 +825,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -1049,8 +1045,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		return error;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (!error) {
+		if (rsb_flag(r, RSB_TOSS))
+			goto do_toss;
+
 		/* because the rsb is active, we need to lock_rsb before
 		 * checking/changing re_master_nodeid
 		 */
@@ -1067,12 +1066,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		put_rsb(r);
 
 		return 0;
-	}
-
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
+	} else {
 		goto not_found;
+	}
 
+ do_toss:
 	/* because the rsb is inactive (on toss list), it's not refcounted
 	 * and lock_rsb is not used, but is protected by the rsbtbl lock
 	 */
@@ -1102,8 +1100,9 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_nodeid = from_nodeid;
 	kref_init(&r->res_ref);
 	r->res_toss_time = jiffies;
+	rsb_set_flag(r, RSB_TOSS);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
@@ -1127,8 +1126,11 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (rsb_flag(r, RSB_TOSS))
+				continue;
+
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
@@ -1146,14 +1148,10 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (!error)
-		goto out_dump;
-
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (error)
 		goto out;
- out_dump:
+
 	dlm_dump_rsb(r);
  out:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1166,8 +1164,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
-	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
+	WARN_ON(rsb_flag(r, RSB_TOSS));
+	rsb_set_flag(r, RSB_TOSS);
 	r->res_toss_time = jiffies;
 	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
 	if (r->res_lvbptr) {
@@ -1627,9 +1625,11 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		return;
 	}
 
-	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+	for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
 		next = rb_next(n);
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (!rsb_flag(r, RSB_TOSS))
+			continue;
 
 		/* If we're the directory record for this rsb, and
 		   we're not the master of it, then we need to wait
@@ -1672,7 +1672,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		dlm_free_rsb(r);
 	}
 
@@ -1696,8 +1696,14 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		len = ls->ls_remove_lens[i];
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 		if (rv) {
+			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			log_error(ls, "remove_name not found %s", name);
+			continue;
+		}
+
+		if (!rsb_flag(r, RSB_TOSS)) {
 			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
@@ -1734,7 +1740,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -4202,17 +4208,16 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
 	if (rv) {
-		/* verify the rsb is on keep list per comment above */
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
-		if (rv) {
-			/* should not happen */
-			log_error(ls, "receive_remove from %d not found %s",
-				  from_nodeid, name);
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			return;
-		}
+		/* should not happen */
+		log_error(ls, "%s from %d not found %s", __func__,
+			  from_nodeid, name);
+		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		return;
+	}
+
+	if (!rsb_flag(r, RSB_TOSS)) {
 		if (r->res_master_nodeid != from_nodeid) {
 			/* should not happen */
 			log_error(ls, "receive_remove keep from %d master %d",
@@ -4238,7 +4243,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
@@ -5314,8 +5319,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct dlm_rsb *r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (rsb_flag(r, RSB_TOSS))
+			continue;
 
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d33dbcd5f4a1..b5184ad550fa 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -503,8 +503,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		ls->ls_rsbtbl[i].keep.rb_node = NULL;
-		ls->ls_rsbtbl[i].toss.rb_node = NULL;
+		ls->ls_rsbtbl[i].r.rb_node = NULL;
 	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
@@ -837,15 +836,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].keep);
-			dlm_free_rsb(rsb);
-		}
-
-		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(rsb);
 		}
 	}
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 9a4c8e4b2442..e53d88e4ec93 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -888,10 +888,13 @@ void dlm_clear_toss(struct dlm_ls *ls)
 
 	spin_lock(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			if (!rsb_flag(r, RSB_TOSS))
+				continue;
+
+			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(r);
 			count++;
 		}
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index fa6608363302..ad696528ebe7 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -35,9 +35,9 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (r->res_nodeid)
+			if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
 				continue;
 
 			list_add(&r->res_masters_list, &ls->ls_masters_list);
@@ -70,14 +70,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
+				continue;
+
 			list_add(&r->res_root_list, root_list);
 			dlm_hold_rsb(r);
 		}
-
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-			log_error(ls, "%s toss not empty", __func__);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
-- 
2.43.0



* [PATCH dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: scand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (2 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC
  To: teigland; +Cc: gfs2, aahringo

Currently debugfs calls dlm_hold_rsb() while iterating the ls_rsbtbl
hash and releases the ls_rsbtbl_lock during dumping. We can't do that
in every situation, because some rsbs then don't get into the toss
state when they should, e.g. when receive_remove() is being called. To
avoid this problem we no longer use the hash as the data structure to
iterate over. With the move to the rhashtable implementation, iterating
over the table itself should be avoided anyway, because dynamic
rehashing in the background can result in iterating the whole
rhashtable again. Instead of iterating over the hash we introduce two
lists, ls_toss and ls_keep, since DLM cares about the RSB_TOSS state
whenever it iterates over all rsbs: ls_toss holds only rsbs in RSB_TOSS
state and ls_keep holds the rsbs that are not. There are only a few
situations that move an rsb into toss state and back, and each is just
a list_move() between those two lists, protected by the ls_rsbtbl_lock
to stay in sync with the hashtable and the RSB_TOSS flag.

The advantage in debugfs is that we can simply use the predefined
seq_file dump helpers for Linux lists. Instead of calling
dlm_hold_rsb() while dumping rsbs, we hold the ls_rsbtbl_lock for the
whole debugfs dump.
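
Both directions of the state transition then reduce to a flag change
plus a list_move(); a simplified sketch (in the real code the lock is
often already held by the caller):

	/* active -> toss */
	spin_lock_bh(&ls->ls_rsbtbl_lock);
	rsb_set_flag(r, RSB_TOSS);
	list_move(&r->res_rsbs_list, &ls->ls_toss);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);

	/* toss -> active (keep) */
	spin_lock_bh(&ls->ls_rsbtbl_lock);
	rsb_clear_flag(r, RSB_TOSS);
	list_move(&r->res_rsbs_list, &ls->ls_keep);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);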

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     | 233 +++++++-----------------------------------
 fs/dlm/dlm_internal.h |   4 +
 fs/dlm/lock.c         |  47 ++++-----
 fs/dlm/lockspace.c    |   3 +
 fs/dlm/recover.c      |  24 ++---
 fs/dlm/recoverd.c     |  34 +++---
 6 files changed, 84 insertions(+), 261 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 37f4dfca5e44..70567919f1b7 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -366,12 +366,10 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
 	unlock_rsb(r);
 }
 
-struct rsbtbl_iter {
-	struct dlm_rsb *rsb;
-	unsigned bucket;
-	int format;
-	int header;
-};
+static const struct seq_operations format1_seq_ops;
+static const struct seq_operations format2_seq_ops;
+static const struct seq_operations format3_seq_ops;
+static const struct seq_operations format4_seq_ops;
 
 /*
  * If the buffer is full, seq_printf can be called again, but it
@@ -382,220 +380,61 @@ struct rsbtbl_iter {
 
 static int table_seq_show(struct seq_file *seq, void *iter_ptr)
 {
-	struct rsbtbl_iter *ri = iter_ptr;
-
-	switch (ri->format) {
-	case 1:
-		print_format1(ri->rsb, seq);
-		break;
-	case 2:
-		if (ri->header) {
-			seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n");
-			ri->header = 0;
-		}
-		print_format2(ri->rsb, seq);
-		break;
-	case 3:
-		if (ri->header) {
-			seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n");
-			ri->header = 0;
-		}
-		print_format3(ri->rsb, seq);
-		break;
-	case 4:
-		if (ri->header) {
-			seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n");
-			ri->header = 0;
-		}
-		print_format4(ri->rsb, seq);
-		break;
-	}
+	struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list);
+
+	if (seq->op == &format1_seq_ops)
+		print_format1(rsb, seq);
+	else if (seq->op == &format2_seq_ops)
+		print_format2(rsb, seq);
+	else if (seq->op == &format3_seq_ops)
+		print_format3(rsb, seq);
+	else if (seq->op == &format4_seq_ops)
+		print_format4(rsb, seq);
 
 	return 0;
 }
 
-static const struct seq_operations format1_seq_ops;
-static const struct seq_operations format2_seq_ops;
-static const struct seq_operations format3_seq_ops;
-static const struct seq_operations format4_seq_ops;
-
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
-	struct rb_root *tree;
-	struct rb_node *node;
 	struct dlm_ls *ls = seq->private;
-	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
-	loff_t n = *pos;
-	unsigned bucket, entry;
-	int toss = (seq->op == &format4_seq_ops);
-
-	bucket = n >> 32;
-	entry = n & ((1LL << 32) - 1);
-
-	if (bucket >= ls->ls_rsbtbl_size)
-		return NULL;
-
-	ri = kzalloc(sizeof(*ri), GFP_NOFS);
-	if (!ri)
-		return NULL;
-	if (n == 0)
-		ri->header = 1;
-	if (seq->op == &format1_seq_ops)
-		ri->format = 1;
-	if (seq->op == &format2_seq_ops)
-		ri->format = 2;
-	if (seq->op == &format3_seq_ops)
-		ri->format = 3;
-	if (seq->op == &format4_seq_ops)
-		ri->format = 4;
-
-	tree = &ls->ls_rsbtbl[bucket].r;
+	struct list_head *list;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	if (!RB_EMPTY_ROOT(tree)) {
-		for (node = rb_first(tree); node; node = rb_next(node)) {
-			r = rb_entry(node, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-
-			if (!entry--) {
-				dlm_hold_rsb(r);
-				ri->rsb = r;
-				ri->bucket = bucket;
-				spin_unlock_bh(&ls->ls_rsbtbl_lock);
-				return ri;
-			}
-		}
+	if (!*pos) {
+		if (seq->op == &format2_seq_ops)
+			seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n");
+		else if (seq->op == &format3_seq_ops)
+			seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n");
+		else if (seq->op == &format4_seq_ops)
+			seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n");
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-	/*
-	 * move to the first rsb in the next non-empty bucket
-	 */
-
-	/* zero the entry */
-	n &= ~((1LL << 32) - 1);
 
-	while (1) {
-		bucket++;
-		n += 1LL << 32;
+	if (seq->op == &format4_seq_ops)
+		list = &ls->ls_toss;
+	else
+		list = &ls->ls_keep;
 
-		if (bucket >= ls->ls_rsbtbl_size) {
-			kfree(ri);
-			return NULL;
-		}
-		tree = &ls->ls_rsbtbl[bucket].r;
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		if (!RB_EMPTY_ROOT(tree)) {
-			node = rb_first(tree);
-			r = rb_entry(node, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-
-			dlm_hold_rsb(r);
-			ri->rsb = r;
-			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			*pos = n;
-			return ri;
-		}
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	return seq_list_start(list, *pos);
 }
 
 static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
-	struct rsbtbl_iter *ri = iter_ptr;
-	struct rb_root *tree;
-	struct rb_node *next;
-	struct dlm_rsb *r, *rp;
-	loff_t n = *pos;
-	unsigned bucket;
-	int toss = (seq->op == &format4_seq_ops);
-
-	bucket = n >> 32;
-
-	/*
-	 * move to the next rsb in the same bucket
-	 */
-
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rp = ri->rsb;
-	next = rb_next(&rp->res_hashnode);
-
-	if (next) {
-		r = rb_entry(next, struct dlm_rsb, res_hashnode);
-		dlm_hold_rsb(r);
-		ri->rsb = r;
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		dlm_put_rsb(rp);
-		++*pos;
-		return ri;
-	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	dlm_put_rsb(rp);
-
-	/*
-	 * move to the first rsb in the next non-empty bucket
-	 */
+	struct list_head *list;
 
-	/* zero the entry */
-	n &= ~((1LL << 32) - 1);
-
-	while (1) {
-		bucket++;
-		n += 1LL << 32;
+	if (seq->op == &format4_seq_ops)
+		list = &ls->ls_toss;
+	else
+		list = &ls->ls_keep;
 
-		if (bucket >= ls->ls_rsbtbl_size) {
-			kfree(ri);
-			++*pos;
-			return NULL;
-		}
-		tree = &ls->ls_rsbtbl[bucket].r;
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		if (!RB_EMPTY_ROOT(tree)) {
-			next = rb_first(tree);
-			r = rb_entry(next, struct dlm_rsb, res_hashnode);
-			if (toss) {
-				if (!rsb_flag(r, RSB_TOSS))
-					continue;
-			} else {
-				if (rsb_flag(r, RSB_TOSS))
-					continue;
-			}
-			dlm_hold_rsb(r);
-			ri->rsb = r;
-			ri->bucket = bucket;
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			*pos = n;
-			return ri;
-		}
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	return seq_list_next(iter_ptr, list, pos);
 }
 
 static void table_seq_stop(struct seq_file *seq, void *iter_ptr)
 {
-	struct rsbtbl_iter *ri = iter_ptr;
+	struct dlm_ls *ls = seq->private;
 
-	if (ri) {
-		dlm_put_rsb(ri->rsb);
-		kfree(ri);
-	}
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static const struct seq_operations format1_seq_ops = {
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index af88fc2f978c..6d06840029c3 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -339,6 +339,7 @@ struct dlm_rsb {
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
 
+	struct list_head	res_rsbs_list;
 	struct list_head	res_root_list;	    /* used for recovery */
 	struct list_head	res_masters_list;   /* used for recovery */
 	struct list_head	res_recover_list;   /* used for recovery */
@@ -595,6 +596,9 @@ struct dlm_ls {
 	spinlock_t		ls_rsbtbl_lock;
 	uint32_t		ls_rsbtbl_size;
 
+	struct list_head	ls_toss;
+	struct list_head	ls_keep;
+
 	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 08ec1a04476a..a70b8edb5d3f 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -668,6 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		r->res_first_lkid = 0;
 	}
 
+	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
@@ -730,6 +731,8 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	if (!error)
+		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -801,6 +804,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		r->res_nodeid = 0;
 	}
 
+	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
 	goto out_unlock;
 
@@ -826,6 +830,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	kref_init(&r->res_ref);
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	if (!error)
+		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -1110,6 +1116,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		goto retry;
 	}
 
+	list_add(&r->res_rsbs_list, &ls->ls_toss);
+
 	if (result)
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
@@ -1120,20 +1128,12 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (rsb_flag(r, RSB_TOSS))
-				continue;
-
-			if (r->res_hash == hash)
-				dlm_dump_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		if (r->res_hash == hash)
+			dlm_dump_rsb(r);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
@@ -1166,6 +1166,7 @@ static void toss_rsb(struct kref *kref)
 	kref_init(&r->res_ref);
 	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rsb_set_flag(r, RSB_TOSS);
+	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
 	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
 	if (r->res_lvbptr) {
@@ -1672,6 +1673,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		dlm_free_rsb(r);
 	}
@@ -1740,6 +1742,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 			continue;
 		}
 
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -4243,6 +4246,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
+		list_del(&r->res_rsbs_list);
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
@@ -5313,17 +5317,12 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
 			  lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
-		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-		if (rsb_flag(r, RSB_TOSS))
-			continue;
-
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
 		if (!is_master(r)) {
@@ -5358,19 +5357,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 void dlm_recover_grant(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int bucket = 0;
 	unsigned int count = 0;
 	unsigned int rsb_count = 0;
 	unsigned int lkb_count = 0;
 
 	while (1) {
-		r = find_grant_rsb(ls, bucket);
-		if (!r) {
-			if (bucket == ls->ls_rsbtbl_size - 1)
-				break;
-			bucket++;
-			continue;
-		}
+		r = find_grant_rsb(ls);
+		if (!r)
+			break;
+
 		rsb_count++;
 		count = 0;
 		lock_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index b5184ad550fa..2b5771a7bf31 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -495,6 +495,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	 */
 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
 
+	INIT_LIST_HEAD(&ls->ls_toss);
+	INIT_LIST_HEAD(&ls->ls_keep);
 	spin_lock_init(&ls->ls_rsbtbl_lock);
 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
 	ls->ls_rsbtbl_size = size;
@@ -838,6 +840,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			list_del(&rsb->res_rsbs_list);
 			rb_erase(n, &ls->ls_rsbtbl[i].r);
 			dlm_free_rsb(rsb);
 		}
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index e53d88e4ec93..512c1ae81a96 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -881,23 +881,15 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
 
 void dlm_clear_toss(struct dlm_ls *ls)
 {
-	struct rb_node *n, *next;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r, *safe;
 	unsigned int count = 0;
-	int i;
-
-	spin_lock(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
-			next = rb_next(n);
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (!rsb_flag(r, RSB_TOSS))
-				continue;
-
-			rb_erase(n, &ls->ls_rsbtbl[i].r);
-			dlm_free_rsb(r);
-			count++;
-		}
+
+	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
+		list_del(&r->res_rsbs_list);
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].r);
+		dlm_free_rsb(r);
+		count++;
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index ad696528ebe7..5e8e10030b74 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -22,9 +22,8 @@
 
 static int dlm_create_masters_list(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i, error = 0;
+	int error = 0;
 
 	write_lock_bh(&ls->ls_masters_lock);
 	if (!list_empty(&ls->ls_masters_list)) {
@@ -34,15 +33,12 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 	}
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
-				continue;
-
-			list_add(&r->res_masters_list, &ls->ls_masters_list);
-			dlm_hold_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		if (r->res_nodeid)
+			continue;
+
+		list_add(&r->res_masters_list, &ls->ls_masters_list);
+		dlm_hold_rsb(r);
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -64,21 +60,15 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 
 static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
-	int i;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
-				continue;
-
-			list_add(&r->res_root_list, root_list);
-			dlm_hold_rsb(r);
-		}
+	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+		list_add(&r->res_root_list, root_list);
+		dlm_hold_rsb(r);
 	}
+
+	WARN_ON_ONCE(!list_empty(&ls->ls_toss));
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
-- 
2.43.0



* [PATCH dlm/next 5/9] dlm: switch to use rhashtable for rsbs
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (3 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch switches the rsb lookup to the kernel's rhashtable
implementation instead of our own bucket hash implementation. The
rhashtable code internally resizes the hash table depending on the
number of entries it stores. Currently ls_rsbtbl_size, which sets the
bucket count, is only a guess; in reality we don't know how the user is
using DLM. We move this decision to the rhashtable handling, which
resizes dynamically at runtime.
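
A minimal sketch of the fixed-length key lookup pattern this relies on
(the helper name is made up for illustration and is not part of the
patch):

static struct dlm_rsb *example_lookup_rsb(struct rhashtable *rhash,
					  const void *name, int len)
{
	/* shorter names are zero-padded so rhashtable hashes and
	 * compares a fixed DLM_RESNAME_MAXLEN sized key
	 */
	char key[DLM_RESNAME_MAXLEN] = {};

	if (len > DLM_RESNAME_MAXLEN)
		return NULL;

	memcpy(key, name, len);
	return rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
}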

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/config.c       |   8 ++
 fs/dlm/config.h       |   2 +
 fs/dlm/dir.c          |   6 +-
 fs/dlm/dlm_internal.h |  18 ++---
 fs/dlm/lock.c         | 172 +++++++++++++-----------------------------
 fs/dlm/lock.h         |   2 +-
 fs/dlm/lockspace.c    |  35 ++++-----
 fs/dlm/recover.c      |   3 +-
 8 files changed, 86 insertions(+), 160 deletions(-)

diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index e55e0a2cd2e8..517fa975dc5a 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -63,6 +63,14 @@ static void release_node(struct config_item *);
 static struct configfs_attribute *comm_attrs[];
 static struct configfs_attribute *node_attrs[];
 
+const struct rhashtable_params dlm_rhash_rsb_params = {
+	.nelem_hint = 3, /* start small */
+	.key_len = DLM_RESNAME_MAXLEN,
+	.key_offset = offsetof(struct dlm_rsb, res_name),
+	.head_offset = offsetof(struct dlm_rsb, res_node),
+	.automatic_shrinking = true,
+};
+
 struct dlm_cluster {
 	struct config_group group;
 	unsigned int cl_tcp_port;
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 4c91fcca0fd4..ed237d910208 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -21,6 +21,8 @@ struct dlm_config_node {
 	uint32_t comm_seq;
 };
 
+extern const struct rhashtable_params dlm_rhash_rsb_params;
+
 #define DLM_MAX_ADDR_COUNT 3
 
 #define DLM_PROTO_TCP	0
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f8039f3ee2d1..9687f908476b 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -198,14 +198,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 				     int len)
 {
 	struct dlm_rsb *r;
-	uint32_t hash, bucket;
 	int rv;
 
-	hash = jhash(name, len, 0);
-	bucket = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 	if (!rv)
 		return r;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6d06840029c3..cf43b97cf3e5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -34,6 +34,7 @@
 #include <linux/kernel.h>
 #include <linux/jhash.h>
 #include <linux/miscdevice.h>
+#include <linux/rhashtable.h>
 #include <linux/mutex.h>
 #include <linux/idr.h>
 #include <linux/ratelimit.h>
@@ -99,15 +100,6 @@ do { \
   } \
 }
 
-
-#define DLM_RTF_SHRINK_BIT	0
-
-struct dlm_rsbtable {
-	struct rb_root          r;
-	unsigned long		flags;
-};
-
-
 /*
  * Lockspace member (per node in a ls)
  */
@@ -327,13 +319,12 @@ struct dlm_rsb {
 	int			res_id;		/* for ls_recover_idr */
 	uint32_t                res_lvbseq;
 	uint32_t		res_hash;
-	uint32_t		res_bucket;	/* rsbtbl */
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
 	union {
 		struct list_head	res_hashchain;
-		struct rb_node		res_hashnode;	/* rsbtbl */
+		struct rhash_head	res_node; /* rsbtbl */
 	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
@@ -592,9 +583,10 @@ struct dlm_ls {
 	struct idr		ls_lkbidr;
 	spinlock_t		ls_lkbidr_spin;
 
-	struct dlm_rsbtable	*ls_rsbtbl;
+	struct rhashtable	ls_rsbtbl;
+#define DLM_RTF_SHRINK_BIT	0
+	unsigned long		ls_rsbtbl_flags;
 	spinlock_t		ls_rsbtbl_lock;
-	uint32_t		ls_rsbtbl_size;
 
 	struct list_head	ls_toss;
 	struct list_head	ls_keep;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a70b8edb5d3f..defb90b56b72 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -436,8 +436,6 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
-	/* Convert the empty list_head to a NULL rb_node for tree usage: */
-	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
@@ -458,67 +456,31 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	return 0;
 }
 
-static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
+			struct dlm_rsb **r_ret)
 {
-	char maxname[DLM_RESNAME_MAXLEN];
+	char key[DLM_RESNAME_MAXLEN] = {};
 
-	memset(maxname, 0, DLM_RESNAME_MAXLEN);
-	memcpy(maxname, name, nlen);
-	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
-}
+	memcpy(key, name, len);
+	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
+	if (*r_ret)
+		return 0;
 
-int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
-			struct dlm_rsb **r_ret)
-{
-	struct rb_node *node = tree->rb_node;
-	struct dlm_rsb *r;
-	int rc;
-
-	while (node) {
-		r = rb_entry(node, struct dlm_rsb, res_hashnode);
-		rc = rsb_cmp(r, name, len);
-		if (rc < 0)
-			node = node->rb_left;
-		else if (rc > 0)
-			node = node->rb_right;
-		else
-			goto found;
-	}
-	*r_ret = NULL;
 	return -EBADR;
-
- found:
-	*r_ret = r;
-	return 0;
 }
 
-static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	struct rb_node **newn = &tree->rb_node;
-	struct rb_node *parent = NULL;
-	int rc;
-
-	while (*newn) {
-		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
-					       res_hashnode);
+	int rv;
 
-		parent = *newn;
-		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
-		if (rc < 0)
-			newn = &parent->rb_left;
-		else if (rc > 0)
-			newn = &parent->rb_right;
-		else {
-			log_print("rsb_insert match");
-			dlm_dump_rsb(rsb);
-			dlm_dump_rsb(cur);
-			return -EEXIST;
-		}
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (rv == -EEXIST) {
+		log_print("%s match", __func__);
+		dlm_dump_rsb(rsb);
 	}
 
-	rb_link_node(&rsb->res_hashnode, parent, newn);
-	rb_insert_color(&rsb->res_hashnode, tree);
-	return 0;
+	return rv;
 }
 
 /*
@@ -566,8 +528,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
  */
 
 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
-			uint32_t hash, uint32_t b,
-			int dir_nodeid, int from_nodeid,
+			uint32_t hash, int dir_nodeid, int from_nodeid,
 			unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
@@ -616,7 +577,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (error)
 		goto do_new;
 	
@@ -690,7 +651,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = dir_nodeid;
 	kref_init(&r->res_ref);
 
@@ -730,7 +690,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (!error)
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
@@ -745,8 +705,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
    dlm_recover_masters). */
 
 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
-			  uint32_t hash, uint32_t b,
-			  int dir_nodeid, int from_nodeid,
+			  uint32_t hash, int dir_nodeid, int from_nodeid,
 			  unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r = NULL;
@@ -761,7 +720,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (error)
 		goto do_new;
 
@@ -823,13 +782,12 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = dir_nodeid;
 	r->res_master_nodeid = dir_nodeid;
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (!error)
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
  out_unlock:
@@ -843,23 +801,21 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 		    int from_nodeid, unsigned int flags,
 		    struct dlm_rsb **r_ret)
 {
-	uint32_t hash, b;
 	int dir_nodeid;
+	uint32_t hash;
 
 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;
 
 	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
+		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				      from_nodeid, flags, r_ret);
 	else
-		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
-				      from_nodeid, flags, r_ret);
+		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+				    from_nodeid, flags, r_ret);
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1020,7 +976,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		      int len, unsigned int flags, int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash, b;
+	uint32_t hash;
 	int our_nodeid = dlm_our_nodeid();
 	int dir_nodeid, error;
 
@@ -1034,8 +990,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
 	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 	if (dir_nodeid != our_nodeid) {
 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
@@ -1051,7 +1005,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		return error;
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error) {
 		if (rsb_flag(r, RSB_TOSS))
 			goto do_toss;
@@ -1100,7 +1054,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = b;
 	r->res_dir_nodeid = our_nodeid;
 	r->res_master_nodeid = from_nodeid;
 	r->res_nodeid = from_nodeid;
@@ -1108,7 +1061,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_toss_time = jiffies;
 	rsb_set_flag(r, RSB_TOSS);
 
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
+	error = rsb_insert(r, &ls->ls_rsbtbl);
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
@@ -1141,14 +1094,10 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash, b;
 	int error;
 
-	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error)
 		goto out;
 
@@ -1168,7 +1117,7 @@ static void toss_rsb(struct kref *kref)
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
-	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
+	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
 		r->res_lvbptr = NULL;
@@ -1607,10 +1556,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	return error;
 }
 
-static void shrink_bucket(struct dlm_ls *ls, int b)
+static void shrink_bucket(struct dlm_ls *ls)
 {
-	struct rb_node *n, *next;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r, *safe;
 	char *name;
 	int our_nodeid = dlm_our_nodeid();
 	int remote_count = 0;
@@ -1621,17 +1569,12 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
+	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
-	for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
-		next = rb_next(n);
-		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-		if (!rsb_flag(r, RSB_TOSS))
-			continue;
-
+	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		/* If we're the directory record for this rsb, and
 		   we're not the master of it, then we need to wait
 		   for the master node to send us a dir remove for
@@ -1674,14 +1617,15 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		dlm_free_rsb(r);
 	}
 
 	if (need_shrink)
-		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
+		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	else
-		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
+		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	/*
@@ -1698,7 +1642,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		len = ls->ls_remove_lens[i];
 
 		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 		if (rv) {
 			spin_unlock_bh(&ls->ls_rsbtbl_lock);
 			log_error(ls, "remove_name not found %s", name);
@@ -1743,7 +1687,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -1753,14 +1698,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 void dlm_scan_rsbs(struct dlm_ls *ls)
 {
-	int i;
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		shrink_bucket(ls, i);
-		if (dlm_locking_stopped(ls))
-			break;
-		cond_resched();
-	}
+	shrink_bucket(ls);
 }
 
 /* lkb is master or local copy */
@@ -4174,7 +4112,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 {
 	char name[DLM_RESNAME_MAXLEN+1];
 	struct dlm_rsb *r;
-	uint32_t hash, b;
 	int rv, len, dir_nodeid, from_nodeid;
 
 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -4194,24 +4131,22 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		return;
 	}
 
-	/* Look for name on rsbtbl.toss, if it's there, kill it.
-	   If it's on rsbtbl.keep, it's being used, and we should ignore this
-	   message.  This is an expected race between the dir node sending a
-	   request to the master node at the same time as the master node sends
-	   a remove to the dir node.  The resolution to that race is for the
-	   dir node to ignore the remove message, and the master node to
-	   recreate the master rsb when it gets a request from the dir node for
-	   an rsb it doesn't have. */
+	/* Look for name in rsb toss state, if it's there, kill it.
+	 * If it's in non toss state, it's being used, and we should ignore this
+	 * message.  This is an expected race between the dir node sending a
+	 * request to the master node at the same time as the master node sends
+	 * a remove to the dir node.  The resolution to that race is for the
+	 * dir node to ignore the remove message, and the master node to
+	 * recreate the master rsb when it gets a request from the dir node for
+	 * an rsb it doesn't have.
+	 */
 
 	memset(name, 0, sizeof(name));
 	memcpy(name, ms->m_extra, len);
 
-	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (rv) {
 		/* should not happen */
 		log_error(ls, "%s from %d not found %s", __func__,
@@ -4247,7 +4182,8 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
 	} else {
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 45a74869810a..33616d4b0cdb 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -29,7 +29,7 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		      int len, unsigned int flags, int *r_nodeid, int *result);
 
-int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 			struct dlm_rsb **r_ret);
 
 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2b5771a7bf31..890e1a4cf787 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -410,9 +410,9 @@ static int new_lockspace(const char *name, const char *cluster,
 			 int *ops_result, dlm_lockspace_t **lockspace)
 {
 	struct dlm_ls *ls;
-	int i, size, error;
 	int do_unreg = 0;
 	int namelen = strlen(name);
+	int i, error;
 
 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 		return -EINVAL;
@@ -498,15 +498,10 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_toss);
 	INIT_LIST_HEAD(&ls->ls_keep);
 	spin_lock_init(&ls->ls_rsbtbl_lock);
-	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
-	ls->ls_rsbtbl_size = size;
 
-	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
-	if (!ls->ls_rsbtbl)
+	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
+	if (error)
 		goto out_lsfree;
-	for (i = 0; i < size; i++) {
-		ls->ls_rsbtbl[i].r.rb_node = NULL;
-	}
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
@@ -669,7 +664,7 @@ static int new_lockspace(const char *name, const char *cluster,
  out_rsbtbl:
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 		kfree(ls->ls_remove_names[i]);
-	vfree(ls->ls_rsbtbl);
+	rhashtable_destroy(&ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
 		kobject_put(&ls->ls_kobj);
@@ -772,10 +767,16 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	return rv;
 }
 
+static void rhash_free_rsb(void *ptr, void *arg)
+{
+	struct dlm_rsb *rsb = ptr;
+
+	dlm_free_rsb(rsb);
+}
+
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct rb_node *n;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -834,19 +835,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	idr_destroy(&ls->ls_lkbidr);
 
 	/*
-	 * Free all rsb's on rsbtbl[] lists
+	 * Free all rsb's on rsbtbl
 	 */
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_del(&rsb->res_rsbs_list);
-			rb_erase(n, &ls->ls_rsbtbl[i].r);
-			dlm_free_rsb(rsb);
-		}
-	}
-
-	vfree(ls->ls_rsbtbl);
+	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
 
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 		kfree(ls->ls_remove_names[i]);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 512c1ae81a96..c21ef115123b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -887,7 +887,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	spin_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		list_del(&r->res_rsbs_list);
-		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].r);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
 		dlm_free_rsb(r);
 		count++;
 	}
-- 
2.43.0



* [PATCH dlm/next 6/9] dlm: remove refcounting if rsb is on toss
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (4 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

In the past we had problems when an rsb had a reference counter greater
than one while it was in the toss state. An rsb in the toss state is not
"alive" anymore and should not have any references other than the last
one keeping it on the rsb hash. Toss is a delayed free, a kind of
garbage collection, with additional handling to move the rsb back into
the keep state where it is alive again.

The reference counter should only be used when the rsb is not in toss
state (i.e. in keep state); when an rsb is no longer used in keep state
it is moved to toss state, where it can be freed at the right time. The
release() function of a keep rsb therefore sets RSB_TOSS and does not
call kref_init(), as toss rsbs should never deal with any reference
counting. If the rsb becomes alive again, the RSB_TOSS flag is cleared
and kref_init() is called so the reference counter is used again while
the rsb is in keep state.

Warnings are introduced now that trigger if we ever use reference
counting while an rsb is in toss state. Before, kref_put() in toss state
was only a sanity check that the counter was 1. This is removed and we
directly free the rsb when it should be freed in toss state; if there
are still references around, it is a bug.

The WARN_ON_ONCE() should hopefully find more cases where we hold
references to a toss rsb that we should not have, as receive_remove()
in most cases requires the rsb to be in toss state. Otherwise we can
run into problems when e.g. the directory rsb wasn't removed and the
directory node reports it to the master node on a lookup message.
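
To make the rule concrete, a small sketch of the intended invariant (the
example_* helpers are hypothetical; the patch below implements this in
hold_rsb() and find_rsb_dir()/find_rsb_nodir()):

static void example_hold_rsb(struct dlm_rsb *r)
{
	/* references may only be taken while the rsb is in keep state */
	WARN_ON(rsb_flag(r, RSB_TOSS));
	kref_get(&r->res_ref);
}

static void example_revive_rsb(struct dlm_ls *ls, struct dlm_rsb *r)
{
	/* leaving toss state: the reference counter becomes valid again */
	list_move(&r->res_rsbs_list, &ls->ls_keep);
	rsb_clear_flag(r, RSB_TOSS);
	kref_init(&r->res_ref);
}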

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c    | 61 ++++++++++++++++++++++++------------------------
 fs/dlm/lock.h    |  1 +
 fs/dlm/recover.c |  2 +-
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index defb90b56b72..fee1a4164fc1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -325,6 +325,8 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 
 static inline void hold_rsb(struct dlm_rsb *r)
 {
+	/* rsbs in toss state never get referenced */
+	WARN_ON(rsb_flag(r, RSB_TOSS));
 	kref_get(&r->res_ref);
 }
 
@@ -631,6 +633,11 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
+	/* rsb got out of toss state, it becomes alive again
+	 * and we reinit the reference counter that is only
+	 * valid for keep state rsbs
+	 */
+	kref_init(&r->res_ref);
 	goto out_unlock;
 
 
@@ -765,6 +772,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
 	rsb_clear_flag(r, RSB_TOSS);
+	/* rsb got out of toss state, it becomes alive again
+	 * and we reinit the reference counter that is only
+	 * valid for keep state rsbs
+	 */
+	kref_init(&r->res_ref);
 	goto out_unlock;
 
 
@@ -1112,8 +1124,6 @@ static void toss_rsb(struct kref *kref)
 	struct dlm_ls *ls = r->res_ls;
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
-	kref_init(&r->res_ref);
-	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
 	r->res_toss_time = jiffies;
@@ -1129,16 +1139,20 @@ static void toss_rsb(struct kref *kref)
 static void unhold_rsb(struct dlm_rsb *r)
 {
 	int rv;
+
+	/* rsbs in toss state never get referenced */
+	WARN_ON(rsb_flag(r, RSB_TOSS));
 	rv = kref_put(&r->res_ref, toss_rsb);
 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
 }
 
-static void kill_rsb(struct kref *kref)
+void free_toss_rsb(struct dlm_rsb *r)
 {
-	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
+	WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
 
-	/* All work is done after the return from kref_put() so we
-	   can release the write_lock before the remove and free. */
+	/* check if all work is done after the rsb is on toss list
+	 * and it can be freed.
+	 */
 
 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
@@ -1147,6 +1161,8 @@ static void kill_rsb(struct kref *kref)
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
+
+	dlm_free_rsb(r);
 }
 
 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
@@ -1611,15 +1627,10 @@ static void shrink_bucket(struct dlm_ls *ls)
 			continue;
 		}
 
-		if (!kref_put(&r->res_ref, kill_rsb)) {
-			log_error(ls, "tossed rsb in use %s", r->res_name);
-			continue;
-		}
-
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 	}
 
 	if (need_shrink)
@@ -1680,19 +1691,13 @@ static void shrink_bucket(struct dlm_ls *ls)
 			continue;
 		}
 
-		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name in use %s", name);
-			continue;
-		}
-
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
 		send_remove(r);
 		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 	}
 }
 
@@ -4180,18 +4185,12 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		return;
 	}
 
-	if (kref_put(&r->res_ref, kill_rsb)) {
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		dlm_free_rsb(r);
-	} else {
-		log_error(ls, "receive_remove from %d rsb ref error",
-			  from_nodeid);
-		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-	}
+	list_del(&r->res_rsbs_list);
+	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+			       dlm_rhash_rsb_params);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	free_toss_rsb(r);
 }
 
 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 33616d4b0cdb..b56a34802762 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -18,6 +18,7 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
 			       uint32_t saved_seq);
 void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
+void free_toss_rsb(struct dlm_rsb *r);
 void dlm_put_rsb(struct dlm_rsb *r);
 void dlm_hold_rsb(struct dlm_rsb *r);
 int dlm_put_lkb(struct dlm_lkb *lkb);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index c21ef115123b..d43189532b14 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -889,7 +889,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
-		dlm_free_rsb(r);
+		free_toss_rsb(r);
 		count++;
 	}
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-- 
2.43.0



* [PATCH dlm/next 7/9] dlm: drop scand kthread and use timers
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (5 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

Currently the scand kthread acts as a garbage collector for expired
rsbs on the toss list, cleaning them up after a certain timeout. It runs
every couple of seconds and iterates over the toss list while holding
ls_rsbtbl_lock for the whole iteration. To reduce the time
ls_rsbtbl_lock is held, we now free cached (tossed) rsbs driven by a
per-lockspace timer and an ordered toss queue: when the timer expires,
the expired rsb removes itself from the toss queue. The queue is ordered
by expiry, so the first entry is the earliest rsb to expire and the last
entry is the latest. res_toss_time is now an absolute value of when the
rsb will expire; before, it was the time when the rsb was tossed. It was
only debug information for debugfs anyway, so we change its semantics
slightly here.
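
As an illustration of the queue/timer interaction, a sketch with a
hypothetical helper name (the real code below also guards against
recovery and already-queued rsbs): every newly tossed rsb gets an
absolute expiry and is appended to the tail, and ls_timer is always
armed for the first, i.e. earliest expiring, entry.

static void example_queue_toss(struct dlm_ls *ls, struct dlm_rsb *r)
{
	struct dlm_rsb *first;

	spin_lock_bh(&ls->ls_toss_q_lock);
	/* absolute time at which this rsb may be freed */
	r->res_toss_time = jiffies + (dlm_config.ci_toss_secs * HZ);
	/* the timeout is constant, so appending keeps the queue ordered */
	list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);

	/* the lockspace timer always tracks the earliest expiring entry */
	first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
				 res_toss_q_list);
	mod_timer(&ls->ls_timer, first->res_toss_time);
	spin_unlock_bh(&ls->ls_toss_q_lock);
}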

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  16 +-
 fs/dlm/lock.c         | 351 +++++++++++++++++++++++++-----------------
 fs/dlm/lock.h         |   1 +
 fs/dlm/lockspace.c    | 105 ++-----------
 fs/dlm/member.c       |   2 +
 fs/dlm/recover.c      |   5 +
 fs/dlm/recoverd.c     |  14 ++
 7 files changed, 255 insertions(+), 239 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index cf43b97cf3e5..98a0ac511bc8 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -334,6 +334,7 @@ struct dlm_rsb {
 	struct list_head	res_root_list;	    /* used for recovery */
 	struct list_head	res_masters_list;   /* used for recovery */
 	struct list_head	res_recover_list;   /* used for recovery */
+	struct list_head	res_toss_q_list;
 	int			res_recover_locks_count;
 
 	char			*res_lvbptr;
@@ -584,13 +585,20 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct rhashtable	ls_rsbtbl;
-#define DLM_RTF_SHRINK_BIT	0
-	unsigned long		ls_rsbtbl_flags;
 	spinlock_t		ls_rsbtbl_lock;
 
 	struct list_head	ls_toss;
 	struct list_head	ls_keep;
 
+	struct timer_list	ls_timer;
+	/* this queue is ordered according to the
+	 * absolute res_toss_time jiffies time
+	 * to mod_timer() with the first element
+	 * if necessary.
+	 */
+	struct list_head	ls_toss_q;
+	spinlock_t		ls_toss_q_lock;
+
 	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
@@ -601,9 +609,6 @@ struct dlm_ls {
 	int			ls_new_rsb_count;
 	struct list_head	ls_new_rsb;	/* new rsb structs */
 
-	char			*ls_remove_names[DLM_REMOVE_NAMES_MAX];
-	int			ls_remove_lens[DLM_REMOVE_NAMES_MAX];
-
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
@@ -640,7 +645,6 @@ struct dlm_ls {
 
 	spinlock_t		ls_cb_lock;
 	struct list_head	ls_cb_delay; /* save for queue_work later */
-	struct timer_list	ls_timer;
 	struct task_struct	*ls_recoverd_task;
 	struct mutex		ls_recoverd_active;
 	spinlock_t		ls_recover_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index fee1a4164fc1..419e7bdb3629 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -320,6 +320,11 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
+static inline unsigned long rsb_toss_jiffies(void)
+{
+	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
+}
+
 /* This is only called to add a reference when the code already holds
    a valid reference to the rsb, so there's no need for locking. */
 
@@ -416,6 +421,198 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 	return 0;
 }
 
+/* connected with timer_delete_sync() in dlm_ls_stop() to stop
+ * new timers when recovery is triggered.
+ */
+static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
+{
+	if (!dlm_locking_stopped(ls))
+		mod_timer(&ls->ls_timer, jiffies);
+}
+
+/* ls_rsbtbl_lock must be held and the rsb must be in toss state */
+static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+	struct dlm_rsb *first;
+
+	spin_lock_bh(&ls->ls_toss_q_lock);
+	r->res_toss_time = 0;
+
+	/* if the rsb is not queued do nothing */
+	if (list_empty(&r->res_toss_q_list))
+		goto out;
+
+	/* get the first element before delete */
+	first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
+				 res_toss_q_list);
+	list_del_init(&r->res_toss_q_list);
+	/* check if the first element was the rsb we deleted */
+	if (first == r) {
+		/* try to get the new first element, if the list
+		 * is empty now try to delete the timer, if we are
+		 * too late we don't care.
+		 *
+		 * if the list isn't empty and a new first element got
+		 * in place, set the new timer expire time.
+		 */
+		first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+						 res_toss_q_list);
+		if (!first)
+			timer_delete(&ls->ls_timer);
+		else
+			__rsb_mod_timer(ls, first->res_toss_time);
+	}
+
+out:
+	spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
+/* Caller must hold ls_rsbtbl_lock and needs to be called every time
+ * when either the rsb enters toss state or the toss state changes
+ * the dir/master nodeid.
+ */
+static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+	int our_nodeid = dlm_our_nodeid();
+	struct dlm_rsb *first;
+
+	/* If we're the directory record for this rsb, and
+	 * we're not the master of it, then we need to wait
+	 * for the master node to send us a dir remove
+	 * before removing the dir record.
+	 */
+	if (!dlm_no_directory(ls) &&
+	    (r->res_master_nodeid != our_nodeid) &&
+	    (dlm_dir_nodeid(r) == our_nodeid)) {
+		rsb_delete_toss_timer(ls, r);
+		return;
+	}
+
+	spin_lock_bh(&ls->ls_toss_q_lock);
+	/* set the new rsb absolute expire time in the rsb */
+	r->res_toss_time = rsb_toss_jiffies();
+	if (list_empty(&ls->ls_toss_q)) {
+		/* if the queue is empty add the element and it's
+		 * our new expire time
+		 */
+		list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
+		__rsb_mod_timer(ls, r->res_toss_time);
+	} else {
+		/* check if the rsb was already queued, if so delete
+		 * it from the toss queue
+		 */
+		if (!list_empty(&r->res_toss_q_list))
+			list_del(&r->res_toss_q_list);
+
+		/* try to get the (possibly new) first element, then add
+		 * this rsb, which has the latest expire time, to the end
+		 * of the queue. If the list was empty before, this rsb's
+		 * expire time sets the next expiration, otherwise the
+		 * first element's expire time does
+		 */
+		first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+						 res_toss_q_list);
+		list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
+		if (!first)
+			__rsb_mod_timer(ls, r->res_toss_time);
+		else
+			__rsb_mod_timer(ls, first->res_toss_time);
+	}
+	spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
+/* if we hit contention we retry the trylock after 250 ms */
+#define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
+
+void dlm_rsb_toss_timer(struct timer_list *timer)
+{
+	struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
+	int our_nodeid = dlm_our_nodeid();
+	struct dlm_rsb *r;
+	int rv;
+
+	while (1) {
+		/* interrupting point to leave iteration when
+		 * recovery waits for timer_delete_sync(), recovery
+		 * will take care to delete everything in toss queue.
+		 */
+		if (dlm_locking_stopped(ls))
+			break;
+
+		rv = spin_trylock(&ls->ls_toss_q_lock);
+		if (!rv) {
+			/* rearm again try timer */
+			__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+			break;
+		}
+
+		r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+					     res_toss_q_list);
+		if (!r) {
+			/* nothing to do anymore, the next queued rsb
+			 * will set the next mod_timer() expiry.
+			 */
+			spin_unlock(&ls->ls_toss_q_lock);
+			break;
+		}
+
+		/* test if the first rsb isn't expired yet, if
+		 * so we stop freeing rsb from toss queue as
+		 * the order in queue is ascending to the
+		 * absolute res_toss_time jiffies
+		 */
+		if (time_before(jiffies, r->res_toss_time)) {
+			/* rearm with the next rsb to expire in the future */
+			__rsb_mod_timer(ls, r->res_toss_time);
+			spin_unlock(&ls->ls_toss_q_lock);
+			break;
+		}
+
+		/* in find_rsb_dir/nodir there is a reverse order of this
+		 * lock, however this is only a trylock if we hit some
+		 * possible contention we try it again.
+		 */
+		rv = spin_trylock(&ls->ls_rsbtbl_lock);
+		if (!rv) {
+			spin_unlock(&ls->ls_toss_q_lock);
+			/* rearm again try timer */
+			__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+			break;
+		}
+
+		list_del(&r->res_rsbs_list);
+		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+				       dlm_rhash_rsb_params);
+
+		/* not necessary to hold the ls_rsbtbl_lock when
+		 * calling send_remove()
+		 */
+		spin_unlock(&ls->ls_rsbtbl_lock);
+
+		/* remove the rsb from the toss queue, it's gone
+		 * from DLM now
+		 */
+		list_del_init(&r->res_toss_q_list);
+		spin_unlock(&ls->ls_toss_q_lock);
+
+		/* no rsb in this state should ever run a timer */
+		WARN_ON(!dlm_no_directory(ls) &&
+			(r->res_master_nodeid != our_nodeid) &&
+			(dlm_dir_nodeid(r) == our_nodeid));
+
+		/* We're the master of this rsb but we're not
+		 * the directory record, so we need to tell the
+		 * dir node to remove the dir record
+		 */
+		if (!dlm_no_directory(ls) &&
+		    (r->res_master_nodeid == our_nodeid) &&
+		    (dlm_dir_nodeid(r) != our_nodeid))
+			send_remove(r);
+
+		free_toss_rsb(r);
+	}
+}
+
 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
    unlock any spinlocks, go back and call pre_rsb_struct again.
    Otherwise, take an rsb off the list and return it. */
@@ -451,6 +648,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	INIT_LIST_HEAD(&r->res_convertqueue);
 	INIT_LIST_HEAD(&r->res_waitqueue);
 	INIT_LIST_HEAD(&r->res_root_list);
+	INIT_LIST_HEAD(&r->res_toss_q_list);
 	INIT_LIST_HEAD(&r->res_recover_list);
 	INIT_LIST_HEAD(&r->res_masters_list);
 
@@ -638,6 +836,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 * valid for keep state rsbs
 	 */
 	kref_init(&r->res_ref);
+	rsb_delete_toss_timer(ls, r);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
 	goto out_unlock;
 
 
@@ -777,6 +978,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 * valid for keep state rsbs
 	 */
 	kref_init(&r->res_ref);
+	rsb_delete_toss_timer(ls, r);
+	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
 	goto out_unlock;
 
 
@@ -1050,7 +1254,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
 			    r_nodeid, result);
 
-	r->res_toss_time = jiffies;
+	rsb_mod_timer(ls, r);
 	/* the rsb was inactive (on toss list) */
 	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -1070,7 +1274,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_master_nodeid = from_nodeid;
 	r->res_nodeid = from_nodeid;
 	kref_init(&r->res_ref);
-	r->res_toss_time = jiffies;
 	rsb_set_flag(r, RSB_TOSS);
 
 	error = rsb_insert(r, &ls->ls_rsbtbl);
@@ -1082,6 +1285,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
 	list_add(&r->res_rsbs_list, &ls->ls_toss);
+	rsb_mod_timer(ls, r);
 
 	if (result)
 		*result = DLM_LU_ADD;
@@ -1126,8 +1330,8 @@ static void toss_rsb(struct kref *kref)
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	rsb_set_flag(r, RSB_TOSS);
 	list_move(&r->res_rsbs_list, &ls->ls_toss);
-	r->res_toss_time = jiffies;
-	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
+	rsb_mod_timer(ls, r);
+
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
 		r->res_lvbptr = NULL;
@@ -1150,15 +1354,12 @@ void free_toss_rsb(struct dlm_rsb *r)
 {
 	WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
 
-	/* check if all work is done after the rsb is on toss list
-	 * and it can be freed.
-	 */
-
 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+	DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
 
@@ -1572,140 +1773,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	return error;
 }
 
-static void shrink_bucket(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r, *safe;
-	char *name;
-	int our_nodeid = dlm_our_nodeid();
-	int remote_count = 0;
-	int need_shrink = 0;
-	int i, len, rv;
-
-	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
-
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
-
-	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-		return;
-	}
-
-	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
-		/* If we're the directory record for this rsb, and
-		   we're not the master of it, then we need to wait
-		   for the master node to send us a dir remove for
-		   before removing the dir record. */
-
-		if (!dlm_no_directory(ls) &&
-		    (r->res_master_nodeid != our_nodeid) &&
-		    (dlm_dir_nodeid(r) == our_nodeid)) {
-			continue;
-		}
-
-		need_shrink = 1;
-
-		if (!time_after_eq(jiffies, r->res_toss_time +
-				   dlm_config.ci_toss_secs * HZ)) {
-			continue;
-		}
-
-		if (!dlm_no_directory(ls) &&
-		    (r->res_master_nodeid == our_nodeid) &&
-		    (dlm_dir_nodeid(r) != our_nodeid)) {
-
-			/* We're the master of this rsb but we're not
-			   the directory record, so we need to tell the
-			   dir node to remove the dir record. */
-
-			ls->ls_remove_lens[remote_count] = r->res_length;
-			memcpy(ls->ls_remove_names[remote_count], r->res_name,
-			       DLM_RESNAME_MAXLEN);
-			remote_count++;
-
-			if (remote_count >= DLM_REMOVE_NAMES_MAX)
-				break;
-			continue;
-		}
-
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		free_toss_rsb(r);
-	}
-
-	if (need_shrink)
-		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
-	else
-		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-	/*
-	 * While searching for rsb's to free, we found some that require
-	 * remote removal.  We leave them in place and find them again here
-	 * so there is a very small gap between removing them from the toss
-	 * list and sending the removal.  Keeping this gap small is
-	 * important to keep us (the master node) from being out of sync
-	 * with the remote dir node for very long.
-	 */
-
-	for (i = 0; i < remote_count; i++) {
-		name = ls->ls_remove_names[i];
-		len = ls->ls_remove_lens[i];
-
-		spin_lock_bh(&ls->ls_rsbtbl_lock);
-		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-		if (rv) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name not found %s", name);
-			continue;
-		}
-
-		if (!rsb_flag(r, RSB_TOSS)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name not toss %s", name);
-			continue;
-		}
-
-		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name master %d dir %d our %d %s",
-				  r->res_master_nodeid, r->res_dir_nodeid,
-				  our_nodeid, name);
-			continue;
-		}
-
-		if (r->res_dir_nodeid == our_nodeid) {
-			/* should never happen */
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_error(ls, "remove_name dir %d master %d our %d %s",
-				  r->res_dir_nodeid, r->res_master_nodeid,
-				  our_nodeid, name);
-			continue;
-		}
-
-		if (!time_after_eq(jiffies, r->res_toss_time +
-				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
-			log_debug(ls, "remove_name toss_time %lu now %lu %s",
-				  r->res_toss_time, jiffies, name);
-			continue;
-		}
-
-		list_del(&r->res_rsbs_list);
-		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
-				       dlm_rhash_rsb_params);
-		send_remove(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
-		free_toss_rsb(r);
-	}
-}
-
-void dlm_scan_rsbs(struct dlm_ls *ls)
-{
-	shrink_bucket(ls);
-}
-
 /* lkb is master or local copy */
 
 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b56a34802762..a90b11876cd3 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -11,6 +11,7 @@
 #ifndef __LOCK_DOT_H__
 #define __LOCK_DOT_H__
 
+void dlm_rsb_toss_timer(struct timer_list *timer);
 void dlm_dump_rsb(struct dlm_rsb *r);
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len);
 void dlm_print_lkb(struct dlm_lkb *lkb);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 890e1a4cf787..931eb3f22ec6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -29,8 +29,6 @@ static int			ls_count;
 static struct mutex		ls_lock;
 static struct list_head		lslist;
 static spinlock_t		lslist_lock;
-static struct task_struct *	scand_task;
-
 
 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 {
@@ -247,64 +245,6 @@ void dlm_lockspace_exit(void)
 	kset_unregister(dlm_kset);
 }
 
-static struct dlm_ls *find_ls_to_scan(void)
-{
-	struct dlm_ls *ls;
-
-	spin_lock_bh(&lslist_lock);
-	list_for_each_entry(ls, &lslist, ls_list) {
-		if (time_after_eq(jiffies, ls->ls_scan_time +
-					    dlm_config.ci_scan_secs * HZ)) {
-			atomic_inc(&ls->ls_count);
-			spin_unlock_bh(&lslist_lock);
-			return ls;
-		}
-	}
-	spin_unlock_bh(&lslist_lock);
-	return NULL;
-}
-
-static int dlm_scand(void *data)
-{
-	struct dlm_ls *ls;
-
-	while (!kthread_should_stop()) {
-		ls = find_ls_to_scan();
-		if (ls) {
-			if (dlm_lock_recovery_try(ls)) {
-				ls->ls_scan_time = jiffies;
-				dlm_scan_rsbs(ls);
-				dlm_unlock_recovery(ls);
-			} else {
-				ls->ls_scan_time += HZ;
-			}
-
-			dlm_put_lockspace(ls);
-			continue;
-		}
-		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
-	}
-	return 0;
-}
-
-static int dlm_scand_start(void)
-{
-	struct task_struct *p;
-	int error = 0;
-
-	p = kthread_run(dlm_scand, NULL, "dlm_scand");
-	if (IS_ERR(p))
-		error = PTR_ERR(p);
-	else
-		scand_task = p;
-	return error;
-}
-
-static void dlm_scand_stop(void)
-{
-	kthread_stop(scand_task);
-}
-
 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 {
 	struct dlm_ls *ls;
@@ -385,22 +325,9 @@ static int threads_start(void)
 
 	/* Thread for sending/receiving messages for all lockspace's */
 	error = dlm_midcomms_start();
-	if (error) {
+	if (error)
 		log_print("cannot start dlm midcomms %d", error);
-		goto fail;
-	}
 
-	error = dlm_scand_start();
-	if (error) {
-		log_print("cannot start dlm_scand thread %d", error);
-		goto midcomms_fail;
-	}
-
-	return 0;
-
- midcomms_fail:
-	dlm_midcomms_stop();
- fail:
 	return error;
 }
 
@@ -412,7 +339,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	struct dlm_ls *ls;
 	int do_unreg = 0;
 	int namelen = strlen(name);
-	int i, error;
+	int error;
 
 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 		return -EINVAL;
@@ -503,13 +430,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	if (error)
 		goto out_lsfree;
 
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
-		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
-						 GFP_KERNEL);
-		if (!ls->ls_remove_names[i])
-			goto out_rsbtbl;
-	}
-
 	idr_init(&ls->ls_lkbidr);
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
@@ -582,6 +502,11 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
 	rwlock_init(&ls->ls_dir_dump_lock);
 
+	INIT_LIST_HEAD(&ls->ls_toss_q);
+	spin_lock_init(&ls->ls_toss_q_lock);
+	timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,
+		    TIMER_DEFERRABLE);
+
 	spin_lock_bh(&lslist_lock);
 	ls->ls_create_count = 1;
 	list_add(&ls->ls_list, &lslist);
@@ -661,9 +586,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	kfree(ls->ls_recover_buf);
  out_lkbidr:
 	idr_destroy(&ls->ls_lkbidr);
- out_rsbtbl:
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
-		kfree(ls->ls_remove_names[i]);
 	rhashtable_destroy(&ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
@@ -696,7 +618,6 @@ static int __dlm_new_lockspace(const char *name, const char *cluster,
 	if (error > 0)
 		error = 0;
 	if (!ls_count) {
-		dlm_scand_stop();
 		dlm_midcomms_shutdown();
 		dlm_midcomms_stop();
 	}
@@ -777,7 +698,7 @@ static void rhash_free_rsb(void *ptr, void *arg)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	int i, busy, rv;
+	int busy, rv;
 
 	busy = lockspace_busy(ls, force);
 
@@ -812,8 +733,13 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_recoverd_stop(ls);
 
+	/* clear the LSFL_RUNNING flag to speed up
+	 * timer_shutdown_sync(), we don't care anymore
+	 */
+	clear_bit(LSFL_RUNNING, &ls->ls_flags);
+	timer_shutdown_sync(&ls->ls_timer);
+
 	if (ls_count == 1) {
-		dlm_scand_stop();
 		dlm_clear_members(ls);
 		dlm_midcomms_shutdown();
 	}
@@ -839,9 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
 
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
-		kfree(ls->ls_remove_names[i]);
-
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 6401916a97ef..c46e306f2e5c 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -641,6 +641,8 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	spin_lock_bh(&ls->ls_recover_lock);
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
+	if (new)
+		timer_delete_sync(&ls->ls_timer);
 	ls->ls_recover_seq++;
 
 	/* activate requestqueue and stop processing */
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index d43189532b14..960a14b95605 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -889,6 +889,11 @@ void dlm_clear_toss(struct dlm_ls *ls)
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+
+		/* remove it from the toss queue if its part of it */
+		if (!list_empty(&r->res_toss_q_list))
+			list_del_init(&r->res_toss_q_list);
+
 		free_toss_rsb(r);
 		count++;
 	}
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 5e8e10030b74..dc3f1b45113f 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -82,6 +82,18 @@ static void dlm_release_root_list(struct list_head *root_list)
 	}
 }
 
+static void dlm_resume_next_toss(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+
+	spin_lock_bh(&ls->ls_toss_q_lock);
+	r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+				     res_toss_q_list);
+	if (r)
+		mod_timer(&ls->ls_timer, r->res_toss_time);
+	spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
 /* If the start for which we're re-enabling locking (seq) has been superseded
    by a newer stop (ls_recover_seq), we need to leave locking disabled.
 
@@ -100,6 +112,8 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
 		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
+		/* schedule next timer if recovery put something on toss */
+		dlm_resume_next_toss(ls);
 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 		error = 0;
 	}
-- 
2.43.0



* [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (6 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  2024-04-12 14:53   ` Alexander Aring
  2024-04-12 13:43 ` [PATCH dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring
  8 siblings, 1 reply; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

The conversion to rhashtable introduced a single hash lock per lockspace
instead of a lock per bucket. This patch changes the hash lock handling
so that only a read lock is held in the most likely hot path in DLM.

This hot path is the lookup of rsbs that are in keep state. Keep state
means that the RSB_TOSS flag isn't set. In this likely read lock path
there is no need to hold the ls_rsbtbl_lock in write mode at all. If the
rsb is in toss state, holding the ls_rsbtbl_lock in write mode is
required. If we see that the rsb is in toss state, we take the
ls_rsbtbl_lock in write mode and look the rsb up again (it could have
been removed during the transition from the read to the write lock) and
check that it is still in toss state. This is an unlikely path, though,
because toss rsbs are only cached rsbs; finding the rsb in keep state is
by far the most common case.
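
In short, the pattern described above is roughly the following sketch
(illustrative only; the hypothetical example_lookup() is not the actual
find_rsb_dir()/find_rsb_nodir() code in the diff below, which also has
to handle the master_nodeid checks and the keep/toss list moves):

static int example_lookup(struct dlm_ls *ls, const void *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

 retry_lookup:
	/* likely path: the rsb is found and active, the read lock is enough */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error && !rsb_flag(r, RSB_TOSS)) {
		kref_get(&r->res_ref);
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		*r_ret = r;
		return 0;
	}
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	if (error)
		return error;

	/* unlikely path: the rsb was seen in toss state, take the write
	 * lock and look it up again because it could have been removed
	 * while no lock was held
	 */
	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error || !rsb_flag(r, RSB_TOSS)) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		if (error)
			return error;
		/* it went back to keep state, try the likely path again */
		goto retry_lookup;
	}

	/* revive the tossed rsb while still holding the write lock */
	kref_init(&r->res_ref);
	rsb_delete_toss_timer(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);
	*r_ret = r;
	return 0;
}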

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |   4 +-
 fs/dlm/dir.c          |   4 +-
 fs/dlm/dlm_internal.h |   2 +-
 fs/dlm/lock.c         | 269 ++++++++++++++++++++++++++++++------------
 fs/dlm/lockspace.c    |   2 +-
 fs/dlm/recover.c      |   4 +-
 fs/dlm/recoverd.c     |   8 +-
 7 files changed, 206 insertions(+), 87 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 70567919f1b7..6ab3ed4074c6 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -413,7 +413,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 	else
 		list = &ls->ls_keep;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	return seq_list_start(list, *pos);
 }
 
@@ -434,7 +434,7 @@ static void table_seq_stop(struct seq_file *seq, void *iter_ptr)
 {
 	struct dlm_ls *ls = seq->private;
 
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static const struct seq_operations format1_seq_ops = {
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 9687f908476b..b1ab0adbd9d0 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -200,9 +200,9 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	struct dlm_rsb *r;
 	int rv;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	if (!rv)
 		return r;
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 98a0ac511bc8..b675bffb61ae 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -585,7 +585,7 @@ struct dlm_ls {
 	spinlock_t		ls_lkbidr_spin;
 
 	struct rhashtable	ls_rsbtbl;
-	spinlock_t		ls_rsbtbl_lock;
+	rwlock_t		ls_rsbtbl_lock;
 
 	struct list_head	ls_toss;
 	struct list_head	ls_keep;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 419e7bdb3629..644f2b88f44a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -342,15 +342,15 @@ void dlm_hold_rsb(struct dlm_rsb *r)
 
 /* TODO move this to lib/refcount.c */
 static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
 __cond_acquires(lock)
 {
 	if (refcount_dec_not_one(r))
 		return false;
 
-	spin_lock_bh(lock);
+	write_lock_bh(lock);
 	if (!refcount_dec_and_test(r)) {
-		spin_unlock_bh(lock);
+		write_unlock_bh(lock);
 		return false;
 	}
 
@@ -358,11 +358,11 @@ __cond_acquires(lock)
 }
 
 /* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
-				       void (*release)(struct kref *kref),
-				       spinlock_t *lock)
+static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
+					     void (*release)(struct kref *kref),
+					     rwlock_t *lock)
 {
-	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
 		release(kref);
 		return 1;
 	}
@@ -378,10 +378,10 @@ static void put_rsb(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
-				  &ls->ls_rsbtbl_lock);
+	rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
+					&ls->ls_rsbtbl_lock);
 	if (rv)
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -572,7 +572,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
 		 * lock, however this is only a trylock if we hit some
 		 * possible contention we try it again.
 		 */
-		rv = spin_trylock(&ls->ls_rsbtbl_lock);
+		rv = write_trylock(&ls->ls_rsbtbl_lock);
 		if (!rv) {
 			spin_unlock(&ls->ls_toss_q_lock);
 			/* rearm again try timer */
@@ -587,7 +587,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
 		/* not necessary to held the ls_rsbtbl_lock when
 		 * calling send_remove()
 		 */
-		spin_unlock(&ls->ls_rsbtbl_lock);
+		write_unlock(&ls->ls_rsbtbl_lock);
 
 		/* remove the rsb out of the toss queue its gone
 		 * drom DLM now
@@ -671,16 +671,8 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	int rv;
-
-	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
-				    dlm_rhash_rsb_params);
-	if (rv == -EEXIST) {
-		log_print("%s match", __func__);
-		dlm_dump_rsb(rsb);
-	}
-
-	return rv;
+	return rhashtable_insert_fast(rhash, &rsb->res_node,
+				      dlm_rhash_rsb_params);
 }
 
 /*
@@ -775,24 +767,47 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 	
 	/*
 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
 	 */
 
-	if (rsb_flag(r, RSB_TOSS))
+	if (rsb_flag(r, RSB_TOSS)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_toss;
+	}
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	goto out;
 
 
  do_toss:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state
+	 * if not it's in keep state and we relookup - unlikely path.
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
 	/*
 	 * rsb found inactive (master_nodeid may be out of date unless
 	 * we are the dir_nodeid or were the master)  No other thread
@@ -806,8 +821,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
 			  r->res_name);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
@@ -837,9 +853,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 */
 	kref_init(&r->res_ref);
 	rsb_delete_toss_timer(ls, r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
 
  do_new:
@@ -848,15 +864,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	 */
 
 	if (error == -EBADR && !create)
-		goto out_unlock;
+		goto out;
 
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -878,7 +892,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		dlm_free_rsb(r);
 		r = NULL;
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if (from_other) {
@@ -898,11 +912,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
+
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -926,24 +949,49 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 
-	if (rsb_flag(r, RSB_TOSS))
+	if (rsb_flag(r, RSB_TOSS)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_toss;
+	}
 
 	/*
 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
 	 */
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	goto out;
 
 
  do_toss:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state
+	 * if not it's in keep state and we relookup - unlikely path.
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
+
 	/*
 	 * rsb found inactive. No other thread is using this rsb because
 	 * it's on the toss list, so we can look at or update
@@ -956,8 +1004,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
 		dlm_print_rsb(r);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
@@ -979,9 +1028,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 */
 	kref_init(&r->res_ref);
 	rsb_delete_toss_timer(ls, r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
 
  do_new:
@@ -991,11 +1040,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -1003,11 +1051,20 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+
  out:
 	*r_ret = r;
 	return error;
@@ -1220,18 +1277,23 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error) {
-		if (rsb_flag(r, RSB_TOSS))
+		if (rsb_flag(r, RSB_TOSS)) {
+			read_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto do_toss;
+		}
 
 		/* because the rsb is active, we need to lock_rsb before
 		 * checking/changing re_master_nodeid
 		 */
 
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1243,10 +1305,31 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 		return 0;
 	} else {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto not_found;
 	}
 
  do_toss:
+	/* unlikely path - relookup under write */
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* rsb_mod_timer() requires holding ls_rsbtbl_lock in write lock
+	 * check if the rsb is still in toss state, if not relookup
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			/* something has changed, very unlikely but
+			 * try again
+			 */
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto not_found;
+	}
+
 	/* because the rsb is inactive (on toss list), it's not refcounted
 	 * and lock_rsb is not used, but is protected by the rsbtbl lock
 	 */
@@ -1256,18 +1339,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 	rsb_mod_timer(ls, r);
 	/* the rsb was inactive (on toss list) */
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = our_nodeid;
@@ -1276,22 +1357,30 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (error) {
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (error) {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 
 	list_add(&r->res_rsbs_list, &ls->ls_toss);
 	rsb_mod_timer(ls, r);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (result)
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
 	return error;
 }
 
@@ -1299,12 +1388,12 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (r->res_hash == hash)
 			dlm_dump_rsb(r);
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1312,14 +1401,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	struct dlm_rsb *r = NULL;
 	int error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error)
 		goto out;
 
 	dlm_dump_rsb(r);
  out:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1447,6 +1536,36 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+__cond_acquires(lock)
+{
+	if (refcount_dec_not_one(r))
+		return false;
+
+	spin_lock_bh(lock);
+	if (!refcount_dec_and_test(r)) {
+		spin_unlock_bh(lock);
+		return false;
+	}
+
+	return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+				       void (*release)(struct kref *kref),
+				       spinlock_t *lock)
+{
+	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+		release(kref);
+		return 1;
+	}
+
+	return 0;
+}
+
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -4216,14 +4335,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	memset(name, 0, sizeof(name));
 	memcpy(name, ms->m_extra, len);
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (rv) {
 		/* should not happen */
 		log_error(ls, "%s from %d not found %s", __func__,
 			  from_nodeid, name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4233,14 +4352,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4248,14 +4367,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
 	list_del(&r->res_rsbs_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_toss_rsb(r);
 }
@@ -5323,7 +5442,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
@@ -5332,10 +5451,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 931eb3f22ec6..04f4c74831ce 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -424,7 +424,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	INIT_LIST_HEAD(&ls->ls_toss);
 	INIT_LIST_HEAD(&ls->ls_keep);
-	spin_lock_init(&ls->ls_rsbtbl_lock);
+	rwlock_init(&ls->ls_rsbtbl_lock);
 
 	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
 	if (error)
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 960a14b95605..f493d5f30c58 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -884,7 +884,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	struct dlm_rsb *r, *safe;
 	unsigned int count = 0;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
@@ -897,7 +897,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 		free_toss_rsb(r);
 		count++;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (count)
 		log_rinfo(ls, "dlm_clear_toss %u done", count);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index dc3f1b45113f..8a243145b7d3 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -32,7 +32,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (r->res_nodeid)
 			continue;
@@ -40,7 +40,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
 		dlm_hold_rsb(r);
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
@@ -62,14 +62,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		list_add(&r->res_root_list, root_list);
 		dlm_hold_rsb(r);
 	}
 
 	WARN_ON_ONCE(!list_empty(&ls->ls_toss));
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void dlm_release_root_list(struct list_head *root_list)
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH dlm/next 9/9] dlm: convert lkbidr to rwlock
  2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
                   ` (7 preceding siblings ...)
  2024-04-12 13:43 ` [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
@ 2024-04-12 13:43 ` Alexander Aring
  8 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 13:43 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the lkbidr lock to a rwlock. Most of the time we
only do idr lookups, and those can be done while holding just the read
lock.
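
Roughly, the locking discipline after this change looks like the
following fragment (illustrative only, mirroring the diff below; lkid,
start and end stand in for the real arguments):

	/* lkb lookups, the common operation, only take the read lock */
	read_lock_bh(&ls->ls_lkbidr_lock);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock_bh(&ls->ls_lkbidr_lock);

	/* id allocation and removal still need the write lock */
	write_lock_bh(&ls->ls_lkbidr_lock);
	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
	write_unlock_bh(&ls->ls_lkbidr_lock);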

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 44 +++++++------------------------------------
 fs/dlm/lockspace.c    |  6 +++---
 3 files changed, 11 insertions(+), 41 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index b675bffb61ae..19e57cbd5b13 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -582,7 +582,7 @@ struct dlm_ls {
 	struct kobject		ls_kobj;
 
 	struct idr		ls_lkbidr;
-	spinlock_t		ls_lkbidr_spin;
+	rwlock_t		ls_lkbidr_lock;
 
 	struct rhashtable	ls_rsbtbl;
 	rwlock_t		ls_rsbtbl_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 644f2b88f44a..67414c3db0ad 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1491,11 +1491,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	write_lock_bh(&ls->ls_lkbidr_lock);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	write_unlock_bh(&ls->ls_lkbidr_lock);
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
@@ -1516,11 +1516,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
 	struct dlm_lkb *lkb;
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	read_lock_bh(&ls->ls_lkbidr_lock);
 	lkb = idr_find(&ls->ls_lkbidr, lkid);
 	if (lkb)
 		kref_get(&lkb->lkb_ref);
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	read_unlock_bh(&ls->ls_lkbidr_lock);
 
 	*lkb_ret = lkb;
 	return lkb ? 0 : -ENOENT;
@@ -1536,36 +1536,6 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
-/* TODO move this to lib/refcount.c */
-static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
-__cond_acquires(lock)
-{
-	if (refcount_dec_not_one(r))
-		return false;
-
-	spin_lock_bh(lock);
-	if (!refcount_dec_and_test(r)) {
-		spin_unlock_bh(lock);
-		return false;
-	}
-
-	return true;
-}
-
-/* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
-				       void (*release)(struct kref *kref),
-				       spinlock_t *lock)
-{
-	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
-		release(kref);
-		return 1;
-	}
-
-	return 0;
-}
-
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -1574,11 +1544,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	uint32_t lkid = lkb->lkb_id;
 	int rv;
 
-	rv = dlm_kref_put_lock_bh(&lkb->lkb_ref, kill_lkb,
-				  &ls->ls_lkbidr_spin);
+	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
+					&ls->ls_lkbidr_lock);
 	if (rv) {
 		idr_remove(&ls->ls_lkbidr, lkid);
-		spin_unlock_bh(&ls->ls_lkbidr_spin);
+		write_unlock_bh(&ls->ls_lkbidr_lock);
 
 		detach_lkb(lkb);
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 04f4c74831ce..5ce26882159e 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -431,7 +431,7 @@ static int new_lockspace(const char *name, const char *cluster,
 		goto out_lsfree;
 
 	idr_init(&ls->ls_lkbidr);
-	spin_lock_init(&ls->ls_lkbidr_spin);
+	rwlock_init(&ls->ls_lkbidr_lock);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	spin_lock_init(&ls->ls_waiters_lock);
@@ -676,7 +676,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 {
 	int rv;
 
-	spin_lock_bh(&ls->ls_lkbidr_spin);
+	read_lock_bh(&ls->ls_lkbidr_lock);
 	if (force == 0) {
 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 	} else if (force == 1) {
@@ -684,7 +684,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	} else {
 		rv = 0;
 	}
-	spin_unlock_bh(&ls->ls_lkbidr_spin);
+	read_unlock_bh(&ls->ls_lkbidr_lock);
 	return rv;
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* Re: [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup
  2024-04-12 13:43 ` [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
@ 2024-04-12 14:53   ` Alexander Aring
  2024-04-12 19:24     ` Alexander Aring
  0 siblings, 1 reply; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 14:53 UTC (permalink / raw
  To: teigland; +Cc: gfs2

Hi,

On Fri, Apr 12, 2024 at 9:43 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> [...]
> @@ -775,24 +767,47 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
>                         goto out;
>         }
>
> -       spin_lock_bh(&ls->ls_rsbtbl_lock);
> + retry_lookup:
>
> +       /* check if the rsb is in keep state under read lock - likely path */
> +       read_lock_bh(&ls->ls_rsbtbl_lock);
>         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
> -       if (error)
> +       if (error) {
> +               read_unlock_bh(&ls->ls_rsbtbl_lock);
>                 goto do_new;
> +       }
>
>         /*
>          * rsb is active, so we can't check master_nodeid without lock_rsb.
>          */
>
> -       if (rsb_flag(r, RSB_TOSS))
> +       if (rsb_flag(r, RSB_TOSS)) {
> +               read_unlock_bh(&ls->ls_rsbtbl_lock);
>                 goto do_toss;
> +       }
>
>         kref_get(&r->res_ref);
> -       goto out_unlock;
> +       read_unlock_bh(&ls->ls_rsbtbl_lock);
> +       goto out;
>
>
>   do_toss:
> +       write_lock_bh(&ls->ls_rsbtbl_lock);
> +

There might be further improvements possible with rhashtable per bucket
locks, but this approach is simple to do and the biggest part runs under
the read lock only anyway. I need to think more about this, because the
ls_toss/ls_keep lists also need to be addressed here: they are tied to
whether the rsb is still part of the hash.

> +       /* retry lookup under write lock to see if it's still in toss state
> +        * if not it's in keep state and we relookup - unlikely path.
> +        */
> +       error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);

We don't need to do a lookup again; we only need to check whether the
rsb is still part of the hashtable, as there is no other hashtable it
can be part of.

- Alex


^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup
  2024-04-12 14:53   ` Alexander Aring
@ 2024-04-12 19:24     ` Alexander Aring
  0 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-04-12 19:24 UTC (permalink / raw
  To: teigland; +Cc: gfs2

Hi,

On Fri, Apr 12, 2024 at 10:53 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> Hi,
>
> On Fri, Apr 12, 2024 at 9:43 AM Alexander Aring <aahringo@redhat.com> wrote:
> > [...]
> >   do_toss:
> > +       write_lock_bh(&ls->ls_rsbtbl_lock);
> > +
>
> There might be further improvements possible with rhashtable per bucket
> locks, but this approach is simple to do and the biggest part runs under
> the read lock only anyway. I need to think more about this, because the
> ls_toss/ls_keep lists also need to be addressed here: they are tied to
> whether the rsb is still part of the hash.
>
> > +       /* retry lookup under write lock to see if it's still in toss state
> > +        * if not it's in keep state and we relookup - unlikely path.
> > +        */
> > +       error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
>
> We don't need to do a lookup again; we only need to check whether the
> rsb is still part of the hashtable, as there is no other hashtable it
> can be part of.

We can't do that, because somebody else could remove the rsb from the
hash and free it in the window between read_unlock() and write_lock(),
so checking whether the rsb is still hashed would be a use after free.
We could prevent that by holding the rcu read lock across that window
(without reintroducing refcounting for toss rsbs) and freeing the rsb
with call_rcu(), but that might be something for later...
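
Such an rcu variant could roughly look like the following (purely
illustrative; the res_rcu head and the rsb_free_rcu() callback are
assumptions and do not exist in this series):

	/* lookup side: an rcu read-side section spans the window between
	 * dropping the read lock and taking the write lock, so an rsb
	 * that gets unhashed concurrently cannot be freed underneath us
	 */
	rcu_read_lock();
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	if (!error && rsb_flag(r, RSB_TOSS)) {
		write_lock_bh(&ls->ls_rsbtbl_lock);
		/* r may have been unhashed meanwhile, but it cannot have
		 * been freed yet, so it is safe to check here whether it
		 * is still hashed and revive it if so
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
	}
	rcu_read_unlock();

	/* remove side: unhash under the write lock, but only free the
	 * rsb after a grace period
	 */
	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
			       dlm_rhash_rsb_params);
	call_rcu(&r->res_rcu, rsb_free_rcu);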

- Alex


^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2024-04-12 19:24 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2024-04-12 13:43 [PATCH dlm/next 0/9] dlm: sand fix, rhashtable, timers and lookup hotpath speedup Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 1/9] dlm: increment ls_count on find_ls_to_scan() Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 2/9] dlm: change to non per bucket hashtable lock Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 3/9] dlm: merge toss and keep hash into one Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 4/9] dlm: fix avoid rsb hold during debugfs dump Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 5/9] dlm: switch to use rhashtable for rsbs Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 6/9] dlm: remove refcounting if rsb is on toss Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 7/9] dlm: drop scand kthread and use timers Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 8/9] dlm: likely read lock path for rsb lookup Alexander Aring
2024-04-12 14:53   ` Alexander Aring
2024-04-12 19:24     ` Alexander Aring
2024-04-12 13:43 ` [PATCH dlm/next 9/9] dlm: convert lkbidr to rwlock Alexander Aring

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).