libceph: multiple workspaces for CRUSH computations
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 96c25f5e064aaf698ae3dd832cb08f768f56020b..fa08c15be0c0c0d2b9ced03392fdf3184bf1547b 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -964,6 +964,143 @@ bad:
        return -EINVAL;
 }
 
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+       struct crush_work *work;
+       size_t work_size;
+
+       WARN_ON(!c->working_size);
+       work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+       dout("%s work_size %zu bytes\n", __func__, work_size);
+
+       work = ceph_kvmalloc(work_size, GFP_NOIO);
+       if (!work)
+               return NULL;
+
+       INIT_LIST_HEAD(&work->item);
+       crush_init_workspace(c, work);
+       return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+       WARN_ON(!list_empty(&work->item));
+       kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+       INIT_LIST_HEAD(&wsm->idle_ws);
+       spin_lock_init(&wsm->ws_lock);
+       atomic_set(&wsm->total_ws, 0);
+       wsm->free_ws = 0;
+       init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+                                 struct crush_work *work)
+{
+       WARN_ON(!list_empty(&wsm->idle_ws));
+
+       list_add(&work->item, &wsm->idle_ws);
+       atomic_set(&wsm->total_ws, 1);
+       wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+       struct crush_work *work;
+
+       while (!list_empty(&wsm->idle_ws)) {
+               work = list_first_entry(&wsm->idle_ws, struct crush_work,
+                                       item);
+               list_del_init(&work->item);
+               free_workspace(work);
+       }
+       atomic_set(&wsm->total_ws, 0);
+       wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+                                       const struct crush_map *c)
+{
+       struct crush_work *work;
+       int cpus = num_online_cpus();
+
+again:
+       spin_lock(&wsm->ws_lock);
+       if (!list_empty(&wsm->idle_ws)) {
+               work = list_first_entry(&wsm->idle_ws, struct crush_work,
+                                       item);
+               list_del_init(&work->item);
+               wsm->free_ws--;
+               spin_unlock(&wsm->ws_lock);
+               return work;
+       }
+       if (atomic_read(&wsm->total_ws) > cpus) {
+               DEFINE_WAIT(wait);
+
+               spin_unlock(&wsm->ws_lock);
+               prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+               if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+                       schedule();
+               finish_wait(&wsm->ws_wait, &wait);
+               goto again;
+       }
+       atomic_inc(&wsm->total_ws);
+       spin_unlock(&wsm->ws_lock);
+
+       work = alloc_workspace(c);
+       if (!work) {
+               atomic_dec(&wsm->total_ws);
+               wake_up(&wsm->ws_wait);
+
+               /*
+                * Do not return the error but go back to waiting.  We
+                * have the initial workspace and the CRUSH computation
+                * time is bounded, so we will get one eventually.
+                */
+               WARN_ON(atomic_read(&wsm->total_ws) < 1);
+               goto again;
+       }
+       return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+                         struct crush_work *work)
+{
+       spin_lock(&wsm->ws_lock);
+       if (wsm->free_ws <= num_online_cpus()) {
+               list_add(&work->item, &wsm->idle_ws);
+               wsm->free_ws++;
+               spin_unlock(&wsm->ws_lock);
+               goto wake;
+       }
+       spin_unlock(&wsm->ws_lock);
+
+       free_workspace(work);
+       atomic_dec(&wsm->total_ws);
+wake:
+       if (wq_has_sleeper(&wsm->ws_wait))
+               wake_up(&wsm->ws_wait);
+}
+
 /*
  * osd map
  */
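
For reference, crush_wsm is a struct workspace_manager embedded in struct
ceph_osdmap. A minimal sketch of the fields the helpers above rely on,
mirroring the btrfs layout this framework is borrowed from (the actual
definition lives in the ceph osdmap header; comments here are mine):

	struct workspace_manager {
		struct list_head idle_ws;	/* idle struct crush_work entries */
		spinlock_t ws_lock;		/* protects idle_ws and free_ws */
		int free_ws;			/* workspaces currently on idle_ws */
		atomic_t total_ws;		/* allocated workspaces, idle or in use */
		wait_queue_head_t ws_wait;	/* waiters for a free workspace */
	};
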
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
        map->primary_temp = RB_ROOT;
        map->pg_upmap = RB_ROOT;
        map->pg_upmap_items = RB_ROOT;
-       mutex_init(&map->crush_workspace_mutex);
+
+       init_workspace_manager(&map->crush_wsm);
 
        return map;
 }
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
        dout("osdmap_destroy %p\n", map);
+
        if (map->crush)
                crush_destroy(map->crush);
+       cleanup_workspace_manager(&map->crush_wsm);
+
        while (!RB_EMPTY_ROOT(&map->pg_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
        kvfree(map->osd_weight);
        kvfree(map->osd_addr);
        kvfree(map->osd_primary_affinity);
-       kvfree(map->crush_workspace);
        kfree(map);
 }
 
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-       void *workspace;
-       size_t work_size;
+       struct crush_work *work;
 
        if (IS_ERR(crush))
                return PTR_ERR(crush);
 
-       work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-       dout("%s work_size %zu bytes\n", __func__, work_size);
-       workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-       if (!workspace) {
+       work = alloc_workspace(crush);
+       if (!work) {
                crush_destroy(crush);
                return -ENOMEM;
        }
-       crush_init_workspace(crush, workspace);
 
        if (map->crush)
                crush_destroy(map->crush);
-       kvfree(map->crush_workspace);
+       cleanup_workspace_manager(&map->crush_wsm);
        map->crush = crush;
-       map->crush_workspace = workspace;
+       add_initial_workspace(&map->crush_wsm, work);
        return 0;
 }
 
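The cleanup_workspace_manager() call above discards every cached workspace
before the new map is installed: workspace size is a property of the CRUSH
map, so workspaces sized for the old map cannot be reused. A sketch of the
sizing helper called by alloc_workspace(), assuming the layout in
include/linux/crush/crush.h (not verbatim; verify against the tree):

	/* sketch: footprint grows with the map's working size and with
	 * three __u32 scratch vectors of result_max entries each */
	static inline size_t crush_work_size(const struct crush_map *map,
					     int result_max)
	{
		return map->working_size + result_max * 3 * sizeof(__u32);
	}
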
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
                    s64 choose_args_index)
 {
        struct crush_choose_arg_map *arg_map;
+       struct crush_work *work;
        int r;
 
        BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
                arg_map = lookup_choose_arg_map(&map->crush->choose_args,
                                                CEPH_DEFAULT_CHOOSE_ARGS);
 
-       mutex_lock(&map->crush_workspace_mutex);
+       work = get_workspace(&map->crush_wsm, map->crush);
        r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-                         weight, weight_max, map->crush_workspace,
+                         weight, weight_max, work,
                          arg_map ? arg_map->args : NULL);
-       mutex_unlock(&map->crush_workspace_mutex);
-
+       put_workspace(&map->crush_wsm, work);
        return r;
 }
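
Taken together, get_workspace() and put_workspace() implement a pool that
grows on demand up to num_online_cpus() + 1 workspaces (the one seeded by
osdmap_set_crush() plus up to one per CPU) and blocks further callers until
a workspace is returned. A rough userspace model of that policy, using a
pthread mutex/condvar in place of ws_lock/ws_wait (all names here are
hypothetical, for illustration only):

	/* Userspace model of the workspace pool policy (illustration only). */
	#include <pthread.h>
	#include <stdlib.h>

	#define POOL_MAX 64			/* must exceed limit below */

	struct pool {
		pthread_mutex_t lock;		/* stands in for ws_lock */
		pthread_cond_t wait;		/* stands in for ws_wait */
		void *idle[POOL_MAX];		/* stands in for idle_ws */
		int free_ws;			/* idle workspaces */
		int total_ws;			/* allocated workspaces */
		int limit;			/* num_online_cpus() stand-in */
	};

	static void *pool_get(struct pool *p)
	{
		void *ws;

		pthread_mutex_lock(&p->lock);
		for (;;) {
			if (p->free_ws) {
				/* reuse an idle workspace */
				ws = p->idle[--p->free_ws];
				pthread_mutex_unlock(&p->lock);
				return ws;
			}
			if (p->total_ws <= p->limit) {
				/* room to grow: allocate outside the lock */
				p->total_ws++;
				pthread_mutex_unlock(&p->lock);
				ws = malloc(64);	/* alloc_workspace() stand-in */
				if (ws)
					return ws;
				pthread_mutex_lock(&p->lock);
				p->total_ws--;		/* failed: retry or wait */
				continue;
			}
			/* pool at capacity: wait for a put to signal */
			pthread_cond_wait(&p->wait, &p->lock);
		}
	}

	static void pool_put(struct pool *p, void *ws)
	{
		pthread_mutex_lock(&p->lock);
		if (p->free_ws <= p->limit) {
			p->idle[p->free_ws++] = ws;	/* cache for reuse */
			ws = NULL;
		} else {
			p->total_ws--;			/* enough idle: drop it */
		}
		pthread_mutex_unlock(&p->lock);
		pthread_cond_signal(&p->wait);		/* no-op if nobody waits */
		if (ws)
			free(ws);
	}

As in the kernel code, an allocation failure just drops the reservation and
retries: because the pool is seeded with one workspace before first use
(add_initial_workspace()) and each CRUSH computation is bounded, a waiter
always gets a workspace eventually.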