From: Jeff Layton <jlayton@redhat.com> Date: Thu, 5 Jun 2008 07:51:51 -0400 Subject: [nfs] sunrpc: fix a race in rpciod_down Message-id: 1212666712-30755-2-git-send-email-jlayton@redhat.com O-Subject: [RHEL5.3 PATCH 1/2] BZ#448754: SUNRPC: Fix a race in rpciod_down() Bugzilla: 448754 RH-Acked-by: Steve Dickson <SteveD@redhat.com> The patch that *did* get committed for bug 246642 can lead to deadlock. To ensure that an RPC client could always send an async RPC call, that patch made it so that a new RPC client would take a reference to the rpciod workqueue. Because of this, the last reference holder to the workqueue might be put by work running on that workqueue. Original problem description from upstream is here: --------------[snip]---------------- The commit 4ada539ed77c7a2bbcb75cafbbd7bd8d2b9bef7b lead to the unpleasant possibility of an asynchronous rpc_task being required to call rpciod_down() when it is complete. This again means that the rpciod workqueue may get to call destroy_workqueue on itself -> hang... Change rpciod_up/rpciod_down to just get/put the module, and then create/destroy the workqueues on module load/unload. Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com> --------------[snip]---------------- I've been able to reproduce this deadlock with a (rather contrived) series of steps that I was using to reproduce a different problem. This patch does fix it. This patch does change rpciod behavior somewhat. Rather than starting rpciod on the first rpciod_up, it starts rpciod on module load and stops it when the module is unloaded. This is probably not something most users will notice or care about, but it's worth noting. 
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 8dde394..7e870ed 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -41,7 +41,6 @@ static mempool_t *rpc_task_mempool __read_mostly; static mempool_t *rpc_buffer_mempool __read_mostly; static void __rpc_default_timer(struct rpc_task *task); -static void rpciod_killall(void); static void rpc_async_schedule(void *); static void rpc_release_task(struct rpc_task *task); @@ -64,8 +63,6 @@ static LIST_HEAD(all_tasks); /* * rpciod-related stuff */ -static DEFINE_MUTEX(rpciod_mutex); -static unsigned int rpciod_users; struct workqueue_struct *rpciod_workqueue; /* @@ -1060,84 +1057,43 @@ void rpc_killall_tasks(struct rpc_clnt *clnt) spin_unlock(&rpc_sched_lock); } -static DECLARE_MUTEX_LOCKED(rpciod_running); - -static void rpciod_killall(void) +int rpciod_up(void) { - unsigned long flags; - - while (!list_empty(&all_tasks)) { - clear_thread_flag(TIF_SIGPENDING); - rpc_killall_tasks(NULL); - flush_workqueue(rpciod_workqueue); - if (!list_empty(&all_tasks)) { - dprintk("rpciod_killall: waiting for tasks to exit\n"); - yield(); - } - } + return try_module_get(THIS_MODULE) ? 0 : -EINVAL; +} - spin_lock_irqsave(&current->sighand->siglock, flags); - recalc_sigpending(); - spin_unlock_irqrestore(&current->sighand->siglock, flags); +void rpciod_down(void) +{ + module_put(THIS_MODULE); } /* - * Start up the rpciod process if it's not already running. + * Start up the rpciod workqueue. */ -int -rpciod_up(void) +static int rpciod_start(void) { struct workqueue_struct *wq; - int error = 0; - mutex_lock(&rpciod_mutex); - dprintk("rpciod_up: users %d\n", rpciod_users); - rpciod_users++; - if (rpciod_workqueue) - goto out; - /* - * If there's no pid, we should be the first user. - */ - if (rpciod_users > 1) - printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users); /* * Create the rpciod thread and wait for it to start. 
*/ - error = -ENOMEM; + dprintk("RPC: creating workqueue rpciod\n"); wq = create_workqueue("rpciod"); - if (wq == NULL) { - printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error); - rpciod_users--; - goto out; - } rpciod_workqueue = wq; - error = 0; -out: - mutex_unlock(&rpciod_mutex); - return error; + return rpciod_workqueue != NULL; } -void -rpciod_down(void) +static void rpciod_stop(void) { - mutex_lock(&rpciod_mutex); - dprintk("rpciod_down sema %d\n", rpciod_users); - if (rpciod_users) { - if (--rpciod_users) - goto out; - } else - printk(KERN_WARNING "rpciod_down: no users??\n"); + struct workqueue_struct *wq = NULL; - if (!rpciod_workqueue) { - dprintk("rpciod_down: Nothing to do!\n"); - goto out; - } - rpciod_killall(); + if (rpciod_workqueue == NULL) + return; + dprintk("RPC: destroying workqueue rpciod\n"); - destroy_workqueue(rpciod_workqueue); + wq = rpciod_workqueue; rpciod_workqueue = NULL; - out: - mutex_unlock(&rpciod_mutex); + destroy_workqueue(wq); } #ifdef RPC_DEBUG @@ -1176,6 +1132,7 @@ void rpc_show_tasks(void) void rpc_destroy_mempool(void) { + rpciod_stop(); if (rpc_buffer_mempool) mempool_destroy(rpc_buffer_mempool); if (rpc_task_mempool) @@ -1209,6 +1166,8 @@ rpc_init_mempool(void) rpc_buffer_slabp); if (!rpc_buffer_mempool) goto err_nomem; + if (!rpciod_start()) + goto err_nomem; return 0; err_nomem: rpc_destroy_mempool();