* [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers
@ 2024-05-28 21:12 Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list Alexander Aring
                   ` (9 more replies)
  0 siblings, 10 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes some leftover code related to dlm_scand, which was
dropped in commit b1f2381c1a8d ("dlm: drop dlm_scand kthread and use
timers").

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h | 8 --------
 fs/dlm/lockspace.c    | 1 -
 2 files changed, 9 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9085ba3b2f20..9618ce0720d9 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -559,13 +559,6 @@ struct rcom_lock {
 	char			rl_lvb[];
 };
 
-/*
- * The max number of resources per rsbtbl bucket that shrink will attempt
- * to remove in each iteration.
- */
-
-#define DLM_REMOVE_NAMES_MAX 8
-
 struct dlm_ls {
 	struct list_head	ls_list;	/* list of lockspaces */
 	dlm_lockspace_t		*ls_local_handle;
@@ -578,7 +571,6 @@ struct dlm_ls {
 	wait_queue_head_t	ls_count_wait;
 	int			ls_create_count; /* create/release refcount */
 	unsigned long		ls_flags;	/* LSFL_ */
-	unsigned long		ls_scan_time;
 	struct kobject		ls_kobj;
 
 	struct idr		ls_lkbidr;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 475ab4370dda..b6a1a6eb7f27 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -410,7 +410,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	atomic_set(&ls->ls_count, 0);
 	init_waitqueue_head(&ls->ls_count_wait);
 	ls->ls_flags = 0;
-	ls->ls_scan_time = jiffies;
 
 	if (ops && dlm_config.ci_recover_callbacks) {
 		ls->ls_ops = ops;
-- 
2.43.0



* [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 03/11] dlm: remove unused parameter in dlm_midcomms_addr Alexander Aring
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes a kref_init() that isn't necessary because the rsb is
created for the toss list. While an rsb is on the toss list it should not
have any reference counting logic. If the rsb is later moved to the keep
list, a kref_init() on res_ref is done at that point.
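For context, this relies on the usual kref contract: kref_init() sets the
refcount to 1, i.e. it declares that a reference is already held, which is
exactly what a toss-list rsb must not claim. A minimal, generic sketch of
that contract (illustrative only, not DLM code):

  #include <linux/kref.h>
  #include <linux/slab.h>

  struct foo {
      struct kref ref;
  };

  static void foo_release(struct kref *kref)
  {
      struct foo *f = container_of(kref, struct foo, ref);

      kfree(f);
  }

  static struct foo *foo_create(void)
  {
      struct foo *f = kzalloc(sizeof(*f), GFP_KERNEL);

      if (!f)
          return NULL;
      /* refcount becomes 1: the caller now holds a reference */
      kref_init(&f->ref);
      return f;
  }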

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f103b8c30592..e66972ed97b1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1385,7 +1385,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_dir_nodeid = our_nodeid;
 	r->res_master_nodeid = from_nodeid;
 	r->res_nodeid = from_nodeid;
-	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
 	write_lock_bh(&ls->ls_rsbtbl_lock);
-- 
2.43.0



* [PATCH v6.10-rc1 03/11] dlm: remove unused parameter in dlm_midcomms_addr
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 04/11] dlm: remove ls_local_handle from struct dlm_ls Alexander Aring
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes a parameter that is currently not used by
dlm_midcomms_addr().

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/config.c   | 2 +-
 fs/dlm/lowcomms.c | 2 +-
 fs/dlm/lowcomms.h | 2 +-
 fs/dlm/midcomms.c | 4 ++--
 fs/dlm/midcomms.h | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 517fa975dc5a..99952234799e 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -672,7 +672,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf,
 
 	memcpy(addr, buf, len);
 
-	rv = dlm_midcomms_addr(cm->nodeid, addr, len);
+	rv = dlm_midcomms_addr(cm->nodeid, addr);
 	if (rv) {
 		kfree(addr);
 		return rv;
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 6b8078085e56..591385701c7d 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -461,7 +461,7 @@ static bool dlm_lowcomms_con_has_addr(const struct connection *con,
 	return false;
 }
 
-int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
 {
 	struct connection *con;
 	bool ret, idx;
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 8deb16f8f620..fd0df604eb93 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -46,7 +46,7 @@ void dlm_lowcomms_put_msg(struct dlm_msg *msg);
 int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
 int dlm_lowcomms_connect_node(int nodeid);
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
-int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
+int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr);
 void dlm_midcomms_receive_done(int nodeid);
 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void);
 struct kmem_cache *dlm_lowcomms_msg_cache_create(void);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index c34f38e9ee5c..2c101bbe261a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -334,12 +334,12 @@ static struct midcomms_node *nodeid2node(int nodeid)
 	return __find_node(nodeid, nodeid_hash(nodeid));
 }
 
-int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr)
 {
 	int ret, idx, r = nodeid_hash(nodeid);
 	struct midcomms_node *node;
 
-	ret = dlm_lowcomms_addr(nodeid, addr, len);
+	ret = dlm_lowcomms_addr(nodeid, addr);
 	if (ret)
 		return ret;
 
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 278d26fdeb2c..7fad1d170bba 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -19,7 +19,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc);
 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
 				 int namelen);
-int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
+int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr);
 void dlm_midcomms_version_wait(void);
 int dlm_midcomms_close(int nodeid);
 int dlm_midcomms_start(void);
-- 
2.43.0



* [PATCH v6.10-rc1 04/11] dlm: remove ls_local_handle from struct dlm_ls
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 03/11] dlm: remove unused parameter in dlm_midcomms_addr Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 05/11] dlm: rename dlm_find_lockspace_local to dlm_get_lockspace Alexander Aring
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes ls_local_handle from struct dlm_ls, as it only stores a
pointer to the top-level structure itself, which isn't necessary.
dlm_find_lockspace_local() offers a lookup for it, but the given input
parameter already is the pointer being looked up. The lookup might be
slightly safer, but passing a wrong lockspace pointer is a bug in the
caller, so we save the additional lookup here. The dlm_ls structure can
still be hidden from users by handing out a dlm_lockspace_t handle
pointer.
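The "hidden" part works like the usual opaque-handle pattern in C: the
public header exposes only an opaque type, and only the implementation
knows that the handle and the private structure are the same pointer. A
generic sketch of the idea (names are made up here, not the dlm ones):

  /* public header: callers only ever hold an opaque handle */
  typedef void example_handle_t;

  /* implementation: the handle really is the private object */
  struct example_obj {
      int private_state;
  };

  static struct example_obj *example_from_handle(example_handle_t *handle)
  {
      return handle;  /* same pointer, no lookup needed */
  }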

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  1 -
 fs/dlm/lockspace.c    | 16 +++-------------
 fs/dlm/user.c         |  4 ++--
 3 files changed, 5 insertions(+), 16 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9618ce0720d9..e93ed8f7addd 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -561,7 +561,6 @@ struct rcom_lock {
 
 struct dlm_ls {
 	struct list_head	ls_list;	/* list of lockspaces */
-	dlm_lockspace_t		*ls_local_handle;
 	uint32_t		ls_global_id;	/* global unique lockspace ID */
 	uint32_t		ls_generation;
 	uint32_t		ls_exflags;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index b6a1a6eb7f27..8155d7475c79 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -38,7 +38,7 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 
 	if (rc)
 		return rc;
-	ls = dlm_find_lockspace_local(ls->ls_local_handle);
+	ls = dlm_find_lockspace_local(ls);
 	if (!ls)
 		return -EINVAL;
 
@@ -265,18 +265,9 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 
 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 {
-	struct dlm_ls *ls;
+	struct dlm_ls *ls = lockspace;
 
-	spin_lock_bh(&lslist_lock);
-	list_for_each_entry(ls, &lslist, ls_list) {
-		if (ls->ls_local_handle == lockspace) {
-			atomic_inc(&ls->ls_count);
-			goto out;
-		}
-	}
-	ls = NULL;
- out:
-	spin_unlock_bh(&lslist_lock);
+	atomic_inc(&ls->ls_count);
 	return ls;
 }
 
@@ -496,7 +487,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	idr_init(&ls->ls_recover_idr);
 	spin_lock_init(&ls->ls_recover_idr_lock);
 	ls->ls_recover_list_count = 0;
-	ls->ls_local_handle = ls;
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
 	rwlock_init(&ls->ls_masters_lock);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 3173b974e8c8..f6635a5314f4 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -454,7 +454,7 @@ static int device_remove_lockspace(struct dlm_lspace_params *params)
 	if (params->flags & DLM_USER_LSFLG_FORCEFREE)
 		force = 2;
 
-	lockspace = ls->ls_local_handle;
+	lockspace = ls;
 	dlm_put_lockspace(ls);
 
 	/* The final dlm_release_lockspace waits for references to go to
@@ -657,7 +657,7 @@ static int device_open(struct inode *inode, struct file *file)
 		return -ENOMEM;
 	}
 
-	proc->lockspace = ls->ls_local_handle;
+	proc->lockspace = ls;
 	INIT_LIST_HEAD(&proc->asts);
 	INIT_LIST_HEAD(&proc->locks);
 	INIT_LIST_HEAD(&proc->unlocking);
-- 
2.43.0



* [PATCH v6.10-rc1 05/11] dlm: rename dlm_find_lockspace_local to dlm_get_lockspace
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (2 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 04/11] dlm: remove ls_local_handle from struct dlm_ls Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 06/11] dlm: drop own rsb pre allocation mechanism Alexander Aring
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

The previous patch got rid of the unnecessary loop to look up the
lockspace pointer, since the pointer passed in was already the pointer
being looked for. The purpose of dlm_find_lockspace_local() is really that
of a dlm_get_lockspace(): take a reference so the lockspace cannot be
released underneath the caller. It is a separate question whether
dlm_get_lockspace() is really necessary in all these places, as it is
called on a lockspace resource itself, and during that time the lockspace
cannot be freed because it is in use, e.g. by udev. The refcounting around
DLM API calls is also questionable; calling dlm_release_lockspace() while
there are still pending API calls around is arguably a DLM user problem.
However, removing some of these references would need a fuller code review
in the future.
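With the new name, the caller-side pattern reads as a plain reference
get/put pair around the work. A condensed sketch of how an entry point
uses it after this patch (do_work() is a placeholder, compare the
dlm_lock() hunk below):

  static int do_work(struct dlm_ls *ls);  /* placeholder for the real operation */

  static int example_api_call(dlm_lockspace_t *lockspace)
  {
      struct dlm_ls *ls;
      int error;

      ls = dlm_get_lockspace(lockspace);  /* pin the lockspace */
      if (!ls)
          return -EINVAL;

      error = do_work(ls);

      dlm_put_lockspace(ls);              /* drop the reference */
      return error;
  }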

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c      |  4 ++--
 fs/dlm/lockspace.c |  8 ++++----
 fs/dlm/lockspace.h |  2 +-
 fs/dlm/plock.c     |  8 ++++----
 fs/dlm/recoverd.c  |  4 ++--
 fs/dlm/user.c      | 12 ++++++------
 6 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e66972ed97b1..205d6a20fcee 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3391,7 +3391,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
 	struct dlm_args args;
 	int error, convert = flags & DLM_LKF_CONVERT;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
@@ -3443,7 +3443,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 	struct dlm_args args;
 	int error;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 8155d7475c79..d25a09d6c710 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -38,7 +38,7 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 
 	if (rc)
 		return rc;
-	ls = dlm_find_lockspace_local(ls);
+	ls = dlm_get_lockspace(ls);
 	if (!ls)
 		return -EINVAL;
 
@@ -263,7 +263,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 	return ls;
 }
 
-struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
+struct dlm_ls *dlm_get_lockspace(dlm_lockspace_t *lockspace)
 {
 	struct dlm_ls *ls = lockspace;
 
@@ -794,12 +794,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
  * 3 - destroy lockspace as part of a forced shutdown
  */
 
-int dlm_release_lockspace(void *lockspace, int force)
+int dlm_release_lockspace(dlm_lockspace_t *lockspace, int force)
 {
 	struct dlm_ls *ls;
 	int error;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 	dlm_put_lockspace(ls);
diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h
index 47ebd4411926..d59ce3fea03b 100644
--- a/fs/dlm/lockspace.h
+++ b/fs/dlm/lockspace.h
@@ -23,7 +23,7 @@
 int dlm_lockspace_init(void);
 void dlm_lockspace_exit(void);
 struct dlm_ls *dlm_find_lockspace_global(uint32_t id);
-struct dlm_ls *dlm_find_lockspace_local(void *id);
+struct dlm_ls *dlm_get_lockspace(dlm_lockspace_t *lockspace);
 struct dlm_ls *dlm_find_lockspace_device(int minor);
 void dlm_put_lockspace(struct dlm_ls *ls);
 void dlm_stop_lockspaces(void);
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index 9ca83ef70ed1..1ff172718d59 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -127,7 +127,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
 	struct plock_op *op;
 	int rv;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
@@ -293,7 +293,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
 	int rv;
 	unsigned char saved_flags = fl->c.flc_flags;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
@@ -370,7 +370,7 @@ int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file,
 	if (WARN_ON_ONCE(!fl->fl_lmops || !fl->fl_lmops->lm_grant))
 		return -EOPNOTSUPP;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
@@ -426,7 +426,7 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
 	struct plock_op *op;
 	int rv;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -EINVAL;
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 17a40d1e6036..7e6c2c27d815 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -389,9 +389,9 @@ static void do_ls_recovery(struct dlm_ls *ls)
 
 static int dlm_recoverd(void *arg)
 {
-	struct dlm_ls *ls;
+	struct dlm_ls *ls = arg;
 
-	ls = dlm_find_lockspace_local(arg);
+	ls = dlm_get_lockspace(arg);
 	if (!ls) {
 		log_print("dlm_recoverd: no lockspace %p", arg);
 		return -1;
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index f6635a5314f4..73d7fa768f73 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -259,7 +259,7 @@ static int device_user_lock(struct dlm_user_proc *proc,
 	uint32_t lkid;
 	int error = -ENOMEM;
 
-	ls = dlm_find_lockspace_local(proc->lockspace);
+	ls = dlm_get_lockspace(proc->lockspace);
 	if (!ls)
 		return -ENOENT;
 
@@ -309,7 +309,7 @@ static int device_user_unlock(struct dlm_user_proc *proc,
 	struct dlm_user_args *ua;
 	int error = -ENOMEM;
 
-	ls = dlm_find_lockspace_local(proc->lockspace);
+	ls = dlm_get_lockspace(proc->lockspace);
 	if (!ls)
 		return -ENOENT;
 
@@ -337,7 +337,7 @@ static int device_user_deadlock(struct dlm_user_proc *proc,
 	struct dlm_ls *ls;
 	int error;
 
-	ls = dlm_find_lockspace_local(proc->lockspace);
+	ls = dlm_get_lockspace(proc->lockspace);
 	if (!ls)
 		return -ENOENT;
 
@@ -398,7 +398,7 @@ static int device_user_purge(struct dlm_user_proc *proc,
 	struct dlm_ls *ls;
 	int error;
 
-	ls = dlm_find_lockspace_local(proc->lockspace);
+	ls = dlm_get_lockspace(proc->lockspace);
 	if (!ls)
 		return -ENOENT;
 
@@ -423,7 +423,7 @@ static int device_create_lockspace(struct dlm_lspace_params *params)
 	if (error)
 		return error;
 
-	ls = dlm_find_lockspace_local(lockspace);
+	ls = dlm_get_lockspace(lockspace);
 	if (!ls)
 		return -ENOENT;
 
@@ -674,7 +674,7 @@ static int device_close(struct inode *inode, struct file *file)
 	struct dlm_user_proc *proc = file->private_data;
 	struct dlm_ls *ls;
 
-	ls = dlm_find_lockspace_local(proc->lockspace);
+	ls = dlm_get_lockspace(proc->lockspace);
 	if (!ls)
 		return -ENOENT;
 
-- 
2.43.0



* [PATCH v6.10-rc1 06/11] dlm: drop own rsb pre allocation mechanism
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (3 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 05/11] dlm: rename dlm_find_lockspace_local to dlm_get_lockspace Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 07/11] dlm: using rcu to avoid rsb lookup again Alexander Aring
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch drops the hand-rolled rsb pre-allocation mechanism, as this is
already handled by the kmem caches; we don't need another layer on top of
them running its own pre-allocation scheme.
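The reasoning is that the slab allocator already keeps caches of recently
freed objects, so a private free list on top of it buys little. A minimal
sketch of the cache-backed allocation this now relies on (function and
cache names are illustrative, and the GFP flag is for illustration only;
the real rsb cache lives in fs/dlm/memory.c behind dlm_allocate_rsb() and
dlm_free_rsb()):

  static struct kmem_cache *example_rsb_cache;

  static int example_cache_init(void)
  {
      example_rsb_cache = kmem_cache_create("example_rsb",
                                            sizeof(struct dlm_rsb),
                                            0, 0, NULL);
      return example_rsb_cache ? 0 : -ENOMEM;
  }

  static struct dlm_rsb *example_alloc_rsb(void)
  {
      /* slab keeps its own pools of free objects, no private list needed */
      return kmem_cache_zalloc(example_rsb_cache, GFP_ATOMIC);
  }

  static void example_free_rsb(struct dlm_rsb *r)
  {
      kmem_cache_free(example_rsb_cache, r);
  }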

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  9 +----
 fs/dlm/lock.c         | 92 ++++++-------------------------------------
 fs/dlm/lockspace.c    | 11 ------
 3 files changed, 13 insertions(+), 99 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index e93ed8f7addd..61dc58bdd006 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -322,10 +322,7 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	union {
-		struct list_head	res_hashchain;
-		struct rhash_head	res_node; /* rsbtbl */
-	};
+	struct rhash_head	res_node;	/* rsbtbl */
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
@@ -596,10 +593,6 @@ struct dlm_ls {
 	spinlock_t		ls_orphans_lock;
 	struct list_head	ls_orphans;
 
-	spinlock_t		ls_new_rsb_spin;
-	int			ls_new_rsb_count;
-	struct list_head	ls_new_rsb;	/* new rsb structs */
-
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 205d6a20fcee..818e4b282b14 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -389,38 +389,6 @@ void dlm_put_rsb(struct dlm_rsb *r)
 	put_rsb(r);
 }
 
-static int pre_rsb_struct(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r1, *r2;
-	int count = 0;
-
-	spin_lock_bh(&ls->ls_new_rsb_spin);
-	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
-		spin_unlock_bh(&ls->ls_new_rsb_spin);
-		return 0;
-	}
-	spin_unlock_bh(&ls->ls_new_rsb_spin);
-
-	r1 = dlm_allocate_rsb(ls);
-	r2 = dlm_allocate_rsb(ls);
-
-	spin_lock_bh(&ls->ls_new_rsb_spin);
-	if (r1) {
-		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	if (r2) {
-		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	count = ls->ls_new_rsb_count;
-	spin_unlock_bh(&ls->ls_new_rsb_spin);
-
-	if (!count)
-		return -ENOMEM;
-	return 0;
-}
-
 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
  * new timers when recovery is triggered and don't run them
  * again until a dlm_timer_resume() tries it again.
@@ -652,22 +620,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 			  struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
-	int count;
 
-	spin_lock_bh(&ls->ls_new_rsb_spin);
-	if (list_empty(&ls->ls_new_rsb)) {
-		count = ls->ls_new_rsb_count;
-		spin_unlock_bh(&ls->ls_new_rsb_spin);
-		log_debug(ls, "find_rsb retry %d %d %s",
-			  count, dlm_config.ci_new_rsb_count,
-			  (const char *)name);
-		return -EAGAIN;
-	}
-
-	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
-	list_del(&r->res_hashchain);
-	ls->ls_new_rsb_count--;
-	spin_unlock_bh(&ls->ls_new_rsb_spin);
+	r = dlm_allocate_rsb(ls);
+	if (!r)
+		return -ENOMEM;
 
 	r->res_ls = ls;
 	r->res_length = len;
@@ -792,13 +748,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  retry:
-	if (create) {
-		error = pre_rsb_struct(ls);
-		if (error < 0)
-			goto out;
-	}
-
- retry_lookup:
 
 	/* check if the rsb is in keep state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
@@ -832,7 +781,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	if (!error) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
-			goto retry_lookup;
+			goto retry;
 		}
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -898,9 +847,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		goto out;
 
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN)
-		goto retry;
-	if (error)
+	if (WARN_ON_ONCE(error))
 		goto out;
 
 	r->res_hash = hash;
@@ -952,7 +899,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		 */
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
-		goto retry_lookup;
+		goto retry;
 	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
 	}
@@ -976,11 +923,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	int error;
 
  retry:
-	error = pre_rsb_struct(ls);
-	if (error < 0)
-		goto out;
-
- retry_lookup:
 
 	/* check if the rsb is in keep state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1015,7 +957,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (!error) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
-			goto retry_lookup;
+			goto retry;
 		}
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1070,10 +1012,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	 */
 
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		goto retry;
-	}
-	if (error)
+	if (WARN_ON_ONCE(error))
 		goto out;
 
 	r->res_hash = hash;
@@ -1090,7 +1029,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		 */
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
-		goto retry_lookup;
+		goto retry;
 	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
 	}
@@ -1304,11 +1243,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	}
 
  retry:
-	error = pre_rsb_struct(ls);
-	if (error < 0)
-		return error;
-
- retry_lookup:
 
 	/* check if the rsb is in keep state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1354,7 +1288,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 			/* something as changed, very unlikely but
 			 * try again
 			 */
-			goto retry_lookup;
+			goto retry;
 		}
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1376,9 +1310,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN)
-		goto retry;
-	if (error)
+	if (WARN_ON_ONCE(error))
 		goto out;
 
 	r->res_hash = hash;
@@ -1395,7 +1327,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		dlm_free_rsb(r);
-		goto retry_lookup;
+		goto retry;
 	} else if (error) {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		/* should never happen */
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d25a09d6c710..ecb9b5fcad8e 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -428,9 +428,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_orphans);
 	spin_lock_init(&ls->ls_orphans_lock);
 
-	INIT_LIST_HEAD(&ls->ls_new_rsb);
-	spin_lock_init(&ls->ls_new_rsb_spin);
-
 	INIT_LIST_HEAD(&ls->ls_nodes);
 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
 	ls->ls_num_nodes = 0;
@@ -688,7 +685,6 @@ static void rhash_free_rsb(void *ptr, void *arg)
 
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
-	struct dlm_rsb *rsb;
 	int busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -756,13 +752,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
 
-	while (!list_empty(&ls->ls_new_rsb)) {
-		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
-				       res_hashchain);
-		list_del(&rsb->res_hashchain);
-		dlm_free_rsb(rsb);
-	}
-
 	/*
 	 * Free structures on any other lists
 	 */
-- 
2.43.0



* [PATCH v6.10-rc1 07/11] dlm: using rcu to avoid rsb lookup again
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (4 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 06/11] dlm: drop own rsb pre allocation mechanism Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 08/11] dlm: move lkb idr to xarray datastructure Alexander Aring
                   ` (3 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the rsb to be protected by RCU: while the RCU read
lock is held, the rsb cannot be freed. In combination with the newly
introduced flag RSB_HASHED we can check whether the rsb is still part of
the ls_rsbtbl hashtable (it cannot be part of any other table). If it is
still part of ls_rsbtbl we can avoid a second dlm_search_rsb_tree() call.
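The resulting lookup pattern is: search the hashtable once under the read
lock and rcu_read_lock(), then after switching to the write lock only
re-check RSB_HASHED on the rsb that was already found, since RCU
guarantees the memory stays valid across the lock change. A condensed
sketch of that flow (the function name, return value and the trimmed
toss/keep handling are illustrative; the real callers retry or create a
new rsb instead):

  static int example_find(struct dlm_ls *ls, const void *name, int len,
                          struct dlm_rsb **r_ret)
  {
      struct dlm_rsb *r;
      int error;

      rcu_read_lock();
      read_lock_bh(&ls->ls_rsbtbl_lock);
      error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
      read_unlock_bh(&ls->ls_rsbtbl_lock);
      if (error)
          goto out;

      /* lock was dropped: r may have been removed meanwhile, but RCU
       * keeps the memory valid, so checking the flag is safe
       */
      write_lock_bh(&ls->ls_rsbtbl_lock);
      if (!rsb_flag(r, RSB_HASHED)) {
          /* gone from the table, no second tree lookup needed */
          write_unlock_bh(&ls->ls_rsbtbl_lock);
          error = -ENOENT;
          goto out;
      }
      /* ... hold the rsb, move it from toss to keep, etc. ... */
      write_unlock_bh(&ls->ls_rsbtbl_lock);
      *r_ret = r;
   out:
      rcu_read_unlock();
      return error;
  }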

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  2 ++
 fs/dlm/lock.c         | 64 ++++++++++++++++++++++++++++++-------------
 fs/dlm/memory.c       |  9 +++++-
 3 files changed, 55 insertions(+), 20 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61dc58bdd006..5acdc6c2b4b8 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -333,6 +333,7 @@ struct dlm_rsb {
 	struct list_head	res_recover_list;   /* used for recovery */
 	struct list_head	res_toss_q_list;
 	int			res_recover_locks_count;
+	struct rcu_head		rcu;
 
 	char			*res_lvbptr;
 	char			res_name[DLM_RESNAME_MAXLEN+1];
@@ -366,6 +367,7 @@ enum rsb_flags {
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
 	RSB_TOSS,
+	RSB_HASHED,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 818e4b282b14..69a25ec82d1e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -582,6 +582,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
 		list_del(&r->res_rsbs_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+		rsb_clear_flag(r, RSB_HASHED);
 
 		/* not necessary to held the ls_rsbtbl_lock when
 		 * calling send_remove()
@@ -658,8 +659,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	return rhashtable_insert_fast(rhash, &rsb->res_node,
-				      dlm_rhash_rsb_params);
+	int rv;
+
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (!rv)
+		rsb_set_flag(rsb, RSB_HASHED);
+
+	return rv;
 }
 
 /*
@@ -773,12 +780,11 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
  do_toss:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
-
-	/* retry lookup under write lock to see if its still in toss state
-	 * if not it's in keep state and we relookup - unlikely path.
+	/* retry lookup under write lock to see if its still hashed and in
+	 * toss state if not it's in keep state and we relookup - unlikely
+	 * path.
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -949,12 +955,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
  do_toss:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
-
-	/* retry lookup under write lock to see if its still in toss state
-	 * if not it's in keep state and we relookup - unlikely path.
+	/* retry lookup under write lock to see if its still hashed and in
+	 * toss state if not it's in keep state and we relookup - unlikely
+	 * path.
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -1046,6 +1051,7 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 {
 	int dir_nodeid;
 	uint32_t hash;
+	int rv;
 
 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;
@@ -1053,12 +1059,20 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 	hash = jhash(name, len, 0);
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 
+	/* hold the rcu lock here to prevent freeing of the rsb
+	 * while looking it up, there are currently a optimization
+	 * to not lookup the rsb twice instead look if its still
+	 * part of the rsbtbl hash.
+	 */
+	rcu_read_lock();
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
-				      from_nodeid, flags, r_ret);
-	else
-		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				    from_nodeid, flags, r_ret);
+	else
+		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
+				  from_nodeid, flags, r_ret);
+	rcu_read_unlock();
+	return rv;
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1244,6 +1258,12 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
  retry:
 
+	/* the rcu lock here is combined to RSB_HASHED that the rsb did not
+	 * got freed during the read to write ls_rsbtbl_lock lock state change.
+	 * We can then simply check if the rsb is still hashed instead of doing
+	 * a lookup again.
+	 */
+	rcu_read_lock();
 	/* check if the rsb is in keep state under read lock - likely path */
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
@@ -1267,10 +1287,12 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		/* the rsb was active */
 		unlock_rsb(r);
 		put_rsb(r);
+		rcu_read_unlock();
 
 		return 0;
 	} else {
 		read_unlock_bh(&ls->ls_rsbtbl_lock);
+		rcu_read_unlock();
 		goto not_found;
 	}
 
@@ -1279,19 +1301,22 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	write_lock_bh(&ls->ls_rsbtbl_lock);
 
 	/* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock
-	 * check if the rsb is still in toss state, if not relookup
+	 * check if the rsb is still hashed and in toss state, if not relookup
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_TOSS)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			/* something as changed, very unlikely but
 			 * try again
 			 */
+			rcu_read_unlock();
 			goto retry;
 		}
+
+		rcu_read_unlock();
 	} else {
 		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		rcu_read_unlock();
 		goto not_found;
 	}
 
@@ -4306,6 +4331,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	list_del(&r->res_rsbs_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
+	rsb_clear_flag(r, RSB_HASHED);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_toss_rsb(r);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 15a8b1cee433..a7b994517bd8 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -101,13 +101,20 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	return r;
 }
 
-void dlm_free_rsb(struct dlm_rsb *r)
+static void __dlm_free_rsb(struct rcu_head *rcu)
 {
+	struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu);
+
 	if (r->res_lvbptr)
 		dlm_free_lvb(r->res_lvbptr);
 	kmem_cache_free(rsb_cache, r);
 }
 
+void dlm_free_rsb(struct dlm_rsb *r)
+{
+	call_rcu(&r->rcu, __dlm_free_rsb);
+}
+
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
-- 
2.43.0



* [PATCH v6.10-rc1 08/11] dlm: move lkb idr to xarray datastructure
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (5 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 07/11] dlm: using rcu to avoid rsb lookup again Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 09/11] dlm: move recover " Alexander Aring
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

According to the kernel documentation, idr is deprecated and xarrays
should be used instead. This patch moves the lkb idr implementation over
to an xarray.
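The main API difference on the allocation path: idr_alloc() returns the
new id (or a negative error) as its return value, whereas xa_alloc()
returns 0 on success and writes the id through a u32 pointer, with the
allowed range passed as a struct xa_limit (whose fields are 32-bit). A
sketch of the converted allocation, roughly matching the create_lkb()
hunk below (the wrapper name is illustrative):

  /* before:
   *    rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
   *    if (rv >= 0)
   *        lkb->lkb_id = rv;
   */
  static int example_alloc_lkb_id(struct dlm_ls *ls, struct dlm_lkb *lkb)
  {
      struct xa_limit limit = { .min = 1, .max = UINT_MAX };

      /* after: id is written through &lkb->lkb_id, 0 on success */
      return xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
  }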

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  5 ++--
 fs/dlm/lock.c         | 30 ++++++++++++----------
 fs/dlm/lockspace.c    | 60 +++++++++++++++++++++----------------------
 3 files changed, 49 insertions(+), 46 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 5acdc6c2b4b8..3a62675bf52e 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -37,6 +37,7 @@
 #include <linux/rhashtable.h>
 #include <linux/mutex.h>
 #include <linux/idr.h>
+#include <linux/xarray.h>
 #include <linux/ratelimit.h>
 #include <linux/uaccess.h>
 
@@ -571,8 +572,8 @@ struct dlm_ls {
 	unsigned long		ls_flags;	/* LSFL_ */
 	struct kobject		ls_kobj;
 
-	struct idr		ls_lkbidr;
-	rwlock_t		ls_lkbidr_lock;
+	struct xarray		ls_lkbxa;
+	rwlock_t		ls_lkbxa_lock;
 
 	struct rhashtable	ls_rsbtbl;
 	rwlock_t		ls_rsbtbl_lock;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 69a25ec82d1e..e176cf48011a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1460,11 +1460,15 @@ static void detach_lkb(struct dlm_lkb *lkb)
 }
 
 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
-		       int start, int end)
+		       unsigned long start, unsigned long end)
 {
+	struct xa_limit limit;
 	struct dlm_lkb *lkb;
 	int rv;
 
+	limit.max = end;
+	limit.min = start;
+
 	lkb = dlm_allocate_lkb(ls);
 	if (!lkb)
 		return -ENOMEM;
@@ -1478,14 +1482,12 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
-	write_lock_bh(&ls->ls_lkbidr_lock);
-	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
-	if (rv >= 0)
-		lkb->lkb_id = rv;
-	write_unlock_bh(&ls->ls_lkbidr_lock);
+	write_lock_bh(&ls->ls_lkbxa_lock);
+	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
+	write_unlock_bh(&ls->ls_lkbxa_lock);
 
 	if (rv < 0) {
-		log_error(ls, "create_lkb idr error %d", rv);
+		log_error(ls, "create_lkb xa error %d", rv);
 		dlm_free_lkb(lkb);
 		return rv;
 	}
@@ -1496,18 +1498,18 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 
 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 {
-	return _create_lkb(ls, lkb_ret, 1, 0);
+	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
 }
 
 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
 	struct dlm_lkb *lkb;
 
-	read_lock_bh(&ls->ls_lkbidr_lock);
-	lkb = idr_find(&ls->ls_lkbidr, lkid);
+	read_lock_bh(&ls->ls_lkbxa_lock);
+	lkb = xa_load(&ls->ls_lkbxa, lkid);
 	if (lkb)
 		kref_get(&lkb->lkb_ref);
-	read_unlock_bh(&ls->ls_lkbidr_lock);
+	read_unlock_bh(&ls->ls_lkbxa_lock);
 
 	*lkb_ret = lkb;
 	return lkb ? 0 : -ENOENT;
@@ -1532,10 +1534,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	int rv;
 
 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
-					&ls->ls_lkbidr_lock);
+					&ls->ls_lkbxa_lock);
 	if (rv) {
-		idr_remove(&ls->ls_lkbidr, lkid);
-		write_unlock_bh(&ls->ls_lkbidr_lock);
+		xa_erase(&ls->ls_lkbxa, lkid);
+		write_unlock_bh(&ls->ls_lkbxa_lock);
 
 		detach_lkb(lkb);
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index ecb9b5fcad8e..a1aefe804727 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -420,8 +420,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	if (error)
 		goto out_lsfree;
 
-	idr_init(&ls->ls_lkbidr);
-	rwlock_init(&ls->ls_lkbidr_lock);
+	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
+	rwlock_init(&ls->ls_lkbxa_lock);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	spin_lock_init(&ls->ls_waiters_lock);
@@ -471,7 +471,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
 	if (!ls->ls_recover_buf) {
 		error = -ENOMEM;
-		goto out_lkbidr;
+		goto out_lkbxa;
 	}
 
 	ls->ls_slot = 0;
@@ -572,8 +572,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_unlock_bh(&lslist_lock);
 	idr_destroy(&ls->ls_recover_idr);
 	kfree(ls->ls_recover_buf);
- out_lkbidr:
-	idr_destroy(&ls->ls_lkbidr);
+ out_lkbxa:
+	xa_destroy(&ls->ls_lkbxa);
 	rhashtable_destroy(&ls->ls_rsbtbl);
  out_lsfree:
 	if (do_unreg)
@@ -633,22 +633,8 @@ int dlm_new_user_lockspace(const char *name, const char *cluster,
 				   ops_arg, ops_result, lockspace);
 }
 
-static int lkb_idr_is_local(int id, void *p, void *data)
+static int lkb_idr_free(struct dlm_lkb *lkb)
 {
-	struct dlm_lkb *lkb = p;
-
-	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
-}
-
-static int lkb_idr_is_any(int id, void *p, void *data)
-{
-	return 1;
-}
-
-static int lkb_idr_free(int id, void *p, void *data)
-{
-	struct dlm_lkb *lkb = p;
-
 	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
 		dlm_free_lvb(lkb->lkb_lvbptr);
 
@@ -656,23 +642,34 @@ static int lkb_idr_free(int id, void *p, void *data)
 	return 0;
 }
 
-/* NOTE: We check the lkbidr here rather than the resource table.
+/* NOTE: We check the lkbxa here rather than the resource table.
    This is because there may be LKBs queued as ASTs that have been unlinked
    from their RSBs and are pending deletion once the AST has been delivered */
 
 static int lockspace_busy(struct dlm_ls *ls, int force)
 {
-	int rv;
+	struct dlm_lkb *lkb;
+	unsigned long id;
+	int rv = 0;
 
-	read_lock_bh(&ls->ls_lkbidr_lock);
+	read_lock_bh(&ls->ls_lkbxa_lock);
 	if (force == 0) {
-		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
+		xa_for_each(&ls->ls_lkbxa, id, lkb) {
+			rv = 1;
+			break;
+		}
 	} else if (force == 1) {
-		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
+		xa_for_each(&ls->ls_lkbxa, id, lkb) {
+			if (lkb->lkb_nodeid == 0 &&
+			    lkb->lkb_grmode != DLM_LOCK_IV) {
+				rv = 1;
+				break;
+			}
+		}
 	} else {
 		rv = 0;
 	}
-	read_unlock_bh(&ls->ls_lkbidr_lock);
+	read_unlock_bh(&ls->ls_lkbxa_lock);
 	return rv;
 }
 
@@ -685,6 +682,8 @@ static void rhash_free_rsb(void *ptr, void *arg)
 
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
+	struct dlm_lkb *lkb;
+	unsigned long id;
 	int busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -741,11 +740,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	kfree(ls->ls_recover_buf);
 
 	/*
-	 * Free all lkb's in idr
+	 * Free all lkb's in xa
 	 */
-
-	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
-	idr_destroy(&ls->ls_lkbidr);
+	xa_for_each(&ls->ls_lkbxa, id, lkb) {
+		lkb_idr_free(lkb);
+	}
+	xa_destroy(&ls->ls_lkbxa);
 
 	/*
 	 * Free all rsb's on rsbtbl
-- 
2.43.0



* [PATCH v6.10-rc1 09/11] dlm: move recover idr to xarray datastructure
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (6 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 08/11] dlm: move lkb idr to xarray datastructure Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 10/11] dlm: merge nodeid field to master_nodeid Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 11/11] dlm: use is_master() where it's possible Alexander Aring
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

According to the kernel documentation, idr is deprecated and xarrays
should be used instead. This patch moves the recover idr implementation
over to an xarray.
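Besides the allocation side (see the previous patch), the other recurring
conversion is the teardown loop: idr_for_each_entry() plus idr_remove()
becomes xa_for_each() plus xa_erase(). A condensed sketch of that pattern,
roughly what recover_xa_clear() below does without the counter and lock
bookkeeping (the function name is illustrative):

  static void example_clear(struct dlm_ls *ls)
  {
      struct dlm_rsb *r;
      unsigned long id;

      xa_for_each(&ls->ls_recover_xa, id, r) {
          /* erasing the current entry during xa_for_each() is allowed */
          xa_erase_bh(&ls->ls_recover_xa, id);
          dlm_put_rsb(r);
      }
  }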

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  7 +++--
 fs/dlm/lockspace.c    |  8 +++---
 fs/dlm/recover.c      | 61 +++++++++++++++++++++++--------------------
 3 files changed, 40 insertions(+), 36 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3a62675bf52e..78960d914f68 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -36,7 +36,6 @@
 #include <linux/miscdevice.h>
 #include <linux/rhashtable.h>
 #include <linux/mutex.h>
-#include <linux/idr.h>
 #include <linux/xarray.h>
 #include <linux/ratelimit.h>
 #include <linux/uaccess.h>
@@ -317,7 +316,7 @@ struct dlm_rsb {
 	int			res_nodeid;
 	int			res_master_nodeid;
 	int			res_dir_nodeid;
-	int			res_id;		/* for ls_recover_idr */
+	unsigned long		res_id;		/* for ls_recover_xa */
 	uint32_t                res_lvbseq;
 	uint32_t		res_hash;
 	unsigned long		res_toss_time;
@@ -651,8 +650,8 @@ struct dlm_ls {
 	struct list_head	ls_recover_list;
 	spinlock_t		ls_recover_list_lock;
 	int			ls_recover_list_count;
-	struct idr		ls_recover_idr;
-	spinlock_t		ls_recover_idr_lock;
+	struct xarray		ls_recover_xa;
+	spinlock_t		ls_recover_xa_lock;
 	wait_queue_head_t	ls_wait_general;
 	wait_queue_head_t	ls_recover_lock_wait;
 	spinlock_t		ls_clear_proc_locks;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1aefe804727..9c3118a698d6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -481,8 +481,8 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	INIT_LIST_HEAD(&ls->ls_recover_list);
 	spin_lock_init(&ls->ls_recover_list_lock);
-	idr_init(&ls->ls_recover_idr);
-	spin_lock_init(&ls->ls_recover_idr_lock);
+	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
+	spin_lock_init(&ls->ls_recover_xa_lock);
 	ls->ls_recover_list_count = 0;
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
@@ -570,7 +570,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_lock_bh(&lslist_lock);
 	list_del(&ls->ls_list);
 	spin_unlock_bh(&lslist_lock);
-	idr_destroy(&ls->ls_recover_idr);
+	xa_destroy(&ls->ls_recover_xa);
 	kfree(ls->ls_recover_buf);
  out_lkbxa:
 	xa_destroy(&ls->ls_lkbxa);
@@ -736,7 +736,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_delete_debug_file(ls);
 
-	idr_destroy(&ls->ls_recover_idr);
+	xa_destroy(&ls->ls_recover_xa);
 	kfree(ls->ls_recover_buf);
 
 	/*
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index f493d5f30c58..d156196b9e69 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -293,73 +293,78 @@ static void recover_list_clear(struct dlm_ls *ls)
 	spin_unlock_bh(&ls->ls_recover_list_lock);
 }
 
-static int recover_idr_empty(struct dlm_ls *ls)
+static int recover_xa_empty(struct dlm_ls *ls)
 {
 	int empty = 1;
 
-	spin_lock_bh(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_xa_lock);
 	if (ls->ls_recover_list_count)
 		empty = 0;
-	spin_unlock_bh(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_xa_lock);
 
 	return empty;
 }
 
-static int recover_idr_add(struct dlm_rsb *r)
+static int recover_xa_add(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
+	struct xa_limit limit = {
+		.min = 1,
+		.max = UINT_MAX,
+	};
+	uint32_t id;
 	int rv;
 
-	spin_lock_bh(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_xa_lock);
 	if (r->res_id) {
 		rv = -1;
 		goto out_unlock;
 	}
-	rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
+	rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC);
 	if (rv < 0)
 		goto out_unlock;
 
-	r->res_id = rv;
+	r->res_id = id;
 	ls->ls_recover_list_count++;
 	dlm_hold_rsb(r);
 	rv = 0;
 out_unlock:
-	spin_unlock_bh(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_xa_lock);
 	return rv;
 }
 
-static void recover_idr_del(struct dlm_rsb *r)
+static void recover_xa_del(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock_bh(&ls->ls_recover_idr_lock);
-	idr_remove(&ls->ls_recover_idr, r->res_id);
+	spin_lock_bh(&ls->ls_recover_xa_lock);
+	xa_erase_bh(&ls->ls_recover_xa, r->res_id);
 	r->res_id = 0;
 	ls->ls_recover_list_count--;
-	spin_unlock_bh(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_xa_lock);
 
 	dlm_put_rsb(r);
 }
 
-static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
+static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_recover_idr_lock);
-	r = idr_find(&ls->ls_recover_idr, (int)id);
-	spin_unlock_bh(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_xa_lock);
+	r = xa_load(&ls->ls_recover_xa, (int)id);
+	spin_unlock_bh(&ls->ls_recover_xa_lock);
 	return r;
 }
 
-static void recover_idr_clear(struct dlm_ls *ls)
+static void recover_xa_clear(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int id;
+	unsigned long id;
 
-	spin_lock_bh(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_xa_lock);
 
-	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
-		idr_remove(&ls->ls_recover_idr, id);
+	xa_for_each(&ls->ls_recover_xa, id, r) {
+		xa_erase_bh(&ls->ls_recover_xa, id);
 		r->res_id = 0;
 		r->res_recover_locks_count = 0;
 		ls->ls_recover_list_count--;
@@ -372,7 +377,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
 			  ls->ls_recover_list_count);
 		ls->ls_recover_list_count = 0;
 	}
-	spin_unlock_bh(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_xa_lock);
 }
 
 
@@ -470,7 +475,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 		set_new_master(r);
 		error = 0;
 	} else {
-		recover_idr_add(r);
+		recover_xa_add(r);
 		error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
 	}
 
@@ -551,10 +556,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
 
 	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
 
-	error = dlm_wait_function(ls, &recover_idr_empty);
+	error = dlm_wait_function(ls, &recover_xa_empty);
  out:
 	if (error)
-		recover_idr_clear(ls);
+		recover_xa_clear(ls);
 	return error;
 }
 
@@ -563,7 +568,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
 	struct dlm_rsb *r;
 	int ret_nodeid, new_master;
 
-	r = recover_idr_find(ls, le64_to_cpu(rc->rc_id));
+	r = recover_xa_find(ls, le64_to_cpu(rc->rc_id));
 	if (!r) {
 		log_error(ls, "dlm_recover_master_reply no id %llx",
 			  (unsigned long long)le64_to_cpu(rc->rc_id));
@@ -582,9 +587,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
 	r->res_nodeid = new_master;
 	set_new_master(r);
 	unlock_rsb(r);
-	recover_idr_del(r);
+	recover_xa_del(r);
 
-	if (recover_idr_empty(ls))
+	if (recover_xa_empty(ls))
 		wake_up(&ls->ls_wait_general);
  out:
 	return 0;
-- 
2.43.0



* [PATCH v6.10-rc1 10/11] dlm: merge nodeid field to master_nodeid
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (7 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 09/11] dlm: move recover " Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  2024-05-28 21:12 ` [PATCH v6.10-rc1 11/11] dlm: use is_master() where it's possible Alexander Aring
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

There is a TODO in dlm_internal.h for the dlm_rsb struct:

"remove res_nodeid and only use res_master_nodeid"

res_nodeid and res_master_nodeid represent the same information, just in
two different number mappings. This patch works on this TODO by removing
res_nodeid and using res_master_nodeid only. The same applies to struct
lkb, where lkb_nodeid becomes lkb_master_nodeid. The new representation is
used internally, with backwards-compatibility helpers for the old one,
because the old representation is still used by debugfs and the DLM
protocol. In the future we might use the new number representation for
res_master_nodeid only and remove the copied value in struct lkb.
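The two numbering schemes map as follows: res_nodeid uses -1 for an
unset/unknown master, 0 for "we are the master" and a positive nodeid
otherwise; res_master_nodeid uses 0 for unset and the real nodeid
(including our own) otherwise. The helpers added to dlm_internal.h convert
between them; a small self-check sketch (not part of the patch, node 7
stands for any remote node that is not the local one):

  static void example_mapping_check(void)
  {
      /* old -1 (no master known yet)  <->  new 0 */
      WARN_ON(dlm_res_master_nodeid(-1) != 0);
      /* old 0 (we are the master)     <->  new dlm_our_nodeid() */
      WARN_ON(dlm_res_master_nodeid(0) != dlm_our_nodeid());
      WARN_ON(dlm_res_nodeid(dlm_our_nodeid()) != 0);
      /* old 7 (remote node 7)         <->  new 7, and it round-trips */
      WARN_ON(dlm_res_nodeid(dlm_res_master_nodeid(7)) != 7);
  }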

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  33 +++++-----
 fs/dlm/dlm_internal.h |  33 ++++++++--
 fs/dlm/lock.c         | 144 ++++++++++++++++++------------------------
 fs/dlm/lock.h         |   5 +-
 fs/dlm/lockspace.c    |   2 +-
 fs/dlm/rcom.c         |   4 +-
 fs/dlm/recover.c      |  20 ++----
 fs/dlm/recoverd.c     |   2 +-
 8 files changed, 119 insertions(+), 124 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 6ab3ed4074c6..40eb1696f3b6 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -58,9 +58,10 @@ static void print_format1_lock(struct seq_file *s, struct dlm_lkb *lkb,
 	    lkb->lkb_status == DLM_LKSTS_WAITING)
 		seq_printf(s, " (%s)", print_lockmode(lkb->lkb_rqmode));
 
-	if (lkb->lkb_nodeid) {
-		if (lkb->lkb_nodeid != res->res_nodeid)
-			seq_printf(s, " Remote: %3d %08x", lkb->lkb_nodeid,
+	if (lkb->lkb_master_nodeid != dlm_our_nodeid()) {
+		if (lkb->lkb_master_nodeid != res->res_master_nodeid)
+			seq_printf(s, " Remote: %3d %08x",
+				   dlm_res_nodeid(lkb->lkb_master_nodeid),
 				   lkb->lkb_remid);
 		else
 			seq_printf(s, " Master:     %08x", lkb->lkb_remid);
@@ -88,16 +89,15 @@ static void print_format1(struct dlm_rsb *res, struct seq_file *s)
 			seq_printf(s, "%c", '.');
 	}
 
-	if (res->res_nodeid > 0)
-		seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
-			   res->res_nodeid);
-	else if (res->res_nodeid == 0)
-		seq_puts(s, "\"\nMaster Copy\n");
-	else if (res->res_nodeid == -1)
+
+	if (res->res_master_nodeid == 0)
 		seq_printf(s, "\"\nLooking up master (lkid %x)\n",
 			   res->res_first_lkid);
+	else if (res->res_master_nodeid == dlm_our_nodeid())
+		seq_puts(s, "\"\nMaster Copy\n");
 	else
-		seq_printf(s, "\"\nInvalid master %d\n", res->res_nodeid);
+		seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
+			   res->res_master_nodeid);
 	if (seq_has_overflowed(s))
 		goto out;
 
@@ -184,7 +184,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
 
 	seq_printf(s, "%x %d %x %u %llu %x %x %d %d %d %llu %u %d \"%s\"\n",
 		   lkb->lkb_id,
-		   lkb->lkb_nodeid,
+		   dlm_res_nodeid(lkb->lkb_master_nodeid),
 		   lkb->lkb_remid,
 		   lkb->lkb_ownpid,
 		   (unsigned long long)xid,
@@ -194,7 +194,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
 		   lkb->lkb_grmode,
 		   lkb->lkb_rqmode,
 		   (unsigned long long)us,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_length,
 		   r->res_name);
 }
@@ -238,7 +238,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
 
 	seq_printf(s, "lkb %x %d %x %u %llu %x %x %d %d %d %d %d %d %u %llu %llu\n",
 		   lkb->lkb_id,
-		   lkb->lkb_nodeid,
+		   dlm_res_nodeid(lkb->lkb_master_nodeid),
 		   lkb->lkb_remid,
 		   lkb->lkb_ownpid,
 		   (unsigned long long)xid,
@@ -265,7 +265,7 @@ static void print_format3(struct dlm_rsb *r, struct seq_file *s)
 
 	seq_printf(s, "rsb %p %d %x %lx %d %d %u %d ",
 		   r,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_first_lkid,
 		   r->res_flags,
 		   !list_empty(&r->res_root_list),
@@ -341,7 +341,7 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
 
 	seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
 		   r,
-		   r->res_nodeid,
+		   dlm_res_nodeid(r->res_master_nodeid),
 		   r->res_master_nodeid,
 		   r->res_dir_nodeid,
 		   our_nodeid,
@@ -611,7 +611,8 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
 		ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
 			       lkb->lkb_id, lkb->lkb_wait_type,
-			       lkb->lkb_nodeid, lkb->lkb_resource->res_name);
+			       dlm_res_nodeid(lkb->lkb_master_nodeid),
+			       lkb->lkb_resource->res_name);
 		if (ret >= len - pos)
 			break;
 		pos += ret;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 78960d914f68..b3b9aa7dbaa4 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -253,7 +253,7 @@ struct dlm_callback {
 struct dlm_lkb {
 	struct dlm_rsb		*lkb_resource;	/* the rsb */
 	struct kref		lkb_ref;
-	int			lkb_nodeid;	/* copied from rsb */
+	unsigned int		lkb_master_nodeid;	/* copied from rsb */
 	int			lkb_ownpid;	/* pid of lock owner */
 	uint32_t		lkb_id;		/* our lock ID */
 	uint32_t		lkb_remid;	/* lock ID on remote partner */
@@ -303,18 +303,41 @@ struct dlm_lkb {
  *
  * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
  * greater than zero when another nodeid.
- *
- * (TODO: remove res_nodeid and only use res_master_nodeid)
  */
 
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline int dlm_res_nodeid(unsigned int res_master_nodeid)
+{
+	if (res_master_nodeid == 0)
+		return -1;
+	else if (res_master_nodeid == dlm_our_nodeid())
+		return 0;
+	else
+		return res_master_nodeid;
+}
+
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline unsigned int dlm_res_master_nodeid(int res_nodeid)
+{
+	if (res_nodeid == -1)
+		return 0;
+	else if (res_nodeid == 0)
+		return dlm_our_nodeid();
+	else
+		return res_nodeid;
+}
+
 struct dlm_rsb {
 	struct dlm_ls		*res_ls;	/* the lockspace */
 	struct kref		res_ref;
 	spinlock_t		res_lock;
 	unsigned long		res_flags;
 	int			res_length;	/* length of rsb name */
-	int			res_nodeid;
-	int			res_master_nodeid;
+	unsigned int		res_master_nodeid;
 	int			res_dir_nodeid;
 	unsigned long		res_id;		/* for ls_recover_xa */
 	uint32_t                res_lvbseq;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e176cf48011a..a044edc2d143 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,21 +160,19 @@ static const int __quecvt_compat_matrix[8][8] = {
 
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
-	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+	pr_err("lkb: master_nodeid %d id %x remid %x exflags %x flags %x "
 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
-	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
-	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
-	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
-	       (unsigned long long)lkb->lkb_recover_seq);
+	       lkb->lkb_master_nodeid, lkb->lkb_id, lkb->lkb_remid,
+	       lkb->lkb_exflags, dlm_iflags_val(lkb), lkb->lkb_status,
+	       lkb->lkb_rqmode, lkb->lkb_grmode, lkb->lkb_wait_type,
+	       lkb->lkb_wait_nodeid, (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
 {
-	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
-	       "rlc %d name %s\n",
-	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
-	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
-	       r->res_name);
+	pr_err("rsb: master %d dir %d flags %lx first %x rlc %d name %s\n",
+	       r->res_master_nodeid, r->res_dir_nodeid, r->res_flags,
+	       r->res_first_lkid, r->res_recover_locks_count, r->res_name);
 }
 
 void dlm_dump_rsb(struct dlm_rsb *r)
@@ -243,13 +241,13 @@ static inline int is_granted(struct dlm_lkb *lkb)
 
 static inline int is_remote(struct dlm_rsb *r)
 {
-	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
-	return !!r->res_nodeid;
+	DLM_ASSERT(r->res_master_nodeid != 0, dlm_print_rsb(r););
+	return r->res_master_nodeid != dlm_our_nodeid();
 }
 
 static inline int is_process_copy(struct dlm_lkb *lkb)
 {
-	return lkb->lkb_nodeid &&
+	return lkb->lkb_master_nodeid != dlm_our_nodeid() &&
 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
 }
 
@@ -819,7 +817,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		dlm_print_rsb(r);
 		/* fix it and go on */
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = 0;
 	}
@@ -865,7 +862,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
 			  from_nodeid, r->res_name);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 		goto out_add;
 	}
 
@@ -888,11 +884,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		/* When we are the dir nodeid, we can set the master
 		   node immediately */
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 	} else {
 		/* set_master will send_lookup to dir_nodeid */
 		r->res_master_nodeid = 0;
-		r->res_nodeid = -1;
 	}
 
  out_add:
@@ -995,7 +989,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
 		dlm_print_rsb(r);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
 	}
 
 	list_move(&r->res_rsbs_list, &ls->ls_keep);
@@ -1023,7 +1016,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
 	r->res_master_nodeid = dir_nodeid;
-	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
 	write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1103,7 +1095,8 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
 		return -ENOTBLK;
 	} else {
 		/* our rsb is not master, but the dir nodeid has sent us a
-	   	   request; this could happen with master 0 / res_nodeid -1 */
+		 * request; this could happen with res_master_nodeid 0
+		 */
 
 		if (r->res_master_nodeid) {
 			log_error(ls, "validate master from_dir %d master %d "
@@ -1113,7 +1106,6 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
 		}
 
 		r->res_master_nodeid = dlm_our_nodeid();
-		r->res_nodeid = 0;
 		return 0;
 	}
 }
@@ -1136,11 +1128,10 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		/* Recovery uses this function to set a new master when
 		 * the previous master failed.  Setting NEW_MASTER will
 		 * force dlm_recover_masters to call recover_master on this
-		 * rsb even though the res_nodeid is no longer removed.
+		 * rsb even though the res_master_nodeid is no longer removed.
 		 */
 
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 		rsb_set_flag(r, RSB_NEW_MASTER);
 
 		if (toss_list) {
@@ -1156,9 +1147,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		 * cycle before recovering this master value
 		 */
 
-		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
+		log_limit(ls, "%s from_master %d master_nodeid %d first %x %s",
 			  __func__, from_nodeid, r->res_master_nodeid,
-			  r->res_nodeid, r->res_first_lkid, r->res_name);
+			  r->res_first_lkid, r->res_name);
 
 		if (r->res_master_nodeid == our_nodeid) {
 			log_error(ls, "from_master %d our_master", from_nodeid);
@@ -1167,7 +1158,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		}
 
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 		rsb_set_flag(r, RSB_NEW_MASTER);
 	}
 
@@ -1179,7 +1169,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
 			  from_nodeid, r->res_first_lkid, r->res_name);
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
 	}
 
 	if (!from_master && !fix_master &&
@@ -1341,7 +1330,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	r->res_hash = hash;
 	r->res_dir_nodeid = our_nodeid;
 	r->res_master_nodeid = from_nodeid;
-	r->res_nodeid = from_nodeid;
+	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
 	write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1476,7 +1465,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
-	lkb->lkb_nodeid = -1;
 	lkb->lkb_grmode = DLM_LOCK_IV;
 	kref_init(&lkb->lkb_ref);
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
@@ -2438,8 +2426,9 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
 		}
 
 		if (!demoted && is_demoted(lkb)) {
-			log_print("WARN: pending demoted %x node %d %s",
-				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
+			log_print("WARN: pending demoted %x master_node %d %s",
+				  lkb->lkb_id, lkb->lkb_master_nodeid,
+				  r->res_name);
 			demote_restart = 1;
 			continue;
 		}
@@ -2457,7 +2446,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
 				}
 			} else {
 				log_print("WARN: pending deadlock %x node %d %s",
-					  lkb->lkb_id, lkb->lkb_nodeid,
+					  lkb->lkb_id, lkb->lkb_master_nodeid,
 					  r->res_name);
 				dlm_dump_rsb(r);
 			}
@@ -2526,7 +2515,8 @@ static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 	int cw = 0;
 
 	if (!is_master(r)) {
-		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+		log_print("%s r master_nodeid %d", __func__,
+			  r->res_master_nodeid);
 		dlm_dump_rsb(r);
 		return;
 	}
@@ -2622,7 +2612,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = lkb->lkb_id;
-		lkb->lkb_nodeid = r->res_nodeid;
+		lkb->lkb_master_nodeid = r->res_master_nodeid;
 		return 0;
 	}
 
@@ -2631,13 +2621,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 		return 1;
 	}
 
-	if (r->res_master_nodeid == our_nodeid) {
-		lkb->lkb_nodeid = 0;
-		return 0;
-	}
-
 	if (r->res_master_nodeid) {
-		lkb->lkb_nodeid = r->res_master_nodeid;
+		lkb->lkb_master_nodeid = r->res_master_nodeid;
 		return 0;
 	}
 
@@ -2652,8 +2637,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
 			  r->res_name);
 		r->res_master_nodeid = our_nodeid;
-		r->res_nodeid = 0;
-		lkb->lkb_nodeid = 0;
+		lkb->lkb_master_nodeid = our_nodeid;
 		return 0;
 	}
 
@@ -2850,7 +2834,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
    for success */
 
-/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+/* note: it's valid for lkb_master_nodeid/res_master_nodeid to be 0 when we get here
    because there may be a lookup in progress and it's valid to do
    cancel/unlockf on it */
 
@@ -3531,7 +3515,7 @@ static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
 		      struct dlm_message *ms)
 {
-	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
+	ms->m_nodeid   = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
@@ -3578,7 +3562,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = r->res_nodeid;
+	to_nodeid = r->res_master_nodeid;
 
 	error = add_to_waiters(lkb, mstype, to_nodeid);
 	if (error)
@@ -3642,7 +3626,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
 	if (error)
@@ -3663,7 +3647,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
 	if (error)
@@ -3733,7 +3717,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	to_nodeid = lkb->lkb_nodeid;
+	to_nodeid = lkb->lkb_master_nodeid;
 
 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
@@ -3849,7 +3833,7 @@ static void fake_astfn(void *astparam)
 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				const struct dlm_message *ms)
 {
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
 	lkb->lkb_grmode = DLM_LOCK_IV;
@@ -3897,7 +3881,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
-	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
 }
 
@@ -3922,7 +3906,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 	case cpu_to_le32(DLM_MSG_CONVERT):
 	case cpu_to_le32(DLM_MSG_UNLOCK):
 	case cpu_to_le32(DLM_MSG_CANCEL):
-		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+		if (!is_master_copy(lkb) || lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
@@ -3931,14 +3915,14 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
 	case cpu_to_le32(DLM_MSG_GRANT):
 	case cpu_to_le32(DLM_MSG_BAST):
-		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+		if (!is_process_copy(lkb) || lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
 		if (!is_process_copy(lkb))
 			error = -EINVAL;
-		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+		else if (lkb->lkb_master_nodeid != 0 && lkb->lkb_master_nodeid != from)
 			error = -EINVAL;
 		break;
 
@@ -3952,7 +3936,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
 			  "ignore invalid message %d from %d %x %x %x %d",
 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
 			  lkb->lkb_remid, dlm_iflags_val(lkb),
-			  lkb->lkb_nodeid);
+			  lkb->lkb_master_nodeid);
 	return error;
 }
 
@@ -4378,8 +4362,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 	   lookup as a request and sent request reply instead of lookup reply */
 	if (mstype == DLM_MSG_LOOKUP) {
 		r->res_master_nodeid = from_nodeid;
-		r->res_nodeid = from_nodeid;
-		lkb->lkb_nodeid = from_nodeid;
+		lkb->lkb_master_nodeid = from_nodeid;
 	}
 
 	/* this is the value returned from do_request() on the master */
@@ -4421,8 +4404,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 		    r->res_master_nodeid != dlm_our_nodeid()) {
 			/* cause _request_lock->set_master->send_lookup */
 			r->res_master_nodeid = 0;
-			r->res_nodeid = -1;
-			lkb->lkb_nodeid = -1;
+			lkb->lkb_master_nodeid = 0;
 		}
 
 		if (is_overlap(lkb)) {
@@ -4696,7 +4678,6 @@ static void receive_lookup_reply(struct dlm_ls *ls,
 
 	if (ret_nodeid == dlm_our_nodeid()) {
 		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = 0;
 		do_lookup_list = 1;
 		r->res_first_lkid = 0;
 	} else if (ret_nodeid == -1) {
@@ -4704,12 +4685,10 @@ static void receive_lookup_reply(struct dlm_ls *ls,
 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
 		r->res_master_nodeid = 0;
-		r->res_nodeid = -1;
-		lkb->lkb_nodeid = -1;
+		lkb->lkb_master_nodeid = 0;
 	} else {
-		/* set_master() will set lkb_nodeid from r */
+		/* set_master() will set lkb_master_nodeid from r */
 		r->res_master_nodeid = ret_nodeid;
-		r->res_nodeid = ret_nodeid;
 	}
 
 	if (is_overlap(lkb)) {
@@ -4976,7 +4955,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 		memset(ms_local, 0, sizeof(struct dlm_message));
 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
-		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+		ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 		_receive_convert_reply(lkb, ms_local, true);
 
 		/* Same special case as in receive_rcom_lock_args() */
@@ -5032,13 +5011,12 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		   many and they aren't very interesting */
 
 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
-			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
-				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+			log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d",
 				  lkb->lkb_id,
 				  lkb->lkb_remid,
 				  lkb->lkb_wait_type,
-				  lkb->lkb_resource->res_nodeid,
-				  lkb->lkb_nodeid,
+				  lkb->lkb_resource->res_master_nodeid,
+				  lkb->lkb_master_nodeid,
 				  lkb->lkb_wait_nodeid,
 				  dir_nodeid);
 		}
@@ -5095,7 +5073,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			memset(ms_local, 0, sizeof(struct dlm_message));
 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
-			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 			_receive_unlock_reply(lkb, ms_local, true);
 			dlm_put_lkb(lkb);
 			break;
@@ -5105,7 +5083,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			memset(ms_local, 0, sizeof(struct dlm_message));
 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
-			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+			ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
 			_receive_cancel_reply(lkb, ms_local, true);
 			dlm_put_lkb(lkb);
 			break;
@@ -5201,11 +5179,10 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 					&lkb->lkb_iflags);
 		err = 0;
 
-		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
-			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
-			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
-			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
-			  dlm_dir_nodeid(r), oc, ou);
+		log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d overlap %d %d",
+			  lkb->lkb_id, lkb->lkb_remid, mstype,
+			  r->res_master_nodeid, lkb->lkb_master_nodeid,
+			  lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou);
 
 		/*
 		 * No reply to the pre-recovery operation will now be received,
@@ -5278,9 +5255,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		}
 
 		if (err) {
-			log_error(ls, "waiter %x msg %d r_nodeid %d "
+			log_error(ls, "waiter %x msg %d master_nodeid %d "
 				  "dir_nodeid %d overlap %d %d",
-				  lkb->lkb_id, mstype, r->res_nodeid,
+				  lkb->lkb_id, mstype, r->res_master_nodeid,
 				  dlm_dir_nodeid(r), oc, ou);
 		}
 		unlock_rsb(r);
@@ -5333,8 +5310,8 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 		if (!is_master_copy(lkb))
 			continue;
 
-		if ((lkb->lkb_nodeid == nodeid_gone) ||
-		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
+		if ((lkb->lkb_master_nodeid == nodeid_gone) ||
+		    dlm_is_removed(ls, lkb->lkb_master_nodeid)) {
 
 			/* tell recover_lvb to invalidate the lvb
 			   because a node holding EX/PW failed */
@@ -5471,7 +5448,7 @@ static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
 	struct dlm_lkb *lkb;
 
 	list_for_each_entry(lkb, head, lkb_statequeue) {
-		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
+		if (lkb->lkb_master_nodeid == nodeid && lkb->lkb_remid == remid)
 			return lkb;
 	}
 	return NULL;
@@ -5500,7 +5477,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 {
 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
 
-	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
+	lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(rc->rc_header.h_nodeid));
 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
@@ -6247,7 +6224,8 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
 
 /* debug functionality */
 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
-		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
+		      int lkb_master_nodeid, unsigned int lkb_dflags,
+		      int lkb_status)
 {
 	struct dlm_lksb *lksb;
 	struct dlm_lkb *lkb;
@@ -6269,7 +6247,7 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
 	}
 
 	dlm_set_dflags_val(lkb, lkb_dflags);
-	lkb->lkb_nodeid = lkb_nodeid;
+	lkb->lkb_master_nodeid = lkb_master_nodeid;
 	lkb->lkb_lksb = lksb;
 	/* user specific pointer, just don't have it NULL for kernel locks */
 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 8de9dee4c058..e59161d2fe84 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -61,13 +61,14 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
-		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status);
+		      int lkb_master_nodeid, unsigned int lkb_flags,
+		      int lkb_status);
 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
 				 int mstype, int to_nodeid);
 
 static inline int is_master(struct dlm_rsb *r)
 {
-	return !r->res_nodeid;
+	return r->res_master_nodeid == dlm_our_nodeid();
 }
 
 static inline void lock_rsb(struct dlm_rsb *r)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 9c3118a698d6..e011167a924a 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -660,7 +660,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 		}
 	} else if (force == 1) {
 		xa_for_each(&ls->ls_lkbxa, id, lkb) {
-			if (lkb->lkb_nodeid == 0 &&
+			if (lkb->lkb_master_nodeid == dlm_our_nodeid() &&
 			    lkb->lkb_grmode != DLM_LOCK_IV) {
 				rv = 1;
 				break;
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index be1a71a6303a..b545cd7a23cd 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -455,8 +455,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
 	if (lkb->lkb_lvbptr)
 		len += ls->ls_lvblen;
 
-	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
-			    seq);
+	error = create_rcom(ls, r->res_master_nodeid, DLM_RCOM_LOCK, len, &rc,
+			    &mh, seq);
 	if (error)
 		goto out;
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index d156196b9e69..d948d28d8f92 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -408,7 +408,7 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 
 	list_for_each_entry(lkb, queue, lkb_statequeue) {
 		if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
-			lkb->lkb_nodeid = nodeid;
+			lkb->lkb_master_nodeid = nodeid;
 			lkb->lkb_remid = 0;
 		}
 	}
@@ -416,9 +416,9 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 
 static void set_master_lkbs(struct dlm_rsb *r)
 {
-	set_lock_master(&r->res_grantqueue, r->res_nodeid);
-	set_lock_master(&r->res_convertqueue, r->res_nodeid);
-	set_lock_master(&r->res_waitqueue, r->res_nodeid);
+	set_lock_master(&r->res_grantqueue, r->res_master_nodeid);
+	set_lock_master(&r->res_convertqueue, r->res_master_nodeid);
+	set_lock_master(&r->res_waitqueue, r->res_master_nodeid);
 }
 
 /*
@@ -455,7 +455,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 	if (is_master(r))
 		return 0;
 
-	is_removed = dlm_is_removed(ls, r->res_nodeid);
+	is_removed = dlm_is_removed(ls, r->res_master_nodeid);
 
 	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
 		return 0;
@@ -464,10 +464,8 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 	dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
-		if (is_removed) {
+		if (is_removed)
 			r->res_master_nodeid = our_nodeid;
-			r->res_nodeid = 0;
-		}
 
 		/* set master of lkbs to ourself when is_removed, or to
 		   another new master which we set along with NEW_MASTER
@@ -501,14 +499,9 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
 static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
 	int dir_nodeid = dlm_dir_nodeid(r);
-	int new_master = dir_nodeid;
-
-	if (dir_nodeid == dlm_our_nodeid())
-		new_master = 0;
 
 	dlm_purge_mstcpy_locks(r);
 	r->res_master_nodeid = dir_nodeid;
-	r->res_nodeid = new_master;
 	set_new_master(r);
 	(*count)++;
 	return 0;
@@ -584,7 +577,6 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
 
 	lock_rsb(r);
 	r->res_master_nodeid = ret_nodeid;
-	r->res_nodeid = new_master;
 	set_new_master(r);
 	unlock_rsb(r);
 	recover_xa_del(r);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 7e6c2c27d815..aae97bdc05f6 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -34,7 +34,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
-		if (r->res_nodeid)
+		if (r->res_master_nodeid != dlm_our_nodeid())
 			continue;
 
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
-- 
2.43.0



* [PATCH v6.10-rc1 11/11] dlm: use is_master() where it's possible
  2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
                   ` (8 preceding siblings ...)
  2024-05-28 21:12 ` [PATCH v6.10-rc1 10/11] dlm: merge nodeid field to master_nodeid Alexander Aring
@ 2024-05-28 21:12 ` Alexander Aring
  9 siblings, 0 replies; 11+ messages in thread
From: Alexander Aring @ 2024-05-28 21:12 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

There are a number of places that check whether this node is the master
of an rsb. There is already a helper, is_master(), for this check. This
patch converts various open-coded checks to use the helper.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
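For reviewers, a minimal sketch of the helper and of a typical call-site
conversion (the helper is the one in fs/dlm/lock.h after patch 10/11; the
call site is simplified from dlm_create_masters_list() below):

  static inline int is_master(struct dlm_rsb *r)
  {
          /* the rsb is mastered locally when its master nodeid is ours */
          return r->res_master_nodeid == dlm_our_nodeid();
  }

  /* before: open-coded master check */
  if (r->res_master_nodeid != dlm_our_nodeid())
          continue;

  /* after: the same check through the helper */
  if (!is_master(r))
          continue;

Keeping the comparison behind is_master() means later changes to how
mastership is tracked only need to touch lock.h.
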
 fs/dlm/lock.c     | 9 ++++-----
 fs/dlm/recoverd.c | 2 +-
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a044edc2d143..14a73948bbd5 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -242,7 +242,7 @@ static inline int is_granted(struct dlm_lkb *lkb)
 static inline int is_remote(struct dlm_rsb *r)
 {
 	DLM_ASSERT(r->res_master_nodeid != 0, dlm_print_rsb(r););
-	return r->res_master_nodeid != dlm_our_nodeid();
+	return !is_master(r);
 }
 
 static inline int is_process_copy(struct dlm_lkb *lkb)
@@ -3978,7 +3978,7 @@ static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
 
 	lock_rsb(r);
 
-	if (r->res_master_nodeid != dlm_our_nodeid()) {
+	if (!is_master(r)) {
 		error = validate_master_nodeid(ls, r, from_nodeid);
 		if (error) {
 			unlock_rsb(r);
@@ -4400,8 +4400,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 			  from_nodeid, result, r->res_master_nodeid,
 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
 
-		if (r->res_dir_nodeid != dlm_our_nodeid() &&
-		    r->res_master_nodeid != dlm_our_nodeid()) {
+		if (r->res_dir_nodeid != dlm_our_nodeid() && !is_master(r)) {
 			/* cause _request_lock->set_master->send_lookup */
 			r->res_master_nodeid = 0;
 			lkb->lkb_master_nodeid = 0;
@@ -4415,7 +4414,7 @@ static int receive_request_reply(struct dlm_ls *ls,
 		} else {
 			_request_lock(r, lkb);
 
-			if (r->res_master_nodeid == dlm_our_nodeid())
+			if (is_master(r))
 				confirm_master(r, 0);
 		}
 		break;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index aae97bdc05f6..dd41ad444e98 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -34,7 +34,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
 	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
-		if (r->res_master_nodeid != dlm_our_nodeid())
+		if (!is_master(r))
 			continue;
 
 		list_add(&r->res_masters_list, &ls->ls_masters_list);
-- 
2.43.0



end of thread

Thread overview: 11+ messages
2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 03/11] dlm: remove unused parameter in dlm_midcomms_addr Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 04/11] dlm: remove ls_local_handle from struct dlm_ls Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 05/11] dlm: rename dlm_find_lockspace_local to dlm_get_lockspace Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 06/11] dlm: drop own rsb pre allocation mechanism Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 07/11] dlm: using rcu to avoid rsb lookup again Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 08/11] dlm: move lkb idr to xarray datastructure Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 09/11] dlm: move recover " Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 10/11] dlm: merge nodeid field to master_nodeid Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 11/11] dlm: use is_master() where it's possible Alexander Aring
