gfs2.lists.linux.dev archive mirror
* [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE
@ 2024-06-03 21:55 Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 1/8] dlm: using rcu to avoid rsb lookup again Alexander Aring
                   ` (7 more replies)
  0 siblings, 8 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

Hi,

The first three patches are resends with updated commit messages and
additional comments, e.g. to explain the rcu_read_lock() handling that
avoids a second lookup.

The main part is the rest of the patches, which introduce the
DLM_LSFL_SOFTIRQ_SAFE flag to signal that ast/bast callbacks can be
handled in softirq context. md-cluster is the first user to take
advantage of this flag, as its callback handling is simple enough
and looks okay to be called from softirq context. To be honest, I also
just need an upstream user for the new flag. Other users, e.g. gfs2, can
also take advantage of it.

I also tested it with a debug kernel, using md-cluster with two block
devices at the mirror level. I used gfs2 on top of it and it seems to
run without any problems.

This patch series is based on:

https://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm.git/log/?h=next

in case md-cluster people want to try it out.

Thanks.

- Alex

Alexander Aring (8):
  dlm: using rcu to avoid rsb lookup again
  dlm: remove struct field with the same meaning
  dlm: use is_master() on checks if we are the master
  dlm: use LSFL_FS to check if it's a kernel lockspace
  dlm: introduce DLM_LSFL_SOFTIRQ_SAFE
  dlm: implement LSFL_SOFTIRQ_SAFE
  dlm: convert ls_cb_lock to rwlock
  md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()

 drivers/md/md-cluster.c  |   2 +-
 fs/dlm/ast.c             | 185 ++++++++++++++++++---------
 fs/dlm/ast.h             |  11 +-
 fs/dlm/debug_fs.c        |  33 ++---
 fs/dlm/dlm_internal.h    |  40 +++++-
 fs/dlm/lock.c            | 268 +++++++++++++++++++++++----------------
 fs/dlm/lock.h            |   5 +-
 fs/dlm/lockspace.c       |  23 ++--
 fs/dlm/memory.c          |  15 ++-
 fs/dlm/rcom.c            |   4 +-
 fs/dlm/recover.c         |  20 +--
 fs/dlm/recoverd.c        |   2 +-
 fs/dlm/user.c            |  38 +++---
 include/linux/dlm.h      |  17 ++-
 include/uapi/linux/dlm.h |   2 +
 15 files changed, 413 insertions(+), 252 deletions(-)

-- 
2.43.0



* [PATCH dlm/next 1/8] dlm: using rcu to avoid rsb lookup again
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 2/8] dlm: remove struct field with the same meaning Alexander Aring
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

This patch puts the rsb under rcu protection. While the rcu read lock
is held, the rsb cannot be freed. In combination with the newly
introduced RSB_HASHED flag, we can check whether the rsb is still part
of the ls_rsbtbl hashtable; it cannot be part of any other table. If it
is still part of ls_rsbtbl, we can avoid a second dlm_search_rsb_tree()
call.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |   3 ++
 fs/dlm/lock.c         | 117 ++++++++++++++++++++++++++++++++++--------
 fs/dlm/memory.c       |  15 +++++-
 3 files changed, 112 insertions(+), 23 deletions(-)
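
The recheck described in the commit message can be modelled in plain
userspace C. This is only a sketch under assumptions, not the kernel
code: the struct, the enum recheck_result and the model_*() helpers are
hypothetical names, there is no real locking or rcu here, and the
"not found" outcome follows the branch taken in _dlm_master_lookup().
It only shows which decision the two flags encode once the write lock
has been taken.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical userspace model; flag names mirror the patch. */
enum rsb_flag { RSB_TOSS = 1 << 0, RSB_HASHED = 1 << 1 };

struct rsb {
	unsigned int flags;
};

enum recheck_result {
	RECHECK_PROCEED,	/* still hashed and in toss state */
	RECHECK_RETRY,		/* still hashed, but now in keep state */
	RECHECK_NOT_FOUND,	/* removed from the hash in the meantime */
};

/* Models the check done after re-taking the table lock in write mode.
 * Assumption: the caller is inside an rcu read-side critical section,
 * so *r is still valid memory even if it was unhashed.
 */
static enum recheck_result recheck_after_write_lock(const struct rsb *r)
{
	assert(r != NULL);

	if (!(r->flags & RSB_HASHED))
		return RECHECK_NOT_FOUND;
	if (!(r->flags & RSB_TOSS))
		return RECHECK_RETRY;
	return RECHECK_PROCEED;
}

/* Keep the flag in sync with hash membership, as rsb_insert() and the
 * removal paths do in the patch.
 */
static void model_insert(struct rsb *r)
{
	r->flags |= RSB_HASHED;
}

static void model_remove(struct rsb *r)
{
	r->flags &= ~(unsigned int)RSB_HASHED;
}
```

The point of the design is that testing a flag on a pointer we already
hold is cheaper than a second hash lookup, and it is only safe because
the free is deferred until the rcu read-side critical section ends.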

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9e68e68bf0cf..6f578527c6d8 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -333,6 +333,7 @@ struct dlm_rsb {
 	struct list_head	res_recover_list;   /* used for recovery */
 	struct list_head	res_toss_q_list;
 	int			res_recover_locks_count;
+	struct rcu_head		rcu;
 
 	char			*res_lvbptr;
 	char			res_name[DLM_RESNAME_MAXLEN+1];
@@ -366,6 +367,8 @@ enum rsb_flags {
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
 	RSB_TOSS,
+	/* if rsb is part of ls_rsbtbl or not */
+	RSB_HASHED,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a29de48849ef..e1e990b09068 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -582,6 +582,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+		rsb_clear_flag(r, RSB_HASHED);
 
 		/* not necessary to held the ls_rsbtbl_lock when
 		 * calling send_remove()
@@ -658,8 +659,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	return rhashtable_insert_fast(rhash, &rsb->res_node,
-				      dlm_rhash_rsb_params);
+	int rv;
+
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (!rv)
+		rsb_set_flag(rsb, RSB_HASHED);
+
+	return rv;
 }
 
 /*
@@ -773,12 +780,24 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
  do_toss:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
-
-	/* retry lookup under write lock to see if its still in toss state
-	 * if not it's in keep state and we relookup - unlikely path.
+	/* Moving an rsb out of toss state requires holding ls_rsbtbl_lock
+	 * in write mode. However, between dropping the read lock and
+	 * taking the write lock, the rsb can be removed from the
+	 * ls_rsbtbl hash or change its state to keep state. If
+	 * RSB_HASHED is no longer set, somebody else removed the entry
+	 * from the ls_rsbtbl hash. The lookup is protected by the rcu
+	 * read lock and runs inside an rcu read-side critical section,
+	 * so the rsb (which is usually freed once it is removed from the
+	 * hash) cannot be freed during the read to write lock
+	 * transition: the call_rcu() callback that frees the rsb will not
+	 * run during this critical section. For this reason a check on
+	 * RSB_HASHED is enough to detect this situation. The recheck of
+	 * RSB_TOSS is required because somebody else could have taken
+	 * the rsb out of toss state in the meantime. As this is very
+	 * unlikely, we simply redo the lookup under the read lock.
+	 * The !RSB_HASHED case can happen and we must be prepared for it.
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -950,11 +969,24 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
  do_toss:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-	/* retry lookup under write lock to see if its still in toss state
-	 * if not it's in keep state and we relookup - unlikely path.
+	/* Moving an rsb out of toss state requires holding ls_rsbtbl_lock
+	 * in write mode. However, between dropping the read lock and
+	 * taking the write lock, the rsb can be removed from the
+	 * ls_rsbtbl hash or change its state to keep state. If
+	 * RSB_HASHED is no longer set, somebody else removed the entry
+	 * from the ls_rsbtbl hash. The lookup is protected by the rcu
+	 * read lock and runs inside an rcu read-side critical section,
+	 * so the rsb (which is usually freed once it is removed from the
+	 * hash) cannot be freed during the read to write lock
+	 * transition: the call_rcu() callback that frees the rsb will not
+	 * run during this critical section. For this reason a check on
+	 * RSB_HASHED is enough to detect this situation. The recheck of
+	 * RSB_TOSS is required because somebody else could have taken
+	 * the rsb out of toss state in the meantime. As this is very
+	 * unlikely, we simply redo the lookup under the read lock.
+	 * The !RSB_HASHED case can happen and we must be prepared for it.
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -1046,6 +1078,7 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 {
 	int dir_nodeid;
 	uint32_t hash;
+	int rv;
 
 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;
@@ -1053,12 +1086,20 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 	hash = jhash(name, len, 0);
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 
+	/* Hold the rcu read lock here to prevent freeing of the rsb
+	 * while looking it up. There is currently an optimization
+	 * to not look up the rsb twice and instead check if it is
+	 * still part of the rsbtbl hash.
+	 */
+	rcu_read_lock();
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
-				      from_nodeid, flags, r_ret);
-	else
-		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				    from_nodeid, flags, r_ret);
+	else
+		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
+				  from_nodeid, flags, r_ret);
+	rcu_read_unlock();
+	return rv;
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1215,8 +1256,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
  */
 
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-		      int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid,
+			      const char *name, int len, unsigned int flags,
+			      int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
 	uint32_t hash;
@@ -1243,7 +1285,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
  retry:
-
 	/* check if the rsb is in keep state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
@@ -1278,11 +1319,24 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	/* unlikely path - relookup under write */
 	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-	/* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock
-	 * check if the rsb is still in toss state, if not relookup
+	/* Moving an rsb out of toss state requires holding ls_rsbtbl_lock
+	 * in write mode. However, between dropping the read lock and
+	 * taking the write lock, the rsb can be removed from the
+	 * ls_rsbtbl hash or change its state to keep state. If
+	 * RSB_HASHED is no longer set, somebody else removed the entry
+	 * from the ls_rsbtbl hash. The lookup is protected by the rcu
+	 * read lock and runs inside an rcu read-side critical section,
+	 * so the rsb (which is usually freed once it is removed from the
+	 * hash) cannot be freed during the read to write lock
+	 * transition: the call_rcu() callback that frees the rsb will not
+	 * run during this critical section. For this reason a check on
+	 * RSB_HASHED is enough to detect this situation. The recheck of
+	 * RSB_TOSS is required because somebody else could have taken
+	 * the rsb out of toss state in the meantime. As this is very
+	 * unlikely, we simply redo the lookup under the read lock.
+	 * The !RSB_HASHED case can happen and we must be prepared for it.
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			/* something as changed, very unlikely but
@@ -1290,6 +1344,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 			 */
 			goto retry;
 		}
+
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto not_found;
@@ -1346,6 +1401,23 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	return error;
 }
 
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+		      int len, unsigned int flags, int *r_nodeid, int *result)
+{
+	int rv;
+
+	/* Hold the rcu read lock here to prevent freeing of the rsb
+	 * while looking it up. There is currently an optimization
+	 * to not look up the rsb twice and instead check if it is
+	 * still part of the rsbtbl hash.
+	 */
+	rcu_read_lock();
+	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid,
+				result);
+	rcu_read_unlock();
+	return rv;
+}
+
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
@@ -4308,6 +4380,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	list_del(&r->res_rsbs_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
+	rsb_clear_flag(r, RSB_HASHED);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_toss_rsb(r);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 15a8b1cee433..d6cb7a1c700d 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -101,13 +101,26 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	return r;
 }
 
-void dlm_free_rsb(struct dlm_rsb *r)
+static void __dlm_free_rsb(struct rcu_head *rcu)
 {
+	struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu);
+
 	if (r->res_lvbptr)
 		dlm_free_lvb(r->res_lvbptr);
 	kmem_cache_free(rsb_cache, r);
 }
 
+void dlm_free_rsb(struct dlm_rsb *r)
+{
+	/* Defer __dlm_free_rsb() until all current rcu read-side
+	 * critical sections are done. Currently this is used to
+	 * avoid a use-after-free when checking whether the rsb is
+	 * still part of ls_rsbtbl by testing the RSB_HASHED
+	 * flag.
+	 */
+	call_rcu(&r->rcu, __dlm_free_rsb);
+}
+
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
-- 
2.43.0



* [PATCH dlm/next 2/8] dlm: remove struct field with the same meaning
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 1/8] dlm: using rcu to avoid rsb lookup again Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-10 14:17   ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 3/8] dlm: use is_master() on checks if we are the master Alexander Aring
                   ` (5 subsequent siblings)
  7 siblings, 1 reply; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

There are currently both "res_nodeid" and "res_master_nodeid" fields in
"struct dlm_rsb". At some point a developer no longer knows when to use
which one, or forgets to update one of them, and they get out of sync.
This patch removes "res_nodeid" and keeps only "res_master_nodeid". The
two fields use different values to represent an invalid nodeid.
"res_master_nodeid" seems to be the modern way to represent the actual
master nodeid: it holds the real nodeid even when our own node is the
master (in res_nodeid, 0 represented this case). The modern
representation also fits into an "unsigned" range, which avoids
converting negative values for the network. The old representation is
still part of the DLM networking protocol, which is why the conversion
functions dlm_res_nodeid() and dlm_res_master_nodeid() are still
present. On a new major DLM protocol version bump, the nodeid
representation should be updated to the modern one. These conversion
functions also apply to the existing UAPI, as user space still assumes
the old "res_nodeid" representation.

The same arguments apply to "lkb_nodeid" and "lkb_master_nodeid", where
the value is only a copy of the "res_master_nodeid" of the lkb's
"lkb_resource". This case requires more code review because
"lkb_resource" is sometimes not set.

So far this patch makes the code easier to read and understand, because
some structs no longer carry several fields with the same meaning. With
the previous "res_nodeid" value, an additional check against
our_nodeid() was sometimes required to set the value to 0, which in that
representation means that we are the master node.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  33 +++++-----
 fs/dlm/dlm_internal.h |  33 ++++++++--
 fs/dlm/lock.c         | 144 ++++++++++++++++++------------------------
 fs/dlm/lock.h         |   5 +-
 fs/dlm/lockspace.c    |   2 +-
 fs/dlm/rcom.c         |   4 +-
 fs/dlm/recover.c      |  20 ++----
 fs/dlm/recoverd.c     |   2 +-
 8 files changed, 119 insertions(+), 124 deletions(-)
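
To illustrate the two value representations described above, here is a
small userspace sketch of the mapping. It mirrors the logic of
dlm_res_nodeid() and dlm_res_master_nodeid() from this patch, but the
function names to_res_nodeid() and to_res_master_nodeid() are
hypothetical, and our_nodeid is passed as a parameter here because
dlm_our_nodeid() is not available outside the kernel.

```c
#include <assert.h>

/* Modern -> old representation (old one is used by protocol and UAPI).
 * Assumption: a valid our_nodeid is never 0.
 */
static int to_res_nodeid(unsigned int master_nodeid, unsigned int our_nodeid)
{
	assert(our_nodeid != 0);

	if (master_nodeid == 0)
		return -1;		/* unset, e.g. lookup in progress */
	if (master_nodeid == our_nodeid)
		return 0;		/* we are the master */
	return (int)master_nodeid;	/* another node is the master */
}

/* Old -> modern representation. */
static unsigned int to_res_master_nodeid(int res_nodeid,
					 unsigned int our_nodeid)
{
	assert(our_nodeid != 0);

	if (res_nodeid == -1)
		return 0;		/* unset */
	if (res_nodeid == 0)
		return our_nodeid;	/* we are the master */
	return (unsigned int)res_nodeid;
}
```

With our_nodeid = 2, for example, the modern values 0, 2 and 3 map to
the old values -1, 0 and 3, and converting back yields the original
modern values again.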

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 6ab3ed4074c6..40eb1696f3b6 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -58,9 +58,10 @@ static void print_format1_lock(struct seq_file *s, struct dlm_lkb *lkb,
 	    lkb->lkb_status == DLM_LKSTS_WAITING)
 		seq_printf(s, " (%s)", print_lockmode(lkb->lkb_rqmode));
 
-	if (lkb->lkb_nodeid) {
-		if (lkb->lkb_nodeid != res->res_nodeid)
-			seq_printf(s, " Remote: %3d %08x", lkb->lkb_nodeid,
+	if (lkb->lkb_master_nodeid != dlm_our_nodeid()) {
+		if (lkb->lkb_master_nodeid != res->res_master_nodeid)
+			seq_printf(s, " Remote: %3d %08x",
+				   dlm_res_nodeid(lkb->lkb_master_nodeid),
 				   lkb->lkb_remid);
 		else
 			seq_printf(s, " Master:     %08x", lkb->lkb_remid);
@@ -88,16 +89,15 @@ static void print_format1(struct dlm_rsb *res, struct seq_file *s)
 			seq_printf(s, "%c", '.');
 	}
 
-	if (res->res_nodeid > 0)
-		seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
-			   res->res_nodeid);
-	else if (res->res_nodeid == 0)
-		seq_puts(s, "\"\nMaster Copy\n");
-	else if (res->res_nodeid == -1)
+
+	if (res->res_master_nodeid == 0)
 		seq_printf(s, "\"\nLooking up master (lkid %x)\n",
 			   res->res_first_lkid);
+	else if (res->res_master_nodeid == dlm_our_nodeid())
+		seq_puts(s, "\"\nMaster Copy\n");
 	else
-		seq_printf(s, "\"\nInvalid master %d\n", res->res_nodeid);
+		seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
+			   res->res_master_nodeid);
 	if (seq_has_overflowed(s))
 		goto out;
 
@@ -184,7 +184,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
 
 	seq_printf(s, "%x %d %x %u %llu %x %x %d %d %d %llu %u %d \"%s\"\n",
 		   lkb->lkb_id,
-		   lkb->lkb_nodeid,
+		   dlm_res_nodeid(lkb->lkb_master_nodeid),
 		   lkb->lkb_remid,
 		   lkb->lkb_ownpid,
 		   (unsigned long long)xid,
@@ -194,7 +194,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
 		   lkb->lkb_grmode,
 		   lkb->lkb_rqmode,
 		   (unsigned long long)us,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_length,
 		   r->res_name);
 }
@@ -238,7 +238,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
 
 	seq_printf(s, "lkb %x %d %x %u %llu %x %x %d %d %d %d %d %d %u %llu %llu\n",
 		   lkb->lkb_id,
-		   lkb->lkb_nodeid,
+		   dlm_res_nodeid(lkb->lkb_master_nodeid),
 		   lkb->lkb_remid,
 		   lkb->lkb_ownpid,
 		   (unsigned long long)xid,
@@ -265,7 +265,7 @@ static void print_format3(struct dlm_rsb *r, struct seq_file *s)
 
 	seq_printf(s, "rsb %p %d %x %lx %d %d %u %d ",
 		   r,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_first_lkid,
 		   r->res_flags,
 		   !list_empty(&r->res_root_list),
@@ -341,7 +341,7 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
 
 	seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
 		   r,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_master_nodeid,
 		   r->res_dir_nodeid,
 		   our_nodeid,
@@ -611,7 +611,8 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
 		ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
 			       lkb->lkb_id, lkb->lkb_wait_type,
-			       lkb->lkb_nodeid, lkb->lkb_resource->res_name);
+			       dlm_res_nodeid(lkb->lkb_master_nodeid),
+			       lkb->lkb_resource->res_name);
 		if (ret >= len - pos)
 			break;
 		pos += ret;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6f578527c6d8..2c7ad3c5e893 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -253,7 +253,7 @@ struct dlm_callback {
 struct dlm_lkb {
 	struct dlm_rsb		*lkb_resource;	/* the rsb */
 	struct kref		lkb_ref;
-	int			lkb_nodeid;	/* copied from rsb */
+	unsigned int		lkb_master_nodeid;	/* copied from rsb */
 	int			lkb_ownpid;	/* pid of lock owner */
 	uint32_t		lkb_id;		/* our lock ID */
 	uint32_t		lkb_remid;	/* lock ID on remote partner */
@@ -303,18 +303,41 @@ struct dlm_lkb {
  *
  * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
  * greater than zero when another nodeid.
- *
- * (TODO: remove res_nodeid and only use res_master_nodeid)
  */
 
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline int dlm_res_nodeid(unsigned int res_master_nodeid)
+{
+	if (res_master_nodeid == 0)
+		return -1;
+	else if (res_master_nodeid == dlm_our_nodeid())
+		return 0;
+	else
+		return res_master_nodeid;
+}
+
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline unsigned int dlm_res_master_nodeid(int res_nodeid)
+{
+	if (res_nodeid == -1)
+		return 0;
+	else if (res_nodeid == 0)
+		return dlm_our_nodeid();
+	else
+		return res_nodeid;
+}
+
 struct dlm_rsb {
 	struct dlm_ls		*res_ls;	/* the lockspace */
 	struct kref		res_ref;
 	spinlock_t		res_lock;
 	unsigned long		res_flags;
 	int			res_length;	/* length of rsb name */
-	int			res_nodeid;
-	int			res_master_nodeid;
+	unsigned int		res_master_nodeid;
 	int			res_dir_nodeid;
 	unsigned long		res_id;		/* for ls_recover_xa */
 	uint32_t                res_lvbseq;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e1e990b09068..3195d0b96c74 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,21 +160,19 @@ static const int __quecvt_compat_matrix[8][8] = {
 
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
-	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+	pr_err("lkb: master_nodeid %d id %x remid %x exflags %x flags %x "
 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
-	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
-	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
-	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
-	       (unsigned long long)lkb->lkb_recover_seq);
+	       lkb->lkb_master_nodeid, lkb->lkb_id, lkb->lkb_remid,
+	       lkb->lkb_exflags, dlm_iflags_val(lkb), lkb->lkb_status,
+	       lkb->lkb_rqmode, lkb->lkb_grmode, lkb->lkb_wait_type,
+	       lkb->lkb_wait_nodeid, (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
 {
-	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
-	       "rlc %d name %s\n",
-	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
-	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
-	       r->res_name);
+	pr_err("rsb: master %d dir %d flags %lx first %x rlc %d name %s\n",
+	       r->res_master_nodeid, r->res_dir_nodeid, r->res_flags,
+	       r->res_first_lkid, r->res_recover_locks_count, r->res_name);
 }
 
 void dlm_dump_rsb(struct dlm_rsb *r)
@@ -243,13 +241,13 @@ static inline int is_granted(struct dlm_lkb *lkb)
 
 static inline int is_remote(struct dlm_rsb *r)
 {
-	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
-	return !!r->res_nodeid;
+	DLM_ASSERT(r->res_master_nodeid != 0, dlm_print_rsb(r););
+	return r->res_master_nodeid != dlm_our_nodeid();
 }
 
 static inline int is_process_copy(struct dlm_lkb *lkb)
 {
-	return lkb->lkb_nodeid &&
+	return lkb->lkb_master_nodeid != dlm_our_nodeid() &&
 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
 }
 
@@ -832,7 +830,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		dlm_print_rsb(r);
 		/* fix it and go on */
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = 0;
 	}
@@ -878,7 +875,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
 			  from_nodeid, r->res_name);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 		goto out_add;
 	}
 
@@ -901,11 +897,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		/* When we are the dir nodeid, we can set the master
 		   node immediately */
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 	} else {
 		/* set_master will send_lookup to dir_nodeid */
 		r->res_master_nodeid = 0;
-		r->res_nodeid = -1;
 	}
 
  out_add:
@@ -1022,7 +1016,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
 		dlm_print_rsb(r);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 	}
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
@@ -1050,7 +1043,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
 	r->res_master_nodeid = dir_nodeid;
-	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
 	write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1130,7 +1122,8 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
 		return -ENOTBLK;
 	} else {
 		/* our rsb is not master, but the dir nodeid has sent us a
-	   	   request; this could happen with master 0 / res_nodeid -1 */
+		 * request; this could happen with res_master_nodeid 0
+		 */
 
 		if (r->res_master_nodeid) {
 			log_error(ls, "validate master from_dir %d master %d "
@@ -1140,7 +1133,6 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
 		}
 
 		r->res_master_nodeid = dlm_our_nodeid();
-		r->res_nodeid = 0;
 		return 0;
 	}
 }
@@ -1163,11 +1155,10 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		/* Recovery uses this function to set a new master when
 		 * the previous master failed.  Setting NEW_MASTER will
 		 * force dlm_recover_masters to call recover_master on this
-		 * rsb even though the res_nodeid is no longer removed.
+		 * rsb even though the res_master_nodeid is no longer removed.
 		 */
 
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 		rsb_set_flag(r, RSB_NEW_MASTER);
 
 		if (toss_list) {
@@ -1183,9 +1174,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		 * cycle before recovering this master value
 		 */
 
-		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
+		log_limit(ls, "%s from_master %d master_nodeid %d first %x %s",
 			  __func__, from_nodeid, r->res_master_nodeid,
-			  r->res_nodeid, r->res_first_lkid, r->res_name);
+			  r->res_first_lkid, r->res_name);
 
 		if (r->res_master_nodeid == our_nodeid) {
 			log_error(ls, "from_master %d our_master", from_nodeid);
@@ -1194,7 +1185,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		}
 
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 		rsb_set_flag(r, RSB_NEW_MASTER);
 	}
 
@@ -1206,7 +1196,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
 			  from_nodeid, r->res_first_lkid, r->res_name);
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 	}
 
 	if (!from_master && !fix_master &&
@@ -1371,7 +1360,7 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid,
 	r->res_hash = hash;
 	r->res_dir_nodeid = our_nodeid;
 	r->res_master_nodeid = from_nodeid;
-	r->res_nodeid = from_nodeid;
+	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
 	write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1523,7 +1512,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
-	lkb->lkb_nodeid = -1;
 	lkb->lkb_grmode = DLM_LOCK_IV;
 	kref_init(&lkb->lkb_ref);
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
@@ -2485,8 +2473,9 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
 		}
 
 		if (!demoted && is_demoted(lkb)) {
-			log_print("WARN: pending demoted %x node %d %s",
-				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
+			log_print("WARN: pending demoted %x master_node %d %s",
+				  lkb->lkb_id, lkb->lkb_master_nodeid,
+				  r->res_name);
 			demote_restart = 1;
 			continue;
 		}
@@ -2504,7 +2493,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
 				}
 			} else {
 				log_print("WARN: pending deadlock %x node %d %s",
-					  lkb->lkb_id, lkb->lkb_nodeid,
+					  lkb->lkb_id, lkb->lkb_master_nodeid,
 					  r->res_name);
 				dlm_dump_rsb(r);
 			}
@@ -2573,7 +2562,8 @@ static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 	int cw = 0;
 
 	if (!is_master(r)) {
-		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+		log_print("%s r master_nodeid %d", __func__,
+			  r->res_master_nodeid);
 		dlm_dump_rsb(r);
 		return;
 	}
@@ -2669,7 +2659,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = lkb->lkb_id;
-		lkb->lkb_nodeid = r->res_nodeid;
+		lkb->lkb_master_nodeid = r->res_master_nodeid;
 		return 0;
 	}
 
@@ -2678,13 +2668,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 		return 1;
 	}
 
-	if (r->res_master_nodeid == our_nodeid) {
-		lkb->lkb_nodeid = 0;
-		return 0;
-	}
-
 	if (r->res_master_nodeid) {
-		lkb->lkb_nodeid = r->res_master_nodeid;
+		lkb->lkb_master_nodeid = r->res_master_nodeid;
 		return 0;
 	}
 
@@ -2699,8 +2684,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
 			  r->res_name);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
-		lkb->lkb_nodeid = 0;
+		lkb->lkb_master_nodeid = our_nodeid;
 		return 0;
 	}
 
@@ -2897,7 +2881,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
    for success */
 
-/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+/* note: it's valid for lkb_master_nodeid/res_master_nodeid to be 0 when we get here
    because there may be a lookup in progress and it's valid to do
    cancel/unlockf on it */
 
@@ -3578,7 +3562,7 @@ static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
 		      struct dlm_message *ms)
 {
-	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
+	ms->m_nodeid   = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
@@ -3625,7 +3609,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = r->res_nodeid;
+	to_nodeid = r->res_master_nodeid;
 
 	error = add_to_waiters(lkb, mstype, to_nodeid);
 	if (error)
@@ -3689,7 +3673,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
 	if (error)
@@ -3710,7 +3694,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
 	if (error)
@@ -3780,7 +3764,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
@@ -3896,7 +3880,7 @@ static void fake_astfn(void *astparam)
 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				const struct dlm_message *ms)
 {
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
 	lkb->lkb_grmode = DLM_LOCK_IV;
@@ -3944,7 +3928,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
 }
 
@@ -3969,7 +3953,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 	case cpu_to_le32(DLM_MSG_CONVERT):
 	case cpu_to_le32(DLM_MSG_UNLOCK):
 	case cpu_to_le32(DLM_MSG_CANCEL):
-		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+		if (!is_master_copy(lkb) || lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
@@ -3978,14 +3962,14 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
 	case cpu_to_le32(DLM_MSG_GRANT):
 	case cpu_to_le32(DLM_MSG_BAST):
-		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+		if (!is_process_copy(lkb) || lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
 		if (!is_process_copy(lkb))
 			error = -EINVAL;
-		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+		else if (lkb->lkb_master_nodeid != 0 && lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
@@ -3999,7 +3983,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 			  "ignore invalid message %d from %d %x %x %x %d",
 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
 			  lkb->lkb_remid, dlm_iflags_val(lkb),
-			  lkb->lkb_nodeid);
+			  lkb->lkb_master_nodeid);
 	return error;
 }
 
@@ -4425,8 +4409,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 	   lookup as a request and sent request reply instead of lookup reply */
 	if (mstype == DLM_MSG_LOOKUP) {
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
-		lkb->lkb_nodeid = from_nodeid;
+		lkb->lkb_master_nodeid = from_nodeid;
 	}
 
 	/* this is the value returned from do_request() on the master */
@@ -4468,8 +4451,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 		    r->res_master_nodeid != dlm_our_nodeid()) {
 			/* cause _request_lock->set_master->send_lookup */
 			r->res_master_nodeid = 0;
-			r->res_nodeid = -1;
-			lkb->lkb_nodeid = -1;
+			lkb->lkb_master_nodeid = 0;
 		}
 
 		if (is_overlap(lkb)) {
@@ -4743,7 +4725,6 @@ static void receive_lookup_reply(struct dlm_ls *ls,
 
 	if (ret_nodeid == dlm_our_nodeid()) {
 		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = 0;
 		do_lookup_list = 1;
 		r->res_first_lkid = 0;
 	} else if (ret_nodeid == -1) {
@@ -4751,12 +4732,10 @@ static void receive_lookup_reply(struct dlm_ls *ls,
 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
 		r->res_master_nodeid = 0;
-		r->res_nodeid = -1;
-		lkb->lkb_nodeid = -1;
+		lkb->lkb_master_nodeid = 0;
 	} else {
-		/* set_master() will set lkb_nodeid from r */
+		/* set_master() will set lkb_master_nodeid from r */
 		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = ret_nodeid;
 	}
 
 	if (is_overlap(lkb)) {
@@ -5023,7 +5002,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 		memset(ms_local, 0, sizeof(struct dlm_message));
 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
-		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+		ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 		_receive_convert_reply(lkb, ms_local, true);
 
 		/* Same special case as in receive_rcom_lock_args() */
@@ -5079,13 +5058,12 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		   many and they aren't very interesting */
 
 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
-			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
-				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+			log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d",
 				  lkb->lkb_id,
 				  lkb->lkb_remid,
 				  lkb->lkb_wait_type,
-				  lkb->lkb_resource->res_nodeid,
-				  lkb->lkb_nodeid,
+				  lkb->lkb_resource->res_master_nodeid,
+				  lkb->lkb_master_nodeid,
 				  lkb->lkb_wait_nodeid,
 				  dir_nodeid);
 		}
@@ -5142,7 +5120,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			memset(ms_local, 0, sizeof(struct dlm_message));
 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
-			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 			_receive_unlock_reply(lkb, ms_local, true);
 			dlm_put_lkb(lkb);
 			break;
@@ -5152,7 +5130,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			memset(ms_local, 0, sizeof(struct dlm_message));
 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
-			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 			_receive_cancel_reply(lkb, ms_local, true);
 			dlm_put_lkb(lkb);
 			break;
@@ -5248,11 +5226,10 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 					&lkb->lkb_iflags);
 		err = 0;
 
-		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
-			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
-			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
-			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
-			  dlm_dir_nodeid(r), oc, ou);
+		log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d overlap %d %d",
+			  lkb->lkb_id, lkb->lkb_remid, mstype,
+			  r->res_master_nodeid, lkb->lkb_master_nodeid,
+			  lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou);
 
 		/*
 		 * No reply to the pre-recovery operation will now be received,
@@ -5325,9 +5302,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		}
 
 		if (err) {
-			log_error(ls, "waiter %x msg %d r_nodeid %d "
+			log_error(ls, "waiter %x msg %d master_nodeid %d "
 				  "dir_nodeid %d overlap %d %d",
-				  lkb->lkb_id, mstype, r->res_nodeid,
+				  lkb->lkb_id, mstype, r->res_master_nodeid,
 				  dlm_dir_nodeid(r), oc, ou);
 		}
 		unlock_rsb(r);
@@ -5380,8 +5357,8 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 		if (!is_master_copy(lkb))
 			continue;
 
-		if ((lkb->lkb_nodeid == nodeid_gone) ||
-		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
+		if ((lkb->lkb_master_nodeid == nodeid_gone) ||
+		    dlm_is_removed(ls, lkb->lkb_master_nodeid)) {
 
 			/* tell recover_lvb to invalidate the lvb
 			   because a node holding EX/PW failed */
@@ -5518,7 +5495,7 @@ static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
 	struct dlm_lkb *lkb;
 
 	list_for_each_entry(lkb, head, lkb_statequeue) {
-		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
+		if (lkb->lkb_master_nodeid == nodeid && lkb->lkb_remid == remid)
 			return lkb;
 	}
 	return NULL;
@@ -5547,7 +5524,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 {
 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
 
-	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(rc->rc_header.h_nodeid));
 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
@@ -6294,7 +6271,8 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
 
 /* debug functionality */
 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
-		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
+		      int lkb_master_nodeid, unsigned int lkb_dflags,
+		      int lkb_status)
 {
 	struct dlm_lksb *lksb;
 	struct dlm_lkb *lkb;
@@ -6316,7 +6294,7 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
 	}
 
 	dlm_set_dflags_val(lkb, lkb_dflags);
-	lkb->lkb_nodeid = lkb_nodeid;
+	lkb->lkb_master_nodeid = lkb_master_nodeid;
 	lkb->lkb_lksb = lksb;
 	/* user specific pointer, just don't have it NULL for kernel locks */
 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 8de9dee4c058..e59161d2fe84 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -61,13 +61,14 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
-		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status);
+		      int lkb_master_nodeid, unsigned int lkb_flags,
+		      int lkb_status);
 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
 				 int mstype, int to_nodeid);
 
 static inline int is_master(struct dlm_rsb *r)
 {
-	return !r->res_nodeid;
+	return r->res_master_nodeid == dlm_our_nodeid();
 }
 
 static inline void lock_rsb(struct dlm_rsb *r)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 6f1078a1c715..f54f43dbe7b6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -660,7 +660,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 		}
 	} else if (force == 1) {
 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
-			if (lkb->lkb_nodeid == 0 &&
+			if (lkb->lkb_master_nodeid == dlm_our_nodeid() &&
 			    lkb->lkb_grmode != DLM_LOCK_IV) {
 				rv = 1;
 				break;
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index be1a71a6303a..b545cd7a23cd 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -455,8 +455,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
 	if (lkb->lkb_lvbptr)
 		len += ls->ls_lvblen;
 
-	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
-			    seq);
+	error = create_rcom(ls, r->res_master_nodeid, DLM_RCOM_LOCK, len, &rc,
+			    &mh, seq);
 	if (error)
 		goto out;
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index d156196b9e69..d948d28d8f92 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -408,7 +408,7 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 
 	list_for_each_entry(lkb, queue, lkb_statequeue) {
 		if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
-			lkb->lkb_nodeid = nodeid;
+			lkb->lkb_master_nodeid = nodeid;
 			lkb->lkb_remid = 0;
 		}
 	}
@@ -416,9 +416,9 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 
 static void set_master_lkbs(struct dlm_rsb *r)
 {
-	set_lock_master(&r->res_grantqueue, r->res_nodeid);
-	set_lock_master(&r->res_convertqueue, r->res_nodeid);
-	set_lock_master(&r->res_waitqueue, r->res_nodeid);
+	set_lock_master(&r->res_grantqueue, r->res_master_nodeid);
+	set_lock_master(&r->res_convertqueue, r->res_master_nodeid);
+	set_lock_master(&r->res_waitqueue, r->res_master_nodeid);
 }
 
 /*
@@ -455,7 +455,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 	if (is_master(r))
 		return 0;
 
-	is_removed = dlm_is_removed(ls, r->res_nodeid);
+	is_removed = dlm_is_removed(ls, r->res_master_nodeid);
 
 	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
 		return 0;
@@ -464,10 +464,8 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 	dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
-		if (is_removed) {
+		if (is_removed)
 			r->res_master_nodeid = our_nodeid;
-			r->res_nodeid = 0;
-		}
 
 		/* set master of lkbs to ourself when is_removed, or to
 		   another new master which we set along with NEW_MASTER
@@ -501,14 +499,9 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
 	int dir_nodeid = dlm_dir_nodeid(r);
-	int new_master = dir_nodeid;
-
-	if (dir_nodeid == dlm_our_nodeid())
-		new_master = 0;
 
 	dlm_purge_mstcpy_locks(r);
 	r->res_master_nodeid = dir_nodeid;
-	r->res_nodeid = new_master;
 	set_new_master(r);
 	(*count)++;
 	return 0;
@@ -584,7 +577,6 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
 
 	lock_rsb(r);
 	r->res_master_nodeid = ret_nodeid;
-	r->res_nodeid = new_master;
 	set_new_master(r);
 	unlock_rsb(r);
 	recover_xa_del(r);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 17a40d1e6036..06959b128bd0 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -34,7 +34,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
-		if (r->res_nodeid)
+		if (r->res_master_nodeid != dlm_our_nodeid())
 			continue;
 
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
-- 
2.43.0


* [PATCH dlm/next 3/8] dlm: use is_master() on checks if we are the master
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 1/8] dlm: using rcu to avoid rsb lookup again Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 2/8] dlm: remove struct field with the same meaning Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 4/8] dlm: use LSFL_FS to check if it's a kernel lockspace Alexander Aring
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

Several places check whether we are the master of a resource with the
open-coded comparison "r->res_master_nodeid == dlm_our_nodeid()", or
with the negated form to check that we are not the master. The helper
function is_master() already performs this check. This patch replaces
several of those open-coded comparisons with the helper.
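
The relationship between the two checks can be sketched in user space
as follows. This is only an illustrative model, not the kernel code:
struct rsb_model, our_nodeid_model() and the nodeid value 1 are
assumptions made up for the example, and assert() stands in for
DLM_ASSERT().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct dlm_rsb; only the field used here. */
struct rsb_model {
	int res_master_nodeid;	/* 0 means the master is not known yet */
};

/* Stand-in for dlm_our_nodeid(); the value 1 is an arbitrary assumption. */
static int our_nodeid_model(void)
{
	return 1;
}

/* Same comparison as is_master() in fs/dlm/lock.h after patch 2/8. */
static bool is_master_model(const struct rsb_model *r)
{
	return r->res_master_nodeid == our_nodeid_model();
}

/* is_remote() after this patch: simply the negation of is_master(). */
static bool is_remote_model(const struct rsb_model *r)
{
	/* models the DLM_ASSERT(): the master must already be known */
	assert(r->res_master_nodeid != 0);
	return !is_master_model(r);
}
```

With a resource whose master nodeid equals our own, is_master_model()
is true and is_remote_model() is false; for any other non-zero master
nodeid the results are swapped.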

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c     | 9 ++++-----
 fs/dlm/recoverd.c | 2 +-
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 3195d0b96c74..a41a6fa2123b 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -242,7 +242,7 @@ static inline int is_granted(struct dlm_lkb *lkb)
 static inline int is_remote(struct dlm_rsb *r)
 {
 	DLM_ASSERT(r->res_master_nodeid != 0, dlm_print_rsb(r););
-	return r->res_master_nodeid != dlm_our_nodeid();
+	return !is_master(r);
 }
 
 static inline int is_process_copy(struct dlm_lkb *lkb)
@@ -4025,7 +4025,7 @@ static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	lock_rsb(r);
 
-	if (r->res_master_nodeid != dlm_our_nodeid()) {
+	if (!is_master(r)) {
 		error = validate_master_nodeid(ls, r, from_nodeid);
 		if (error) {
 			unlock_rsb(r);
@@ -4447,8 +4447,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 			  from_nodeid, result, r->res_master_nodeid,
 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
 
-		if (r->res_dir_nodeid != dlm_our_nodeid() &&
-		    r->res_master_nodeid != dlm_our_nodeid()) {
+		if (r->res_dir_nodeid != dlm_our_nodeid() && !is_master(r)) {
 			/* cause _request_lock->set_master->send_lookup */
 			r->res_master_nodeid = 0;
 			lkb->lkb_master_nodeid = 0;
@@ -4462,7 +4461,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 		} else {
 			_request_lock(r, lkb);
 
-			if (r->res_master_nodeid == dlm_our_nodeid())
+			if (is_master(r))
 				confirm_master(r, 0);
 		}
 		break;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 06959b128bd0..98d83f90a0db 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -34,7 +34,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
-		if (r->res_master_nodeid != dlm_our_nodeid())
+		if (!is_master(r))
 			continue;
 
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
-- 
2.43.0


* [PATCH dlm/next 4/8] dlm: use LSFL_FS to check if it's a kernel lockspace
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (2 preceding siblings ...)
  2024-06-03 21:55 ` [PATCH dlm/next 3/8] dlm: use is_master() on checks if we are the master Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 5/8] dlm: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

This patch sets the internal flag LSFL_FS for kernel lockspaces and
uses it in the callback workqueue handling to check whether a
lockspace is a kernel lockspace. Currently, user space lockspaces
don't require the callback workqueue. Upcoming changes will make it
possible to remove the callback workqueue context switch when a
specific lockspace flag is passed. This patch prepares for that: some
handling, such as setting LSFL_CB_DELAY, is still required for those
kernel lockspaces even though they don't have a callback workqueue.
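
The gating described above can be modelled in user space roughly like
this. It is a simplified sketch, not the kernel code: struct ls_model
and the *_model names are made up for the example, the flags are plain
bit masks instead of test_bit()/set_bit() on ls_flags, and the
ls_cb_lock locking is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Bit positions taken from dlm_internal.h in this patch. */
#define LSFL_CB_DELAY_MODEL	(1UL << 9)
#define LSFL_FS_MODEL		(1UL << 12)

/* Hypothetical stand-in for struct dlm_ls. */
struct ls_model {
	unsigned long flags;
	bool has_callback_wq;	/* stands in for ls->ls_callback_wq != NULL */
	int flushes;		/* counts modelled flush_workqueue() calls */
};

/* Only kernel lockspaces (LSFL_FS) get a callback workqueue. */
static int callback_start_model(struct ls_model *ls)
{
	assert(ls != NULL);

	if (!(ls->flags & LSFL_FS_MODEL))
		return 0;

	ls->has_callback_wq = true;
	return 0;
}

/* CB_DELAY is set for every kernel lockspace, with or without a wq. */
static void callback_suspend_model(struct ls_model *ls)
{
	if (!(ls->flags & LSFL_FS_MODEL))
		return;

	ls->flags |= LSFL_CB_DELAY_MODEL;

	if (ls->has_callback_wq)
		ls->flushes++;
}
```

The point of the change shows in the last case: a kernel lockspace
without a workqueue still gets LSFL_CB_DELAY set on suspend, while a
user space lockspace is left untouched.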

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 17 +++++++++++------
 fs/dlm/dlm_internal.h |  1 +
 fs/dlm/lockspace.c    | 13 +++++++------
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 59711486d801..52ce27031314 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -161,6 +161,9 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 
 int dlm_callback_start(struct dlm_ls *ls)
 {
+	if (!test_bit(LSFL_FS, &ls->ls_flags))
+		return 0;
+
 	ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
 						     WQ_HIGHPRI | WQ_MEM_RECLAIM);
 	if (!ls->ls_callback_wq) {
@@ -178,13 +181,15 @@ void dlm_callback_stop(struct dlm_ls *ls)
 
 void dlm_callback_suspend(struct dlm_ls *ls)
 {
-	if (ls->ls_callback_wq) {
-		spin_lock_bh(&ls->ls_cb_lock);
-		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-		spin_unlock_bh(&ls->ls_cb_lock);
+	if (!test_bit(LSFL_FS, &ls->ls_flags))
+		return;
 
+	spin_lock_bh(&ls->ls_cb_lock);
+	set_bit(LSFL_CB_DELAY, &ls->ls_flags);
+	spin_unlock_bh(&ls->ls_cb_lock);
+
+	if (ls->ls_callback_wq)
 		flush_workqueue(ls->ls_callback_wq);
-	}
 }
 
 #define MAX_CB_QUEUE 25
@@ -195,7 +200,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	int count = 0, sum = 0;
 	bool empty;
 
-	if (!ls->ls_callback_wq)
+	if (!test_bit(LSFL_FS, &ls->ls_flags))
 		return;
 
 more:
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 2c7ad3c5e893..3b026d80aa2b 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -726,6 +726,7 @@ struct dlm_ls {
 #define LSFL_CB_DELAY		9
 #define LSFL_NODIR		10
 #define LSFL_RECV_MSG_BLOCKED	11
+#define LSFL_FS			12
 
 #define DLM_PROC_FLAGS_CLOSING 1
 #define DLM_PROC_FLAGS_COMPAT  2
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f54f43dbe7b6..be5dd5c990e5 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -500,12 +500,13 @@ static int new_lockspace(const char *name, const char *cluster,
 	list_add(&ls->ls_list, &lslist);
 	spin_unlock_bh(&lslist_lock);
 
-	if (flags & DLM_LSFL_FS) {
-		error = dlm_callback_start(ls);
-		if (error) {
-			log_error(ls, "can't start dlm_callback %d", error);
-			goto out_delist;
-		}
+	if (flags & DLM_LSFL_FS)
+		set_bit(LSFL_FS, &ls->ls_flags);
+
+	error = dlm_callback_start(ls);
+	if (error) {
+		log_error(ls, "can't start dlm_callback %d", error);
+		goto out_delist;
 	}
 
 	init_waitqueue_head(&ls->ls_recover_lock_wait);
-- 
2.43.0


* [PATCH dlm/next 5/8] dlm: introduce DLM_LSFL_SOFTIRQ_SAFE
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (3 preceding siblings ...)
  2024-06-03 21:55 ` [PATCH dlm/next 4/8] dlm: use LSFL_FS to check if it's a kernel lockspace Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 6/8] dlm: implement LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

This patch introduces the per lockspace DLM_LSFL_SOFTIRQ_SAFE flag to
signal that the user is ready to handle ast/bast callbacks in softirq
context. As seen from user space it is, for now, a reserved dlm kernel
lockspace flag. We did not choose the available 0x4 flag, as that flag
was dropped only recently and should not be reused yet. In the future,
if all users have changed to use DLM_LSFL_SOFTIRQ_SAFE, we can
hopefully drop this flag and the dlm code functionality (dlm_callback
workqueue) which belongs to it.
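
As a rough user space illustration of the flag layout and the check
that keeps the flag away from user space lockspaces: the MODEL_* names
and new_user_lockspace_model() are made up for this sketch, while the
values 0x8 and 0x10 and the -EINVAL return are taken from the diff
below.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

/* Values as in include/uapi/linux/dlm.h after this patch. */
#define MODEL_LSFL_NEWEXCL	0x00000008u
#define MODEL_LSFL_RESERVED0	0x00000010u

/* The in-kernel name is an alias for the reserved uapi bit. */
#define MODEL_LSFL_SOFTIRQ	MODEL_LSFL_RESERVED0

/* The recently dropped 0x4 bit is deliberately not reused. */
static_assert((MODEL_LSFL_SOFTIRQ & 0x00000004u) == 0,
	      "softirq flag must not reuse the dropped 0x4 bit");

/* Models the check added to dlm_new_user_lockspace(). */
static int new_user_lockspace_model(uint32_t flags)
{
	if (flags & MODEL_LSFL_SOFTIRQ)
		return -EINVAL;

	return 0;	/* the real code goes on to create the lockspace */
}
```

A user space caller passing the reserved bit gets -EINVAL, while other
flags such as DLM_LSFL_NEWEXCL are unaffected by the new check.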

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lockspace.c       |  3 +++
 include/linux/dlm.h      | 17 ++++++++++++++++-
 include/uapi/linux/dlm.h |  2 ++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index be5dd5c990e5..51f9516b710d 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -630,6 +630,9 @@ int dlm_new_user_lockspace(const char *name, const char *cluster,
 			   void *ops_arg, int *ops_result,
 			   dlm_lockspace_t **lockspace)
 {
+	if (flags & DLM_LSFL_SOFTIRQ)
+		return -EINVAL;
+
 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
 				   ops_arg, ops_result, lockspace);
 }
diff --git a/include/linux/dlm.h b/include/linux/dlm.h
index c58c4f790c04..bacda9898f2b 100644
--- a/include/linux/dlm.h
+++ b/include/linux/dlm.h
@@ -35,6 +35,9 @@ struct dlm_lockspace_ops {
 			      int num_slots, int our_slot, uint32_t generation);
 };
 
+/* only relevant for kernel lockspaces, will be removed in future */
+#define DLM_LSFL_SOFTIRQ __DLM_LSFL_RESERVED0
+
 /*
  * dlm_new_lockspace
  *
@@ -55,6 +58,11 @@ struct dlm_lockspace_ops {
  *   used to select the directory node.  Must be the same on all nodes.
  * DLM_LSFL_NEWEXCL
  *   dlm_new_lockspace() should return -EEXIST if the lockspace exists.
+ * DLM_LSFL_SOFTIRQ
+ *   dlm request callbacks (ast, bast) are softirq safe. Flag should be
+ *   preferred by users. Will be default in some future. If set the
+ *   strongest context for ast, bast callback is softirq as it avoids
+ *   an additional context switch.
  *
  * lvblen: length of lvb in bytes.  Must be multiple of 8.
  *   dlm_new_lockspace() returns an error if this does not match
@@ -121,7 +129,14 @@ int dlm_release_lockspace(dlm_lockspace_t *lockspace, int force);
  * call.
  *
  * AST routines should not block (at least not for long), but may make
- * any locking calls they please.
+ * any locking calls they please. If DLM_LSFL_SOFTIRQ for kernel
+ * users of dlm_new_lockspace() is passed the ast and bast callbacks
+ * can be processed in softirq context. Also some of the callback
+ * contexts are in the same context as the DLM lock request API, users
+ * must not hold locks while calling dlm lock request API and trying
+ * to acquire this lock in the callback again, this will end in a
+ * lock recursion. For newer implementation the DLM_LSFL_SOFTIRQ
+ * should be used.
  */
 
 int dlm_lock(dlm_lockspace_t *lockspace,
diff --git a/include/uapi/linux/dlm.h b/include/uapi/linux/dlm.h
index e7e905fb0bb2..4eaf835780b0 100644
--- a/include/uapi/linux/dlm.h
+++ b/include/uapi/linux/dlm.h
@@ -71,6 +71,8 @@ struct dlm_lksb {
 /* DLM_LSFL_TIMEWARN is deprecated and reserved. DO NOT USE! */
 #define DLM_LSFL_TIMEWARN	0x00000002
 #define DLM_LSFL_NEWEXCL     	0x00000008
+/* currently reserved due in-kernel use */
+#define __DLM_LSFL_RESERVED0	0x00000010
 
 
 #endif /* _UAPI__DLM_DOT_H__ */
-- 
2.43.0


* [PATCH dlm/next 6/8] dlm: implement LSFL_SOFTIRQ_SAFE
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (4 preceding siblings ...)
  2024-06-03 21:55 ` [PATCH dlm/next 5/8] dlm: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 7/8] dlm: convert ls_cb_lock to rwlock Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace() Alexander Aring
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

This patch allows ast and bast callbacks to be called directly, in the
context in which they arrive. Currently every ast and bast callback of
a kernel lockspace is queued through a workqueue to provide the dlm
user a different context. Some existing users can't currently handle
the strongest context of ast/bast, which is softirq context, but new
users should implement their dlm application to run in softirq context
from the start. Another requirement, which current DLM users are less
likely to run into, is avoiding lock recursion: a user must not hold a
lock while calling the DLM API if the callback tries to acquire the
same lock.
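
The resulting dispatch decision in dlm_add_cb() can be summarised by
the following user space sketch. It only models the three outcomes;
enum cb_path, the MODEL_* masks and add_cb_model() are hypothetical
names, and the allocation, list handling and ls_cb_lock locking of the
real code are omitted.

```c
#include <assert.h>
#include <stddef.h>

/* The three ways a callback can be handled after this patch. */
enum cb_path { CB_DELAYED, CB_DIRECT, CB_WORKQUEUE };

/* Hypothetical masks standing in for LSFL_CB_DELAY and LSFL_SOFTIRQ. */
#define MODEL_CB_DELAY	(1UL << 0)
#define MODEL_SOFTIRQ	(1UL << 1)

typedef void (*ast_fn)(void *astparam);

static enum cb_path add_cb_model(unsigned long ls_flags, ast_fn astfn,
				 void *astparam)
{
	assert(astfn != NULL);

	/* callbacks are suspended: park it until resume */
	if (ls_flags & MODEL_CB_DELAY)
		return CB_DELAYED;

	/* softirq-safe user: call it right here, in the current context */
	if (ls_flags & MODEL_SOFTIRQ) {
		astfn(astparam);
		return CB_DIRECT;
	}

	/* everyone else: defer to the dlm_callback workqueue */
	return CB_WORKQUEUE;
}

/* Example callback that counts how often it was invoked. */
static void count_ast(void *astparam)
{
	(*(int *)astparam)++;
}
```

In this model only the softirq-safe path invokes the callback
immediately; the delayed and workqueue paths leave it to be run later,
which is the context switch the new flag avoids.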

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 157 +++++++++++++++++++++++++++---------------
 fs/dlm/ast.h          |  11 ++-
 fs/dlm/dlm_internal.h |   1 +
 fs/dlm/lockspace.c    |   3 +
 fs/dlm/user.c         |  38 +++++-----
 5 files changed, 126 insertions(+), 84 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 52ce27031314..742b30b61c19 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -18,35 +18,52 @@
 #include "user.h"
 #include "ast.h"
 
-static void dlm_callback_work(struct work_struct *work)
+static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode,
+			     uint32_t flags, uint8_t sb_flags, int sb_status,
+			     struct dlm_lksb *lksb,
+			     void (*astfn)(void *astparam),
+			     void (*bastfn)(void *astparam, int mode),
+			     void *astparam, const char *res_name,
+			     size_t res_length)
 {
-	struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
-
-	if (cb->flags & DLM_CB_BAST) {
-		trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
-			       cb->res_length);
-		cb->bastfn(cb->astparam, cb->mode);
-	} else if (cb->flags & DLM_CB_CAST) {
-		trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
-			      cb->sb_flags, cb->res_name, cb->res_length);
-		cb->lkb_lksb->sb_status = cb->sb_status;
-		cb->lkb_lksb->sb_flags = cb->sb_flags;
-		cb->astfn(cb->astparam);
+	if (flags & DLM_CB_BAST) {
+		trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length);
+		bastfn(astparam, mode);
+	} else if (flags & DLM_CB_CAST) {
+		trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name,
+			      res_length);
+		lksb->sb_status = sb_status;
+		lksb->sb_flags = sb_flags;
+		astfn(astparam);
 	}
+}
 
+static void dlm_do_callback(struct dlm_callback *cb)
+{
+	dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags,
+			 cb->sb_flags, cb->sb_status, cb->lkb_lksb,
+			 cb->astfn, cb->bastfn, cb->astparam,
+			 cb->res_name, cb->res_length);
 	dlm_free_cb(cb);
 }
 
-int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-			   int status, uint32_t sbflags,
-			   struct dlm_callback **cb)
+static void dlm_callback_work(struct work_struct *work)
+{
+	struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
+
+	dlm_do_callback(cb);
+}
+
+bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			   int status, uint32_t sbflags, int *copy_lvb)
 {
 	struct dlm_rsb *rsb = lkb->lkb_resource;
-	int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
 	struct dlm_ls *ls = rsb->res_ls;
-	int copy_lvb = 0;
 	int prev_mode;
 
+	if (copy_lvb)
+		*copy_lvb = 0;
+
 	if (flags & DLM_CB_BAST) {
 		/* if cb is a bast, it should be skipped if the blocking mode is
 		 * compatible with the last granted mode
@@ -56,7 +73,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 				log_debug(ls, "skip %x bast mode %d for cast mode %d",
 					  lkb->lkb_id, mode,
 					  lkb->lkb_last_cast_cb_mode);
-				goto out;
+				return true;
 			}
 		}
 
@@ -74,7 +91,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
 				log_debug(ls, "skip %x add bast mode %d for bast mode %d",
 					  lkb->lkb_id, mode, prev_mode);
-				goto out;
+				return true;
 			}
 		}
 
@@ -85,8 +102,10 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 			prev_mode = lkb->lkb_last_cast_cb_mode;
 
 			if (!status && lkb->lkb_lksb->sb_lvbptr &&
-			    dlm_lvb_operations[prev_mode + 1][mode + 1])
-				copy_lvb = 1;
+			    dlm_lvb_operations[prev_mode + 1][mode + 1]) {
+				if (copy_lvb)
+					*copy_lvb = 1;
+			}
 		}
 
 		lkb->lkb_last_cast_cb_mode = mode;
@@ -96,11 +115,19 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	lkb->lkb_last_cb_mode = mode;
 	lkb->lkb_last_cb_flags = flags;
 
+	return false;
+}
+
+int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
+	       int status, uint32_t sbflags,
+	       struct dlm_callback **cb)
+{
+	struct dlm_rsb *rsb = lkb->lkb_resource;
+	struct dlm_ls *ls = rsb->res_ls;
+
 	*cb = dlm_allocate_cb();
-	if (!*cb) {
-		rv = DLM_ENQUEUE_CALLBACK_FAILURE;
-		goto out;
-	}
+	if (WARN_ON_ONCE(!*cb))
+		return -ENOMEM;
 
 	/* for tracing */
 	(*cb)->lkb_id = lkb->lkb_id;
@@ -112,19 +139,34 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	(*cb)->mode = mode;
 	(*cb)->sb_status = status;
 	(*cb)->sb_flags = (sbflags & 0x000000FF);
-	(*cb)->copy_lvb = copy_lvb;
 	(*cb)->lkb_lksb = lkb->lkb_lksb;
 
-	rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
+	return 0;
+}
+
+static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			    int status, uint32_t sbflags,
+			    struct dlm_callback **cb)
+{
+	int rv;
 
-out:
-	return rv;
+	rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb);
+	if (rv)
+		return rv;
+
+	(*cb)->astfn = lkb->lkb_astfn;
+	(*cb)->bastfn = lkb->lkb_bastfn;
+	(*cb)->astparam = lkb->lkb_astparam;
+	INIT_WORK(&(*cb)->work, dlm_callback_work);
+
+	return 0;
 }
 
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
-		  uint32_t sbflags)
+		uint32_t sbflags)
 {
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	struct dlm_rsb *rsb = lkb->lkb_resource;
+	struct dlm_ls *ls = rsb->res_ls;
 	struct dlm_callback *cb;
 	int rv;
 
@@ -133,35 +175,34 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		return;
 	}
 
-	rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
-				    &cb);
-	switch (rv) {
-	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
-		cb->astfn = lkb->lkb_astfn;
-		cb->bastfn = lkb->lkb_bastfn;
-		cb->astparam = lkb->lkb_astparam;
-		INIT_WORK(&cb->work, dlm_callback_work);
-
-		spin_lock_bh(&ls->ls_cb_lock);
-		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
+	if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL))
+		return;
+
+	spin_lock_bh(&ls->ls_cb_lock);
+	if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+		rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
+		if (!rv)
 			list_add(&cb->list, &ls->ls_cb_delay);
-		else
-			queue_work(ls->ls_callback_wq, &cb->work);
-		spin_unlock_bh(&ls->ls_cb_lock);
-		break;
-	case DLM_ENQUEUE_CALLBACK_SUCCESS:
-		break;
-	case DLM_ENQUEUE_CALLBACK_FAILURE:
-		fallthrough;
-	default:
-		WARN_ON_ONCE(1);
-		break;
+	} else {
+		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) {
+			dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags,
+					 sbflags, status, lkb->lkb_lksb,
+					 lkb->lkb_astfn, lkb->lkb_bastfn,
+					 lkb->lkb_astparam, rsb->res_name,
+					 rsb->res_length);
+		} else {
+			rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
+			if (!rv)
+				queue_work(ls->ls_callback_wq, &cb->work);
+		}
 	}
+	spin_unlock_bh(&ls->ls_cb_lock);
 }
 
 int dlm_callback_start(struct dlm_ls *ls)
 {
-	if (!test_bit(LSFL_FS, &ls->ls_flags))
+	if (!test_bit(LSFL_FS, &ls->ls_flags) ||
+	    test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
 		return 0;
 
 	ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
@@ -207,7 +248,11 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	spin_lock_bh(&ls->ls_cb_lock);
 	list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
 		list_del(&cb->list);
-		queue_work(ls->ls_callback_wq, &cb->work);
+		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
+			dlm_do_callback(cb);
+		else
+			queue_work(ls->ls_callback_wq, &cb->work);
+
 		count++;
 		if (count == MAX_CB_QUEUE)
 			break;
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 9093ff043bee..e2b86845d331 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -11,12 +11,11 @@
 #ifndef __ASTD_DOT_H__
 #define __ASTD_DOT_H__
 
-#define DLM_ENQUEUE_CALLBACK_NEED_SCHED	1
-#define DLM_ENQUEUE_CALLBACK_SUCCESS	0
-#define DLM_ENQUEUE_CALLBACK_FAILURE	-1
-int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-			   int status, uint32_t sbflags,
-			   struct dlm_callback **cb);
+bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			   int status, uint32_t sbflags, int *copy_lvb);
+int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode,
+	       int status, uint32_t sbflags,
+	       struct dlm_callback **cb);
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
                 uint32_t sbflags);
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3b026d80aa2b..e299d8d4d971 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -727,6 +727,7 @@ struct dlm_ls {
 #define LSFL_NODIR		10
 #define LSFL_RECV_MSG_BLOCKED	11
 #define LSFL_FS			12
+#define LSFL_SOFTIRQ		13
 
 #define DLM_PROC_FLAGS_CLOSING 1
 #define DLM_PROC_FLAGS_COMPAT  2
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 51f9516b710d..5b3a4c32ac99 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -407,6 +407,9 @@ static int new_lockspace(const char *name, const char *cluster,
 		ls->ls_ops_arg = ops_arg;
 	}
 
+	if (flags & DLM_LSFL_SOFTIRQ)
+		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
+
 	/* ls_exflags are forced to match among nodes, and we don't
 	 * need to require all nodes to have some flags set
 	 */
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index f6635a5314f4..5cb3896be826 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -182,7 +182,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	struct dlm_user_args *ua;
 	struct dlm_user_proc *proc;
 	struct dlm_callback *cb;
-	int rv;
+	int rv, copy_lvb;
 
 	if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
 	    test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
@@ -213,28 +213,22 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 
 	spin_lock_bh(&proc->asts_spin);
 
-	rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
-	switch (rv) {
-	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
-		cb->ua = *ua;
-		cb->lkb_lksb = &cb->ua.lksb;
-		if (cb->copy_lvb) {
-			memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
-			       DLM_USER_LVB_LEN);
-			cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
+	if (!dlm_may_skip_callback(lkb, flags, mode, status, sbflags,
+				   &copy_lvb)) {
+		rv = dlm_get_cb(lkb, flags, mode, status, sbflags, &cb);
+		if (!rv) {
+			cb->copy_lvb = copy_lvb;
+			cb->ua = *ua;
+			cb->lkb_lksb = &cb->ua.lksb;
+			if (copy_lvb) {
+				memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
+				       DLM_USER_LVB_LEN);
+				cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
+			}
+
+			list_add_tail(&cb->list, &proc->asts);
+			wake_up_interruptible(&proc->wait);
 		}
-
-		list_add_tail(&cb->list, &proc->asts);
-		wake_up_interruptible(&proc->wait);
-		break;
-	case DLM_ENQUEUE_CALLBACK_SUCCESS:
-		break;
-	case DLM_ENQUEUE_CALLBACK_FAILURE:
-		fallthrough;
-	default:
-		spin_unlock_bh(&proc->asts_spin);
-		WARN_ON_ONCE(1);
-		goto out;
 	}
 	spin_unlock_bh(&proc->asts_spin);
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH dlm/next 7/8] dlm: convert ls_cb_lock to rwlock
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (5 preceding siblings ...)
  2024-06-03 21:55 ` [PATCH dlm/next 6/8] dlm: implement LSFL_SOFTIRQ_SAFE Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-03 21:55 ` [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace() Alexander Aring
  7 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

Currently dlm_add_cb() can be called in parallel, either from the DLM
API call or from the DLM message receive context. A rwlock can be a
benefit when both contexts run into the same critical section at the
same time. In the future more message receive contexts may run in
parallel, which makes this conversion of a per-lockspace lock to a
rwlock more useful. In the far future the whole delayed callback
handling might not be necessary if the synchronization is done in a
different way than it is now.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 21 +++++++++++++++------
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lockspace.c    |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 742b30b61c19..ce8f1f5dfa0c 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -178,11 +178,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 	if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL))
 		return;
 
-	spin_lock_bh(&ls->ls_cb_lock);
+retry:
+	read_lock_bh(&ls->ls_cb_lock);
 	if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+		read_unlock_bh(&ls->ls_cb_lock);
+		write_lock_bh(&ls->ls_cb_lock);
+		if (!test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+			write_unlock_bh(&ls->ls_cb_lock);
+			goto retry;
+		}
+
 		rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
 		if (!rv)
 			list_add(&cb->list, &ls->ls_cb_delay);
+		write_unlock_bh(&ls->ls_cb_lock);
 	} else {
 		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) {
 			dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags,
@@ -195,8 +204,8 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 			if (!rv)
 				queue_work(ls->ls_callback_wq, &cb->work);
 		}
+		read_unlock_bh(&ls->ls_cb_lock);
 	}
-	spin_unlock_bh(&ls->ls_cb_lock);
 }
 
 int dlm_callback_start(struct dlm_ls *ls)
@@ -225,9 +234,9 @@ void dlm_callback_suspend(struct dlm_ls *ls)
 	if (!test_bit(LSFL_FS, &ls->ls_flags))
 		return;
 
-	spin_lock_bh(&ls->ls_cb_lock);
+	write_lock_bh(&ls->ls_cb_lock);
 	set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock_bh(&ls->ls_cb_lock);
+	write_unlock_bh(&ls->ls_cb_lock);
 
 	if (ls->ls_callback_wq)
 		flush_workqueue(ls->ls_callback_wq);
@@ -245,7 +254,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		return;
 
 more:
-	spin_lock_bh(&ls->ls_cb_lock);
+	write_lock_bh(&ls->ls_cb_lock);
 	list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
 		list_del(&cb->list);
 		if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags))
@@ -260,7 +269,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	empty = list_empty(&ls->ls_cb_delay);
 	if (empty)
 		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock_bh(&ls->ls_cb_lock);
+	write_unlock_bh(&ls->ls_cb_lock);
 
 	sum += count;
 	if (!empty) {
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index e299d8d4d971..5a7fbfec26fb 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -653,7 +653,7 @@ struct dlm_ls {
 
 	/* recovery related */
 
-	spinlock_t		ls_cb_lock;
+	rwlock_t		ls_cb_lock;
 	struct list_head	ls_cb_delay; /* save for queue_work later */
 	struct task_struct	*ls_recoverd_task;
 	struct mutex		ls_recoverd_active;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 5b3a4c32ac99..f6918f366faa 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -449,7 +449,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_completion(&ls->ls_recovery_done);
 	ls->ls_recovery_result = -1;
 
-	spin_lock_init(&ls->ls_cb_lock);
+	rwlock_init(&ls->ls_cb_lock);
 	INIT_LIST_HEAD(&ls->ls_cb_delay);
 
 	ls->ls_recoverd_task = NULL;
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-03 21:55 [PATCH dlm/next 0/8] dlm: md: introduce DLM_LSFL_SOFTIRQ_SAFE Alexander Aring
                   ` (6 preceding siblings ...)
  2024-06-03 21:55 ` [PATCH dlm/next 7/8] dlm: convert ls_cb_lock to rwlock Alexander Aring
@ 2024-06-03 21:55 ` Alexander Aring
  2024-06-05 18:54   ` Alexander Aring
  7 siblings, 1 reply; 15+ messages in thread
From: Alexander Aring @ 2024-06-03 21:55 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid, aahringo

Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
dlm_new_lockspace() to signal the capability to handle DLM ast/bast
callbacks in softirq context, avoiding an additional context switch due
to the DLM callback workqueue.

The md-cluster implementation only does synchronous calls on top of the
async DLM API. Such a synchronous API could perhaps also be offered by
DLM itself; however, it is very simple, as the md-cluster callbacks only
do a complete() call for the wait_for_completion() that follows the
async DLM API call. This patch activates the recently introduced
DLM_LSFL_SOFTIRQ flag, which allows the DLM callbacks to be executed in
a softirq context that md-cluster can handle. It removes an unnecessary
workqueue context switch and should speed up DLM in some circumstances.

In the future other DLM users, e.g. gfs2, will also make use of this
flag to avoid the additional context switch due to the DLM callback
workqueue. In the far future we can hopefully drop this kernel lockspace
flag and remove the callback workqueue altogether.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 drivers/md/md-cluster.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
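
As a rough illustration of why such callbacks are simple enough for
softirq context, here is a userspace sketch of a synchronous wrapper
around an asynchronous API. It is not md-cluster code: the completion
helpers below are small pthread-based re-implementations that only
borrow the kernel names, and struct sync_lock, struct async_req,
sync_ast(), async_worker() and lock_sync() are hypothetical. A thread
stands in for the context that delivers the callback.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Userspace stand-in for the kernel's struct completion. */
struct completion {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	bool done;
};

static void init_completion(struct completion *c)
{
	pthread_mutex_init(&c->lock, NULL);
	pthread_cond_init(&c->cond, NULL);
	c->done = false;
}

static void complete(struct completion *c)
{
	pthread_mutex_lock(&c->lock);
	c->done = true;
	pthread_cond_signal(&c->cond);
	pthread_mutex_unlock(&c->lock);
}

static void wait_for_completion(struct completion *c)
{
	pthread_mutex_lock(&c->lock);
	while (!c->done)
		pthread_cond_wait(&c->cond, &c->lock);
	assert(c->done);
	pthread_mutex_unlock(&c->lock);
}

/* Hypothetical per-request state of the synchronous caller. */
struct sync_lock {
	struct completion done;
	int status;
};

/* ast-style callback: records a result and wakes the waiter, nothing else. */
static void sync_ast(void *arg)
{
	struct sync_lock *l = arg;

	l->status = 0;
	complete(&l->done);
}

/* Hypothetical async request; the worker thread delivers the callback. */
struct async_req {
	void (*astfn)(void *astparam);
	void *astparam;
};

static void *async_worker(void *p)
{
	struct async_req *req = p;

	req->astfn(req->astparam);
	return NULL;
}

/* Synchronous wrapper: issue the async request, then wait for the ast. */
static int lock_sync(struct sync_lock *l)
{
	struct async_req req = { .astfn = sync_ast, .astparam = l };
	pthread_t thread;

	init_completion(&l->done);
	l->status = -1;
	if (pthread_create(&thread, NULL, async_worker, &req))
		return -1;
	wait_for_completion(&l->done);
	pthread_join(thread, NULL);
	return l->status;
}
```

The point of the sketch is that the callback itself never blocks; all
waiting happens in the caller, so where the callback runs matters
little to the caller.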

diff --git a/drivers/md/md-cluster.c b/drivers/md/md-cluster.c
index 8e36a0feec09..eb9bbf12c8d8 100644
--- a/drivers/md/md-cluster.c
+++ b/drivers/md/md-cluster.c
@@ -887,7 +887,7 @@ static int join(struct mddev *mddev, int nodes)
 	memset(str, 0, 64);
 	sprintf(str, "%pU", mddev->uuid);
 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
-				0, LVB_SIZE, &md_ls_ops, mddev,
+				DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev,
 				&ops_rv, &cinfo->lockspace);
 	if (ret)
 		goto err;
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-03 21:55 ` [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace() Alexander Aring
@ 2024-06-05 18:54   ` Alexander Aring
  2024-06-06  2:48     ` Heming Zhao
  0 siblings, 1 reply; 15+ messages in thread
From: Alexander Aring @ 2024-06-05 18:54 UTC (permalink / raw
  To: teigland, heming.zhao, yukuai3, song; +Cc: gfs2, linux-raid

Hi,

On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
>
> Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
> dlm_new_lockspace() to signal the capability to handle DLM ast/bast
> callbacks in softirq context to avoid an additional context switch due
> the DLM callback workqueue.
>
> The md-cluster implementation only does synchronized calls above the
> async DLM API. That synchronized API should may be also offered by DLM,
> however it is very simple as md-cluster callbacks only does a complete()
> call for their wait_for_completion() wait that is occurred after the
> async DLM API call. This patch activates the recently introduced
> DLM_LSFL_SOFTIRQ flag that allows that the DLM callbacks are executed in
> a softirq context that md-cluster can handle. It is reducing a
> unnecessary context workqueue switch and should speed up DLM in some
> circumstance.
>

Can somebody with an md-cluster environment test it as well? All I did
was the following (with a working dlm_controld cluster environment):

mdadm --create /dev/md0 --bitmap=clustered --metadata=1.2 \
      --raid-devices=2 --level=mirror /dev/sda /dev/sdb

sda and sdb are shared block devices on each node.

Create an /etc/mdadm.conf on each node, with its content taken mostly
from the output of:

mdadm --detail --scan

Then call mdadm --assemble on all nodes where "mdadm --create ..." was
not run. I hope that is the right thing to do; afterwards "dlm_tool ls"
showed a UUID as the lockspace name, with some md-cluster locks around.

To bring this new flag upstream, would it be okay to get this through
dlm-tree? I am asking for an "Acked-by" tag from the md maintainers.

Thanks.

- Alex


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-05 18:54   ` Alexander Aring
@ 2024-06-06  2:48     ` Heming Zhao
  2024-06-06 14:33       ` Alexander Aring
  0 siblings, 1 reply; 15+ messages in thread
From: Heming Zhao @ 2024-06-06  2:48 UTC (permalink / raw
  To: Alexander Aring, teigland, yukuai3, song; +Cc: gfs2, linux-raid

On 6/6/24 02:54, Alexander Aring wrote:
> Hi,
> 
> On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
>>
>> Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
>> dlm_new_lockspace() to signal the capability to handle DLM ast/bast
>> callbacks in softirq context to avoid an additional context switch due
>> the DLM callback workqueue.
>>
>> The md-cluster implementation only does synchronized calls above the
>> async DLM API. That synchronized API should may be also offered by DLM,
>> however it is very simple as md-cluster callbacks only does a complete()
>> call for their wait_for_completion() wait that is occurred after the
>> async DLM API call. This patch activates the recently introduced
>> DLM_LSFL_SOFTIRQ flag that allows that the DLM callbacks are executed in
>> a softirq context that md-cluster can handle. It is reducing a
>> unnecessary context workqueue switch and should speed up DLM in some
>> circumstance.
>>
> 
> Can somebody with a md-cluster environment test it as well? All I was
> doing was (with a working dlm_controld cluster env):
> 
> mdadm --create /dev/md0 --bitmap=clustered --metadata=1.2
> --raid-devices=2 --level=mirror /dev/sda /dev/sdb
> 
> sda and sdb are shared block devices on each node.
> 
> Create a /etc/mdadm.conf with the content mostly out of:
> 
> mdadm --detail --scan
> 
> on each node.
> 
> Then call mdadm --assemble on all nodes where not "mdadm --create ..." happened.
> I hope that is the right thing to do and I had with "dlm_tool ls" a
> UUID as a lockspace name with some md-cluster locks being around.

The above setup method is correct.
SUSE doc [1] provides more details on assembling the clustered array.

[1]: https://documentation.suse.com/fr-fr/sle-ha/15-SP5/html/SLE-HA-all/cha-ha-cluster-md.html#sec-ha-cluster-md-create

> 
> To bring this new flag upstream, would it be okay to get this through
> dlm-tree? I am requesting here for an "Acked-by" tag from the md
> maintainers.
> 

I compiled and tested the dlm-tree [2] in the SUSE CI environment and
didn't see these patches introduce any new issues.

[2]: https://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm.git/log/?h=next

Thanks,
Heming

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-06  2:48     ` Heming Zhao
@ 2024-06-06 14:33       ` Alexander Aring
  2024-06-06 22:55         ` Heming Zhao
  2024-06-08  4:21         ` Song Liu
  0 siblings, 2 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-06 14:33 UTC (permalink / raw
  To: Heming Zhao, yukuai3, song; +Cc: teigland, gfs2, linux-raid

Hi Heming,

On Wed, Jun 5, 2024 at 10:48 PM Heming Zhao <heming.zhao@suse.com> wrote:
>
> On 6/6/24 02:54, Alexander Aring wrote:
> > Hi,
> >
> > On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
> >>
> >> Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
> >> dlm_new_lockspace() to signal the capability to handle DLM ast/bast
> >> callbacks in softirq context to avoid an additional context switch due
> >> the DLM callback workqueue.
> >>
> >> The md-cluster implementation only does synchronized calls above the
> >> async DLM API. That synchronized API should may be also offered by DLM,
> >> however it is very simple as md-cluster callbacks only does a complete()
> >> call for their wait_for_completion() wait that is occurred after the
> >> async DLM API call. This patch activates the recently introduced
> >> DLM_LSFL_SOFTIRQ flag that allows that the DLM callbacks are executed in
> >> a softirq context that md-cluster can handle. It is reducing a
> >> unnecessary context workqueue switch and should speed up DLM in some
> >> circumstance.
> >>
> >
> > Can somebody with a md-cluster environment test it as well? All I was
> > doing was (with a working dlm_controld cluster env):
> >
> > mdadm --create /dev/md0 --bitmap=clustered --metadata=1.2
> > --raid-devices=2 --level=mirror /dev/sda /dev/sdb
> >
> > sda and sdb are shared block devices on each node.
> >
> > Create a /etc/mdadm.conf with the content mostly out of:
> >
> > mdadm --detail --scan
> >
> > on each node.
> >
> > Then call mdadm --assemble on all nodes where not "mdadm --create ..." happened.
> > I hope that is the right thing to do and I had with "dlm_tool ls" a
> > UUID as a lockspace name with some md-cluster locks being around.
>
> The above setup method is correct.
> SUSE doc [1] provides more details on assembling the clustered array.
>

Yes, I saw that and mostly cut it down to the steps necessary for my
development setup.

Thanks for confirming that I did this right.

> [1]: https://documentation.suse.com/fr-fr/sle-ha/15-SP5/html/SLE-HA-all/cha-ha-cluster-md.html#sec-ha-cluster-md-create
>
> >
> > To bring this new flag upstream, would it be okay to get this through
> > dlm-tree? I am requesting here for an "Acked-by" tag from the md
> > maintainers.
> >
>
> I compiled & tested the dlm-tree [2] with SUSE CI env, and didn't see these
> patches introduce new issue.
>

Thanks for doing that. So that means you tried the dlm-tree with this
patch series applied?

Song or Yu, can I get an "Acked-by" from you and an answer if it is
okay that this md-cluster.c patch goes upstream via dlm-tree?

Thanks.

- Alex


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-06 14:33       ` Alexander Aring
@ 2024-06-06 22:55         ` Heming Zhao
  2024-06-08  4:21         ` Song Liu
  1 sibling, 0 replies; 15+ messages in thread
From: Heming Zhao @ 2024-06-06 22:55 UTC (permalink / raw
  To: Alexander Aring, yukuai3, song; +Cc: teigland, gfs2, linux-raid

On 6/6/24 22:33, Alexander Aring wrote:
> Hi Heming,
> 
> On Wed, Jun 5, 2024 at 10:48 PM Heming Zhao <heming.zhao@suse.com> wrote:
>>
>> On 6/6/24 02:54, Alexander Aring wrote:
>>> Hi,
>>>
>>> On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
>>>>
>>>> Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
>>>> dlm_new_lockspace() to signal the capability to handle DLM ast/bast
>>>> callbacks in softirq context to avoid an additional context switch due
>>>> the DLM callback workqueue.
>>>>
>>>> The md-cluster implementation only does synchronized calls above the
>>>> async DLM API. That synchronized API should may be also offered by DLM,
>>>> however it is very simple as md-cluster callbacks only does a complete()
>>>> call for their wait_for_completion() wait that is occurred after the
>>>> async DLM API call. This patch activates the recently introduced
>>>> DLM_LSFL_SOFTIRQ flag that allows that the DLM callbacks are executed in
>>>> a softirq context that md-cluster can handle. It is reducing a
>>>> unnecessary context workqueue switch and should speed up DLM in some
>>>> circumstance.
>>>>
>>>
>>> Can somebody with a md-cluster environment test it as well? All I was
>>> doing was (with a working dlm_controld cluster env):
>>>
>>> mdadm --create /dev/md0 --bitmap=clustered --metadata=1.2
>>> --raid-devices=2 --level=mirror /dev/sda /dev/sdb
>>>
>>> sda and sdb are shared block devices on each node.
>>>
>>> Create a /etc/mdadm.conf with the content mostly out of:
>>>
>>> mdadm --detail --scan
>>>
>>> on each node.
>>>
>>> Then call mdadm --assemble on all nodes where not "mdadm --create ..." happened.
>>> I hope that is the right thing to do and I had with "dlm_tool ls" a
>>> UUID as a lockspace name with some md-cluster locks being around.
>>
>> The above setup method is correct.
>> SUSE doc [1] provides more details on assembling the clustered array.
>>
> 
> yea, I saw that and mostly cut it down into the necessary steps in my
> development setup.
> 
> Thanks for confirming I did something right here.
> 
>> [1]: https://documentation.suse.com/fr-fr/sle-ha/15-SP5/html/SLE-HA-all/cha-ha-cluster-md.html#sec-ha-cluster-md-create
>>
>>>
>>> To bring this new flag upstream, would it be okay to get this through
>>> dlm-tree? I am requesting here for an "Acked-by" tag from the md
>>> maintainers.
>>>
>>
>> I compiled & tested the dlm-tree [2] with SUSE CI env, and didn't see these
>> patches introduce new issue.
>>
> 
> Thanks for doing that. So that means you tried the dlm-tree with this
> patch series applied?

Yes.

-Heming

> 
> Song or Yu, can I get an "Acked-by" from you and an answer if it is
> okay that this md-cluster.c patch goes upstream via dlm-tree?
> 
> Thanks.
> 
> - Alex
> 


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 8/8] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
  2024-06-06 14:33       ` Alexander Aring
  2024-06-06 22:55         ` Heming Zhao
@ 2024-06-08  4:21         ` Song Liu
  1 sibling, 0 replies; 15+ messages in thread
From: Song Liu @ 2024-06-08  4:21 UTC (permalink / raw
  To: Alexander Aring; +Cc: Heming Zhao, yukuai3, teigland, gfs2, linux-raid

On Thu, Jun 6, 2024 at 7:34 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> Hi Heming,
>
> On Wed, Jun 5, 2024 at 10:48 PM Heming Zhao <heming.zhao@suse.com> wrote:
> >
> > On 6/6/24 02:54, Alexander Aring wrote:
> > > Hi,
> > >
> > > On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
> > >>
> > >> Recently the DLM subsystem introduced the flag DLM_LSFL_SOFTIRQ for
> > >> dlm_new_lockspace() to signal the capability to handle DLM ast/bast
> > >> callbacks in softirq context to avoid an additional context switch due
> > >> the DLM callback workqueue.
> > >>
> > >> The md-cluster implementation only does synchronized calls above the
> > >> async DLM API. That synchronized API should may be also offered by DLM,
> > >> however it is very simple as md-cluster callbacks only does a complete()
> > >> call for their wait_for_completion() wait that is occurred after the
> > >> async DLM API call. This patch activates the recently introduced
> > >> DLM_LSFL_SOFTIRQ flag that allows that the DLM callbacks are executed in
> > >> a softirq context that md-cluster can handle. It is reducing a
> > >> unnecessary context workqueue switch and should speed up DLM in some
> > >> circumstance.
> > >>
> > >
> > > Can somebody with a md-cluster environment test it as well? All I was
> > > doing was (with a working dlm_controld cluster env):
> > >
> > > mdadm --create /dev/md0 --bitmap=clustered --metadata=1.2
> > > --raid-devices=2 --level=mirror /dev/sda /dev/sdb
> > >
> > > sda and sdb are shared block devices on each node.
> > >
> > > Create a /etc/mdadm.conf with the content mostly out of:
> > >
> > > mdadm --detail --scan
> > >
> > > on each node.
> > >
> > > Then call mdadm --assemble on all nodes where not "mdadm --create ..." happened.
> > > I hope that is the right thing to do and I had with "dlm_tool ls" a
> > > UUID as a lockspace name with some md-cluster locks being around.
> >
> > The above setup method is correct.
> > SUSE doc [1] provides more details on assembling the clustered array.
> >
>
> yea, I saw that and mostly cut it down into the necessary steps in my
> development setup.
>
> Thanks for confirming I did something right here.
>
> > [1]: https://documentation.suse.com/fr-fr/sle-ha/15-SP5/html/SLE-HA-all/cha-ha-cluster-md.html#sec-ha-cluster-md-create
> >
> > >
> > > To bring this new flag upstream, would it be okay to get this through
> > > dlm-tree? I am requesting here for an "Acked-by" tag from the md
> > > maintainers.
> > >
> >
> > I compiled & tested the dlm-tree [2] with SUSE CI env, and didn't see these
> > patches introduce new issue.
> >
>
> Thanks for doing that. So that means you tried the dlm-tree with this
> patch series applied?
>
> Song or Yu, can I get an "Acked-by" from you and an answer if it is
> okay that this md-cluster.c patch goes upstream via dlm-tree?

LGTM.

Acked-by: Song Liu <song@kernel.org>

Yes, let's route this via dlm-tree.

Thanks,
Song

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH dlm/next 2/8] dlm: remove struct field with the same meaning
  2024-06-03 21:55 ` [PATCH dlm/next 2/8] dlm: remove struct field with the same meaning Alexander Aring
@ 2024-06-10 14:17   ` Alexander Aring
  0 siblings, 0 replies; 15+ messages in thread
From: Alexander Aring @ 2024-06-10 14:17 UTC (permalink / raw
  To: teigland; +Cc: gfs2, song, yukuai3, linux-raid

Hi,


On Mon, Jun 3, 2024 at 5:56 PM Alexander Aring <aahringo@redhat.com> wrote:
>
> There is currently "res_nodeid" and "res_master_nodeid" fields in
> "struct dlm_rsb". At some point a developer does not know when to use
> which one or forget to update one and they are out of sync. This patch
> removes the "res_nodeid" and allows "res_master_nodeid" only. They have
> different representation values about their invalid values, the
> "res_master_nodeid" seems to be the modern way of represent the actual
> master nodeid and actually use the nodeid value when the own nodeid is
> the master (on res_nodeid the 0 represented this value). Also the modern
> nodeid representation fits into a "unsigned" range as this avoids to
> convert negative values over the network. The old value representation
> is still part of the DLM networking protocol that's why the conversion
> functions dlm_res_nodeid() and dlm_res_master_nodeid() are still
> present. On a new major DLM version bump protocol the nodeid representation
> should be updated to the modern value representation. These conversion
> functions also applies for existing UAPI and the user space still
> assumes the old "res_nodeid" value representation.
>
> The same arguments applies to "lkb_nodeid" and "lkb_master_nodeid"
> wheras this is also only a copied value from another lkb related field
> "lkb_resource" and it's "res_master_nodeid" value. In this case it
> requires more code review because "lkb_resource" is not set sometimes.
>
> This patch so far makes the code easier to read and understandable
> because we don't have several fields with the same meaning in some
> structs. In case of the previously "res_nodeid" value, sometimes an
> additional check on our_nodeid() is required to set the value to 0 that
> represents in this value representation that we are the master node.
>
> Signed-off-by: Alexander Aring <aahringo@redhat.com>

__dlm_master_lookup() still returns "-1" (the old representation for an
invalid nodeid); it should now return 0 for invalid.
We also forgot to update dlm_send_rcom_lookup()/receive_rcom_lookup() to
use the conversion functions.
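
To make the two representations concrete, here is a small sketch of the
mapping as described in this thread: in the old "res_nodeid" style, -1
means invalid and 0 means we are the master; in the modern
"res_master_nodeid" style, 0 means invalid and our own nodeid is stored
when we are the master. The helper names to_legacy_nodeid() and
from_legacy_nodeid() are hypothetical and this is not the actual
implementation of dlm_res_nodeid()/dlm_res_master_nodeid().

```c
#include <assert.h>

/*
 * Modern -> legacy representation (hypothetical helper).
 * Modern: 0 is invalid, otherwise the real nodeid of the master.
 * Legacy: -1 is invalid, 0 means "we are the master".
 */
static int to_legacy_nodeid(unsigned int master_nodeid,
			    unsigned int our_nodeid)
{
	assert(our_nodeid != 0);	/* a valid local nodeid is never 0 */

	if (master_nodeid == 0)
		return -1;
	if (master_nodeid == our_nodeid)
		return 0;
	return (int)master_nodeid;
}

/* Legacy -> modern representation (hypothetical helper). */
static unsigned int from_legacy_nodeid(int legacy_nodeid,
				       unsigned int our_nodeid)
{
	assert(our_nodeid != 0);

	if (legacy_nodeid < 0)
		return 0;
	if (legacy_nodeid == 0)
		return our_nodeid;
	return (unsigned int)legacy_nodeid;
}
```

With a mapping like this, a lookup path that hands back -1 where the
modern representation is expected would be treated as a (bogus) valid
nodeid rather than as "invalid", which is the kind of mix-up noted
above.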

- Alex


^ permalink raw reply	[flat|nested] 15+ messages in thread

