On Wed, 2025-02-19 at 14:44 +0200, Alex Markuze wrote:
> This fixes the TOCTOU problem of accessing the epoch field after
> map-free. I'd like to know if it's not leaving a correctness problem
> instead. Is the return value of have_mon_and_osd_map still valid and
> relevant in this case, where it is being concurrently freed?

Frankly speaking, I don't quite follow your point. handle_one_map() [1]
can replace the old map with a new one:

    if (newmap != osdc->osdmap) {
            <skipped>
            ceph_osdmap_destroy(osdc->osdmap);
            <--- Thread could sleep here --->
            osdc->osdmap = newmap;
    }

And have_mon_and_osd_map() [2] can access osdc->osdmap in the middle of
this replacement, because it does not take the lock, while the osdmap
change itself executes under osdc->lock.

So, do you mean that this case is impossible? Or do you mean that the
suggested fix is not enough to fix the issue? What is your vision of
the fix then? :)

The issue is not about a complete stop but about switching from one
osdmap to another one. And have_mon_and_osd_map() is used in
__ceph_open_session() [3] to join the ceph cluster and open the root
directory:

/*
 * mount: join the ceph cluster, and open root directory.
 */
int __ceph_open_session(struct ceph_client *client, unsigned long started)
{
	unsigned long timeout = client->options->mount_timeout;
	long err;

	/* open session, and wait for mon and osd maps */
	err = ceph_monc_open_session(&client->monc);
	if (err < 0)
		return err;

	while (!have_mon_and_osd_map(client)) {
		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		/* wait */
		dout("mount waiting for mon_map\n");
		err = wait_event_interruptible_timeout(client->auth_wq,
			have_mon_and_osd_map(client) || (client->auth_err < 0),
			ceph_timeout_jiffies(timeout));
		if (err < 0)
			return err;
		if (client->auth_err < 0)
			return client->auth_err;
	}

	pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
		&client->fsid);
	ceph_debugfs_client_init(client);
	return 0;
}
EXPORT_SYMBOL(__ceph_open_session);

So, we simply need to be sure that some osdmap is available, as far as
I can see. Potentially, we may also need to NULL osdc->osdmap in
ceph_osdc_stop() [4]:

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	destroy_workqueue(osdc->completion_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);

	down_write(&osdc->lock);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		close_osd(osd);
	}
	up_write(&osdc->lock);
	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
	osd_cleanup(&osdc->homeless_osd);

	WARN_ON(!list_empty(&osdc->osd_lru));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
	WARN_ON(atomic_read(&osdc->num_requests));
	WARN_ON(atomic_read(&osdc->num_homeless));

	ceph_osdmap_destroy(osdc->osdmap);  <--- Here, we need to NULL the pointer
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

Thanks,
Slava.
[1] https://elixir.bootlin.com/linux/v6.14-rc2/source/net/ceph/osd_client.c#L4025
[2] https://elixir.bootlin.com/linux/v6.14-rc2/source/net/ceph/ceph_common.c#L791
[3] https://elixir.bootlin.com/linux/v6.14-rc2/source/net/ceph/ceph_common.c#L800
[4] https://elixir.bootlin.com/linux/v6.14-rc2/source/net/ceph/osd_client.c#L5285