Merge branch 'for-5.1' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
[sfrench/cifs-2.6.git] / kernel / workqueue.c
index fc5d23d752a574d3c03487c8c817352a5972f83c..69bd8083930c982b438d02ab1169925d62f5d95e 100644 (file)
@@ -259,6 +259,8 @@ struct workqueue_struct {
        struct wq_device        *wq_dev;        /* I: for sysfs interface */
 #endif
 #ifdef CONFIG_LOCKDEP
+       char                    *lock_name;
+       struct lock_class_key   key;
        struct lockdep_map      lockdep_map;
 #endif
        char                    name[WQ_NAME_LEN]; /* I: workqueue name */
@@ -646,7 +648,7 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
         * The following mb guarantees that previous clear of a PENDING bit
         * will not be reordered with any speculative LOADS or STORES from
         * work->current_func, which is executed afterwards.  This possible
-        * reordering can lead to a missed execution on attempt to qeueue
+        * reordering can lead to a missed execution on attempt to queue
         * the same @work.  E.g. consider this case:
         *
         *   CPU#0                         CPU#1
@@ -1341,7 +1343,7 @@ static bool is_chained_work(struct workqueue_struct *wq)
 
        worker = current_wq_worker();
        /*
-        * Return %true iff I'm a worker execuing a work item on @wq.  If
+        * Return %true iff I'm a worker executing a work item on @wq.  If
         * I'm @worker, it's safe to dereference it without locking.
         */
        return worker && worker->current_pwq->wq == wq;
@@ -1512,6 +1514,90 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL(queue_work_on);
 
+/**
+ * workqueue_select_cpu_near - Select a CPU based on NUMA node
+ * @node: NUMA node ID that we want to select a CPU from
+ *
+ * This function will attempt to find a "random" cpu available on a given
+ * node. If there are no CPUs available on the given node it will return
+ * WORK_CPU_UNBOUND indicating that we should just schedule to any
+ * available CPU if we need to schedule this work.
+ */
+static int workqueue_select_cpu_near(int node)
+{
+       int cpu;
+
+       /* No point in doing this if NUMA isn't enabled for workqueues */
+       if (!wq_numa_enabled)
+               return WORK_CPU_UNBOUND;
+
+       /* Delay binding to CPU if node is not valid or online */
+       if (node < 0 || node >= MAX_NUMNODES || !node_online(node))
+               return WORK_CPU_UNBOUND;
+
+       /* Use local node/cpu if we are already there */
+       cpu = raw_smp_processor_id();
+       if (node == cpu_to_node(cpu))
+               return cpu;
+
+       /* Use "random" otherwise know as "first" online CPU of node */
+       cpu = cpumask_any_and(cpumask_of_node(node), cpu_online_mask);
+
+       /* If CPU is valid return that, otherwise just defer */
+       return cpu < nr_cpu_ids ? cpu : WORK_CPU_UNBOUND;
+}
+
+/**
+ * queue_work_node - queue work on a "random" cpu for a given NUMA node
+ * @node: NUMA node that we are targeting the work for
+ * @wq: workqueue to use
+ * @work: work to queue
+ *
+ * We queue the work to a "random" CPU within a given NUMA node. The basic
+ * idea here is to provide a way to somehow associate work with a given
+ * NUMA node.
+ *
+ * This function will only make a best effort attempt at getting this onto
+ * the right NUMA node. If no node is requested or the requested node is
+ * offline then we just fall back to standard queue_work behavior.
+ *
+ * Currently the "random" CPU ends up being the first available CPU in the
+ * intersection of cpu_online_mask and the cpumask of the node, unless we
+ * are running on the node. In that case we just use the current CPU.
+ *
+ * Return: %false if @work was already on a queue, %true otherwise.
+ */
+bool queue_work_node(int node, struct workqueue_struct *wq,
+                    struct work_struct *work)
+{
+       unsigned long flags;
+       bool ret = false;
+
+       /*
+        * This current implementation is specific to unbound workqueues.
+        * Specifically we only return the first available CPU for a given
+        * node instead of cycling through individual CPUs within the node.
+        *
+        * If this is used with a per-cpu workqueue then the logic in
+        * workqueue_select_cpu_near would need to be updated to allow for
+        * some round robin type logic.
+        */
+       WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND));
+
+       local_irq_save(flags);
+
+       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+               int cpu = workqueue_select_cpu_near(node);
+
+               __queue_work(cpu, wq, work);
+               ret = true;
+       }
+
+       local_irq_restore(flags);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(queue_work_node);
+
 void delayed_work_timer_fn(struct timer_list *t)
 {
        struct delayed_work *dwork = from_timer(dwork, t, timer);
@@ -1639,7 +1725,7 @@ static void rcu_work_rcufn(struct rcu_head *rcu)
  *
  * Return: %false if @rwork was already pending, %true otherwise.  Note
  * that a full RCU grace period is guaranteed only after a %true return.
- * While @rwork is guarnateed to be executed after a %false return, the
+ * While @rwork is guaranteed to be executed after a %false return, the
  * execution may happen before a full RCU grace period has passed.
  */
 bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
@@ -2931,6 +3017,9 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
        if (WARN_ON(!wq_online))
                return false;
 
+       if (WARN_ON(!work->func))
+               return false;
+
        if (!from_cancel) {
                lock_map_acquire(&work->lockdep_map);
                lock_map_release(&work->lockdep_map);
@@ -3337,11 +3426,49 @@ static int init_worker_pool(struct worker_pool *pool)
        return 0;
 }
 
+#ifdef CONFIG_LOCKDEP
+static void wq_init_lockdep(struct workqueue_struct *wq)
+{
+       char *lock_name;
+
+       lockdep_register_key(&wq->key);
+       lock_name = kasprintf(GFP_KERNEL, "%s%s", "(wq_completion)", wq->name);
+       if (!lock_name)
+               lock_name = wq->name;
+       lockdep_init_map(&wq->lockdep_map, lock_name, &wq->key, 0);
+}
+
+static void wq_unregister_lockdep(struct workqueue_struct *wq)
+{
+       lockdep_unregister_key(&wq->key);
+}
+
+static void wq_free_lockdep(struct workqueue_struct *wq)
+{
+       if (wq->lock_name != wq->name)
+               kfree(wq->lock_name);
+}
+#else
+static void wq_init_lockdep(struct workqueue_struct *wq)
+{
+}
+
+static void wq_unregister_lockdep(struct workqueue_struct *wq)
+{
+}
+
+static void wq_free_lockdep(struct workqueue_struct *wq)
+{
+}
+#endif
+
 static void rcu_free_wq(struct rcu_head *rcu)
 {
        struct workqueue_struct *wq =
                container_of(rcu, struct workqueue_struct, rcu);
 
+       wq_free_lockdep(wq);
+
        if (!(wq->flags & WQ_UNBOUND))
                free_percpu(wq->cpu_pwqs);
        else
@@ -3532,8 +3659,10 @@ static void pwq_unbound_release_workfn(struct work_struct *work)
         * If we're the last pwq going away, @wq is already dead and no one
         * is gonna access it anymore.  Schedule RCU free.
         */
-       if (is_last)
+       if (is_last) {
+               wq_unregister_lockdep(wq);
                call_rcu(&wq->rcu, rcu_free_wq);
+       }
 }
 
 /**
@@ -4067,11 +4196,9 @@ static int init_rescuer(struct workqueue_struct *wq)
        return 0;
 }
 
-struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
-                                              unsigned int flags,
-                                              int max_active,
-                                              struct lock_class_key *key,
-                                              const char *lock_name, ...)
+struct workqueue_struct *alloc_workqueue(const char *fmt,
+                                        unsigned int flags,
+                                        int max_active, ...)
 {
        size_t tbl_size = 0;
        va_list args;
@@ -4106,7 +4233,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                        goto err_free_wq;
        }
 
-       va_start(args, lock_name);
+       va_start(args, max_active);
        vsnprintf(wq->name, sizeof(wq->name), fmt, args);
        va_end(args);
 
@@ -4123,7 +4250,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
        INIT_LIST_HEAD(&wq->flusher_overflow);
        INIT_LIST_HEAD(&wq->maydays);
 
-       lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
+       wq_init_lockdep(wq);
        INIT_LIST_HEAD(&wq->list);
 
        if (alloc_and_link_pwqs(wq) < 0)
@@ -4161,7 +4288,7 @@ err_destroy:
        destroy_workqueue(wq);
        return NULL;
 }
-EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
+EXPORT_SYMBOL_GPL(alloc_workqueue);
 
 /**
  * destroy_workqueue - safely terminate a workqueue
@@ -4214,6 +4341,7 @@ void destroy_workqueue(struct workqueue_struct *wq)
                kthread_stop(wq->rescuer->task);
 
        if (!(wq->flags & WQ_UNBOUND)) {
+               wq_unregister_lockdep(wq);
                /*
                 * The base ref is never dropped on per-cpu pwqs.  Directly
                 * schedule RCU free.