[PATCH 3/3] rbd: retrieve and check lock owner twice before blocklisting

An attempt to acquire the exclusive lock can race with the current lock
owner closing the image:

1. lock is held by client123, rbd_lock() returns -EBUSY
2. get_lock_owner_info() returns client123 instance details
3. client123 closes the image, lock is released
4. find_watcher() returns 0 as there is no matching watcher anymore
5. client123 instance gets erroneously blocklisted
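
In rbd_try_lock() terms, the vulnerable window looks roughly like this
(a simplified sketch for illustration only, error handling elided):

	locker = get_lock_owner_info(rbd_dev);	/* step 2: client123 */

	/* client123 closes the image, the lock is released (step 3) */

	ret = find_watcher(rbd_dev, locker);	/* step 4: returns 0 */
	if (!ret) {
		/*
		 * No watcher, so the owner is presumed dead.  But the
		 * locker details are stale by now and an innocent
		 * client123 gets blocklisted (step 5).
		 */
		ceph_monc_blocklist_add(&client->monc, &locker->info.addr);
		ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
				    &rbd_dev->header_oloc, RBD_LOCK_NAME,
				    locker->id.cookie, &locker->id.name);
	}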

Particularly impacted is the mirror snapshot scheduler in snapshot-based
mirroring, since it happens to open and close images a lot (images are
opened only for as long as it takes to take the next mirror snapshot,
and the same client instance is used for all images).

To reduce the potential for erroneous blocklisting, retrieve the lock
owner again after find_watcher() returns 0.  If it's still there, make
sure it matches the previously detected lock owner.
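
In outline, the added guard (condensed from the diff below) is:

	refreshed_locker = get_lock_owner_info(rbd_dev);
	if (!refreshed_locker ||
	    !ceph_locker_equal(locker, refreshed_locker))
		goto again;	/* released or changed hands, retry */

If the lock was released or reacquired in the meantime, the loop simply
retries instead of blocklisting a potentially innocent client.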

Cc: stable@xxxxxxxxxxxxxxx # 6d1736a0e432: rbd: make get_lock_owner_info() return a single locker or NULL
Cc: stable@xxxxxxxxxxxxxxx # 5dc06bec6a5b: rbd: harden get_lock_owner_info() a bit
Cc: stable@xxxxxxxxxxxxxxx
Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx>
---
 drivers/block/rbd.c                  | 16 ++++++++++++++--
 include/linux/ceph/cls_lock_client.h | 10 ++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)
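
For reference, these are the ceph_locker fields that the new
ceph_locker_equal() helper compares (declarations from
include/linux/ceph/cls_lock_client.h, possibly abridged):

	struct ceph_locker_id {
		struct ceph_entity_name name;	/* compared on type and num */
		char *cookie;			/* compared with strcmp() */
	};

	struct ceph_locker_info {
		struct ceph_entity_addr addr;	/* compared with memcmp() */
	};

	struct ceph_locker {
		struct ceph_locker_id id;
		struct ceph_locker_info info;
	};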

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 94629e826369..e4b5829a03b4 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3969,11 +3969,11 @@ static int find_watcher(struct rbd_device *rbd_dev,
 static int rbd_try_lock(struct rbd_device *rbd_dev)
 {
 	struct ceph_client *client = rbd_dev->rbd_client->client;
-	struct ceph_locker *locker;
+	struct ceph_locker *locker, *refreshed_locker;
 	int ret;
 
 	for (;;) {
-		locker = NULL;
+		locker = refreshed_locker = NULL;
 
 		ret = rbd_lock(rbd_dev);
 		if (ret != -EBUSY)
@@ -3993,6 +3993,16 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
 		if (ret)
 			goto out; /* request lock or error */
 
+		refreshed_locker = get_lock_owner_info(rbd_dev);
+		if (IS_ERR(refreshed_locker)) {
+			ret = PTR_ERR(refreshed_locker);
+			refreshed_locker = NULL;
+			goto out;
+		}
+		if (!refreshed_locker ||
+		    !ceph_locker_equal(locker, refreshed_locker))
+			goto again;
+
 		rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
 			 ENTITY_NAME(locker->id.name));
 
@@ -4014,10 +4024,12 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
 		}
 
 again:
+		free_locker(refreshed_locker);
 		free_locker(locker);
 	}
 
 out:
+	free_locker(refreshed_locker);
 	free_locker(locker);
 	return ret;
 }
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
index 17bc7584d1fe..b26f44ea38ca 100644
--- a/include/linux/ceph/cls_lock_client.h
+++ b/include/linux/ceph/cls_lock_client.h
@@ -24,6 +24,16 @@ struct ceph_locker {
 	struct ceph_locker_info info;
 };
 
+static inline bool ceph_locker_equal(const struct ceph_locker *lhs,
+				     const struct ceph_locker *rhs)
+{
+	return lhs->id.name.type == rhs->id.name.type &&
+	       lhs->id.name.num == rhs->id.name.num &&
+	       !strcmp(lhs->id.cookie, rhs->id.cookie) &&
+	       !memcmp(&lhs->info.addr, &rhs->info.addr,
+		       sizeof(rhs->info.addr));
+}
+
 int ceph_cls_lock(struct ceph_osd_client *osdc,
 		  struct ceph_object_id *oid,
 		  struct ceph_object_locator *oloc,
-- 
2.41.0