]> bbs.cooldavid.org Git - net-next-2.6.git/blobdiff - fs/ceph/osd_client.c
ceph: use complete_all and wake_up_all
[net-next-2.6.git] / fs / ceph / osd_client.c
index 3514f71ff85f79a866fb57699ade4f831d02d859..e38522347898046fe778df23bdc467c5d4248363 100644 (file)
@@ -16,7 +16,7 @@
 #define OSD_OP_FRONT_LEN       4096
 #define OSD_OPREPLY_FRONT_LEN  512
 
-const static struct ceph_connection_operations osd_con_ops;
+static const struct ceph_connection_operations osd_con_ops;
 static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd);
 
@@ -147,7 +147,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                req = kzalloc(sizeof(*req), GFP_NOFS);
        }
        if (req == NULL)
-               return ERR_PTR(-ENOMEM);
+               return NULL;
 
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
@@ -164,10 +164,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-                                  OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
-       if (IS_ERR(msg)) {
+                                  OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
+       if (!msg) {
                ceph_osdc_put_request(req);
-               return ERR_PTR(PTR_ERR(msg));
+               return NULL;
        }
        req->r_reply = msg;
 
@@ -178,10 +178,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
-       if (IS_ERR(msg)) {
+               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
+       if (!msg) {
                ceph_osdc_put_request(req);
-               return ERR_PTR(PTR_ERR(msg));
+               return NULL;
        }
        msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
        memset(msg->front.iov_base, 0, msg->front.iov_len);
@@ -361,8 +361,13 @@ static void put_osd(struct ceph_osd *osd)
 {
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref))
+       if (atomic_dec_and_test(&osd->o_ref)) {
+               struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
+
+               if (osd->o_authorizer)
+                       ac->ops->destroy_authorizer(ac, osd->o_authorizer);
                kfree(osd);
+       }
 }
 
 /*
@@ -715,7 +720,7 @@ static void handle_timeout(struct work_struct *work)
         * should mark the osd as failed and we should find out about
         * it from an updated osd map.
         */
-       while (!list_empty(&osdc->req_lru)) {
+       while (timeout && !list_empty(&osdc->req_lru)) {
                req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
                                 r_req_lru_item);
 
@@ -857,12 +862,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        if (req->r_callback)
                req->r_callback(req, msg);
        else
-               complete(&req->r_completion);
+               complete_all(&req->r_completion);
 
        if (flags & CEPH_OSD_FLAG_ONDISK) {
                if (req->r_safe_callback)
                        req->r_safe_callback(req, msg);
-               complete(&req->r_safe_completion);  /* fsync waiter */
+               complete_all(&req->r_safe_completion);  /* fsync waiter */
        }
 
 done:
@@ -1078,6 +1083,7 @@ done:
        if (newmap)
                kick_requests(osdc, NULL);
        up_read(&osdc->map_sem);
+       wake_up_all(&osdc->client->auth_wq);
        return;
 
 bad:
@@ -1087,45 +1093,6 @@ bad:
        return;
 }
 
-
-/*
- * A read request prepares specific pages that data is to be read into.
- * When a message is being read off the wire, we call prepare_pages to
- * find those pages.
- *  0 = success, -1 failure.
- */
-static int __prepare_pages(struct ceph_connection *con,
-                        struct ceph_msg_header *hdr,
-                        struct ceph_osd_request *req,
-                        u64 tid,
-                        struct ceph_msg *m)
-{
-       struct ceph_osd *osd = con->private;
-       struct ceph_osd_client *osdc;
-       int ret = -1;
-       int data_len = le32_to_cpu(hdr->data_len);
-       unsigned data_off = le16_to_cpu(hdr->data_off);
-
-       int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
-
-       if (!osd)
-               return -1;
-
-       osdc = osd->o_osdc;
-
-       dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
-            tid, req->r_num_pages, want);
-       if (unlikely(req->r_num_pages < want))
-               goto out;
-       m->pages = req->r_pages;
-       m->nr_pages = req->r_num_pages;
-       ret = 0; /* success */
-out:
-       BUG_ON(ret < 0 || m->nr_pages < want);
-
-       return ret;
-}
-
 /*
  * Register request, send initial attempt.
  */
@@ -1252,11 +1219,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (!osdc->req_mempool)
                goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
+       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
+                               "osd_op");
        if (err < 0)
                goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply,
-                               OSD_OPREPLY_FRONT_LEN, 10, true);
+                               OSD_OPREPLY_FRONT_LEN, 10, true,
+                               "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
        return 0;
@@ -1302,8 +1271,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
                                    false, 1);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       if (!req)
+               return -ENOMEM;
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
@@ -1345,8 +1314,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
                                    nofail, 1);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       if (!req)
+               return -ENOMEM;
 
        /* it may be a short write due to an object boundary */
        req->r_pages = pages;
@@ -1375,7 +1344,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        int type = le16_to_cpu(msg->hdr.type);
 
        if (!osd)
-               return;
+               goto out;
        osdc = osd->o_osdc;
 
        switch (type) {
@@ -1390,11 +1359,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
+out:
        ceph_msg_put(msg);
 }
 
 /*
- * lookup and return message for incoming reply
+ * lookup and return message for incoming reply.  set up reply message
+ * pages.
  */
 static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
@@ -1407,7 +1378,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        int front = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
        u64 tid;
-       int err;
 
        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
@@ -1425,13 +1395,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                     req->r_reply, req->r_con_filling_msg);
                ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
                ceph_con_put(req->r_con_filling_msg);
+               req->r_con_filling_msg = NULL;
        }
 
        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
                           front, (int)req->r_reply->front.iov_len);
-               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
-               if (IS_ERR(m))
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
+               if (!m)
                        goto out;
                ceph_msg_put(req->r_reply);
                req->r_reply = m;
@@ -1439,12 +1410,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        m = ceph_msg_get(req->r_reply);
 
        if (data_len > 0) {
-               err = __prepare_pages(con, hdr, req, tid, m);
-               if (err < 0) {
+               unsigned data_off = le16_to_cpu(hdr->data_off);
+               int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+
+               if (unlikely(req->r_num_pages < want)) {
+                       pr_warning("tid %lld reply %d > expected %d pages\n",
+                                  tid, want, m->nr_pages);
                        *skip = 1;
                        ceph_msg_put(m);
-                       m = ERR_PTR(err);
+                       m = NULL;
+                       goto out;
                }
+               m->pages = req->r_pages;
+               m->nr_pages = req->r_num_pages;
        }
        *skip = 0;
        req->r_con_filling_msg = ceph_con_get(con);
@@ -1466,7 +1444,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 
        switch (type) {
        case CEPH_MSG_OSD_MAP:
-               return ceph_msg_new(type, front, 0, 0, NULL);
+               return ceph_msg_new(type, front, GFP_NOFS);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default:
@@ -1552,7 +1530,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
        return ceph_monc_validate_auth(&osdc->client->monc);
 }
 
-const static struct ceph_connection_operations osd_con_ops = {
+static const struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,