Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
[sfrench/cifs-2.6.git] / net / ceph / osd_client.c
index 242d7c0d92f8c34084ec5f3201c7529c55c681a9..924f07c36ddbc2864c7428723766d0a64729c7c4 100644 (file)
@@ -961,6 +961,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                       truncate_size, truncate_seq);
        }
 
+       req->r_abort_on_full = true;
        req->r_flags = flags;
        req->r_base_oloc.pool = layout->pool_id;
        req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1005,7 +1006,7 @@ static bool osd_registered(struct ceph_osd *osd)
  */
 static void osd_init(struct ceph_osd *osd)
 {
-       atomic_set(&osd->o_ref, 1);
+       refcount_set(&osd->o_ref, 1);
        RB_CLEAR_NODE(&osd->o_node);
        osd->o_requests = RB_ROOT;
        osd->o_linger_requests = RB_ROOT;
@@ -1050,9 +1051,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 
 static struct ceph_osd *get_osd(struct ceph_osd *osd)
 {
-       if (atomic_inc_not_zero(&osd->o_ref)) {
-               dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
-                    atomic_read(&osd->o_ref));
+       if (refcount_inc_not_zero(&osd->o_ref)) {
+               dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
+                    refcount_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
@@ -1062,9 +1063,9 @@ static struct ceph_osd *get_osd(struct ceph_osd *osd)
 
 static void put_osd(struct ceph_osd *osd)
 {
-       dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
-            atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
+       dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
+            refcount_read(&osd->o_ref) - 1);
+       if (refcount_dec_and_test(&osd->o_ref)) {
                osd_cleanup(osd);
                kfree(osd);
        }
@@ -1297,8 +1298,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
                       __pool_full(pi);
 
        WARN_ON(pi->id != t->base_oloc.pool);
-       return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
-              (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+       return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
+              ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
+              (osdc->osdmap->epoch < osdc->epoch_barrier);
 }
 
 enum calc_target_result {
@@ -1503,9 +1505,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
        ceph_encode_32(&p, req->r_flags);
        ceph_encode_timespec(p, &req->r_mtime);
        p += sizeof(struct ceph_timespec);
-       /* aka reassert_version */
-       memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
-       p += sizeof(req->r_replay_version);
+
+       /* reassert_version */
+       memset(p, 0, sizeof(struct ceph_eversion));
+       p += sizeof(struct ceph_eversion);
 
        /* oloc */
        ceph_start_encoding(&p, 5, 4,
@@ -1626,6 +1629,7 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
                ceph_monc_renew_subs(&osdc->client->monc);
 }
 
+static void complete_request(struct ceph_osd_request *req, int err);
 static void send_map_check(struct ceph_osd_request *req);
 
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -1635,6 +1639,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
        enum calc_target_result ct_res;
        bool need_send = false;
        bool promoted = false;
+       bool need_abort = false;
 
        WARN_ON(req->r_tid);
        dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -1650,8 +1655,13 @@ again:
                goto promote;
        }
 
-       if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-           ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
+       if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+               dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
+                    osdc->epoch_barrier);
+               req->r_t.paused = true;
+               maybe_request_map(osdc);
+       } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+                  ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
                dout("req %p pausewr\n", req);
                req->r_t.paused = true;
                maybe_request_map(osdc);
@@ -1669,6 +1679,8 @@ again:
                pr_warn_ratelimited("FULL or reached pool quota\n");
                req->r_t.paused = true;
                maybe_request_map(osdc);
+               if (req->r_abort_on_full)
+                       need_abort = true;
        } else if (!osd_homeless(osd)) {
                need_send = true;
        } else {
@@ -1685,6 +1697,8 @@ again:
        link_request(osd, req);
        if (need_send)
                send_request(req);
+       else if (need_abort)
+               complete_request(req, -ENOSPC);
        mutex_unlock(&osd->lock);
 
        if (ct_res == CALC_TARGET_POOL_DNE)
@@ -1799,6 +1813,97 @@ static void abort_request(struct ceph_osd_request *req, int err)
        complete_request(req, err);
 }
 
+static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+       if (likely(eb > osdc->epoch_barrier)) {
+               dout("updating epoch_barrier from %u to %u\n",
+                               osdc->epoch_barrier, eb);
+               osdc->epoch_barrier = eb;
+               /* Request map if we're not to the barrier yet */
+               if (eb > osdc->osdmap->epoch)
+                       maybe_request_map(osdc);
+       }
+}
+
+void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+       down_read(&osdc->lock);
+       if (unlikely(eb > osdc->epoch_barrier)) {
+               up_read(&osdc->lock);
+               down_write(&osdc->lock);
+               update_epoch_barrier(osdc, eb);
+               up_write(&osdc->lock);
+       } else {
+               up_read(&osdc->lock);
+       }
+}
+EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
+
+/*
+ * Drop all pending requests that are stalled waiting on a full condition to
+ * clear, and complete them with ENOSPC as the return code. Set the
+ * osdc->epoch_barrier to the latest map epoch that we've seen if any were
+ * cancelled.
+ */
+static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
+{
+       struct rb_node *n;
+       bool victims = false;
+
+       dout("enter abort_on_full\n");
+
+       if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
+               goto out;
+
+       /* Scan list and see if there is anything to abort */
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+               struct rb_node *m;
+
+               m = rb_first(&osd->o_requests);
+               while (m) {
+                       struct ceph_osd_request *req = rb_entry(m,
+                                       struct ceph_osd_request, r_node);
+                       m = rb_next(m);
+
+                       if (req->r_abort_on_full) {
+                               victims = true;
+                               break;
+                       }
+               }
+               if (victims)
+                       break;
+       }
+
+       if (!victims)
+               goto out;
+
+       /*
+        * Update the barrier to current epoch if it's behind that point,
+        * since we know we have some calls to be aborted in the tree.
+        */
+       update_epoch_barrier(osdc, osdc->osdmap->epoch);
+
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+               struct rb_node *m;
+
+               m = rb_first(&osd->o_requests);
+               while (m) {
+                       struct ceph_osd_request *req = rb_entry(m,
+                                       struct ceph_osd_request, r_node);
+                       m = rb_next(m);
+
+                       if (req->r_abort_on_full &&
+                           (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+                            pool_full(osdc, req->r_t.target_oloc.pool)))
+                               abort_request(req, -ENOSPC);
+               }
+       }
+out:
+       dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+}
+
 static void check_pool_dne(struct ceph_osd_request *req)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
@@ -3252,11 +3357,13 @@ done:
        pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
                  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
                  have_pool_full(osdc);
-       if (was_pauserd || was_pausewr || pauserd || pausewr)
+       if (was_pauserd || was_pausewr || pauserd || pausewr ||
+           osdc->osdmap->epoch < osdc->epoch_barrier)
                maybe_request_map(osdc);
 
        kick_requests(osdc, &need_resend, &need_resend_linger);
 
+       ceph_osdc_abort_on_full(osdc);
        ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
                          osdc->osdmap->epoch);
        up_write(&osdc->lock);
@@ -4126,7 +4233,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
                close_osd(osd);
        }
        up_write(&osdc->lock);
-       WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+       WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
        osd_cleanup(&osdc->homeless_osd);
 
        WARN_ON(!list_empty(&osdc->osd_lru));