]> bbs.cooldavid.org Git - net-next-2.6.git/blob - fs/ceph/osd_client.c
ceph: put unused osd connections on lru
[net-next-2.6.git] / fs / ceph / osd_client.c
1 #include "ceph_debug.h"
2
3 #include <linux/err.h>
4 #include <linux/highmem.h>
5 #include <linux/mm.h>
6 #include <linux/pagemap.h>
7 #include <linux/slab.h>
8 #include <linux/uaccess.h>
9
10 #include "super.h"
11 #include "osd_client.h"
12 #include "messenger.h"
13 #include "decode.h"
14 #include "auth.h"
15
16 #define OSD_REPLY_RESERVE_FRONT_LEN     512
17
18 const static struct ceph_connection_operations osd_con_ops;
19
20 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
21
22 /*
23  * Implement client access to distributed object storage cluster.
24  *
25  * All data objects are stored within a cluster/cloud of OSDs, or
26  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
27  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
28  * remote daemons serving up and coordinating consistent and safe
29  * access to storage.
30  *
31  * Cluster membership and the mapping of data objects onto storage devices
32  * are described by the osd map.
33  *
34  * We keep track of pending OSD requests (read, write), resubmit
35  * requests to different OSDs when the cluster topology/data layout
36  * change, or retry the affected requests when the communications
37  * channel with an OSD is reset.
38  */
39
40 /*
41  * calculate the mapping of a file extent onto an object, and fill out the
42  * request accordingly.  shorten extent as necessary if it crosses an
43  * object boundary.
44  *
45  * fill osd op in request message.
46  */
static void calc_layout(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 *plen,
                        struct ceph_osd_request *req)
{
        /* the request head sits at the start of the message front; the
         * first op immediately follows it (laid out by the request builder) */
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_osd_op *op = (void *)(reqhead + 1);
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
        u64 bno;               /* object (block) number within the file */

        reqhead->snapid = cpu_to_le64(vino.snap);

        /* object extent?  may shorten *plen so the extent stops at an
         * object boundary */
        ceph_calc_file_object_mapping(layout, off, plen, &bno,
                                      &objoff, &objlen);
        if (*plen < orig_len)
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);

        /* object name is "<inode hex>.<object number, 8 hex digits>" */
        sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);

        op->extent.offset = cpu_to_le64(objoff);
        op->extent.length = cpu_to_le64(objlen);
        /* note: page count is derived from the file offset, not objoff */
        req->r_num_pages = calc_pages_for(off, *plen);

        dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
             req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}
77
78 static void remove_replies(struct ceph_osd_request *req)
79 {
80         int i;
81         int max = ARRAY_SIZE(req->replies);
82
83         for (i=0; i<max; i++) {
84                 if (req->replies[i])
85                         ceph_msg_put(req->replies[i]);
86         }
87 }
88
89 /*
90  * requests
91  */
/* final-put destructor for an osd request (kref release callback):
 * drop every reference the request holds, then free the struct back
 * to its origin (mempool or slab). */
void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                                    struct ceph_osd_request,
                                                    r_kref);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        remove_replies(req);
        if (req->r_con_filling_msg) {
                /* a connection is (or was) reading a reply into our pages;
                 * revoke the in-flight read before the pages go away */
                dout("release_request revoking pages %p from con %p\n",
                     req->r_pages, req->r_con_filling_msg);
                ceph_con_revoke_message(req->r_con_filling_msg,
                                      req->r_reply);
                ceph_con_put(req->r_con_filling_msg);
        }
        /* r_own_pages: the page vector was allocated on our behalf */
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
        ceph_put_snap_context(req->r_snapc);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
                kfree(req);
}
119
120 static int alloc_replies(struct ceph_osd_request *req, int num_reply)
121 {
122         int i;
123         int max = ARRAY_SIZE(req->replies);
124
125         BUG_ON(num_reply > max);
126
127         for (i=0; i<num_reply; i++) {
128                 req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
129                 if (IS_ERR(req->replies[i])) {
130                         int j;
131                         int err = PTR_ERR(req->replies[i]);
132                         for (j = 0; j<=i; j++) {
133                                 ceph_msg_put(req->replies[j]);
134                         }
135                         return err;
136                 }
137         }
138
139         for (; i<max; i++) {
140                 req->replies[i] = NULL;
141         }
142
143         req->cur_reply = 0;
144
145         return 0;
146 }
147
/*
 * Hand the messenger a reply message to read the incoming reply into:
 * normally the current preallocated one, or a freshly allocated msg if
 * the front is larger than we reserved.  Takes a ref on the connection
 * (r_con_filling_msg) and two refs on the msg (one kept in r_reply for
 * the duration of the read, one returned to the caller).
 */
static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
                                       struct ceph_osd_request *req,
                                       int front_len)
{
        struct ceph_msg *reply;
        if (req->r_con_filling_msg) {
                /* an older connection was filling r_reply; take it back */
                dout("revoking reply msg %p from old con %p\n", req->r_reply,
                     req->r_con_filling_msg);
                ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
                ceph_con_put(req->r_con_filling_msg);
                req->cur_reply = 0;
        }
        reply = req->replies[req->cur_reply];
        if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
                /* maybe we can allocate it now? */
                reply = ceph_msg_new(0, front_len, 0, 0, NULL);
                if (!reply || IS_ERR(reply)) {
                        pr_err(" reply alloc failed, front_len=%d\n", front_len);
                        return ERR_PTR(-ENOMEM);
                }
                /* NOTE(review): a freshly allocated msg is not stored in
                 * req->replies[], so only the refs taken below keep it;
                 * confirm the ref counts balance against the prealloc case */
        }
        req->r_con_filling_msg = ceph_con_get(con);
        req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
        return ceph_msg_get(reply);         /* caller's (messenger's) ref */
}
173
174 /*
175  * build new request AND message, calculate layout, and adjust file
176  * extent as needed.
177  *
178  * if the file was recently truncated, we include information about its
179  * old and new size so that the object can be updated appropriately.  (we
180  * avoid synchronously deleting truncated objects because it's slow.)
181  *
182  * if @do_sync, include a 'startsync' command so that the osd will flush
183  * data quickly.
184  */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
                                               bool use_mempool, int num_reply)
{
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        struct ceph_osd_request_head *head;
        struct ceph_osd_op *op;
        void *p;
        int num_op = 1 + do_sync;       /* one extra STARTSYNC op if asked */
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int err, i;

        /* request struct: mempool for paths that must not fail allocation
         * (e.g. writeback), plain kzalloc otherwise */
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
                memset(req, 0, sizeof(*req));
        } else {
                req = kzalloc(sizeof(*req), GFP_NOFS);
        }
        if (req == NULL)
                return ERR_PTR(-ENOMEM);

        /* preallocate reply messages so reply receipt can't fail alloc */
        err = alloc_replies(req, num_reply);
        if (err) {
                ceph_osdc_put_request(req);
                /* NOTE(review): 'err' is discarded here; callers always
                 * see -ENOMEM even if alloc_replies failed differently */
                return ERR_PTR(-ENOMEM);
        }
        req->r_num_prealloc_reply = num_reply;

        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        kref_init(&req->r_kref);        /* ref owned by the caller */
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        req->r_flags = flags;

        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

        /* create message; allow space for oid */
        msg_size += 40;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
        if (IS_ERR(msg)) {
                ceph_osdc_put_request(req);
                return ERR_PTR(PTR_ERR(msg));
        }
        msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
        memset(msg->front.iov_base, 0, msg->front.iov_len);
        /* front layout: head, then num_op ops, then variable data at p
         * (oid followed by snap list) */
        head = msg->front.iov_base;
        op = (void *)(head + 1);
        p = (void *)(op + num_op);

        req->r_request = msg;
        req->r_snapc = ceph_get_snap_context(snapc);

        head->client_inc = cpu_to_le32(1); /* always, for now. */
        head->flags = cpu_to_le32(flags);
        if (flags & CEPH_OSD_FLAG_WRITE)
                ceph_encode_timespec(&head->mtime, mtime);
        head->num_ops = cpu_to_le16(num_op);
        op->op = cpu_to_le16(opcode);

        /* calculate max write size; fills in op extent and req->r_oid */
        calc_layout(osdc, vino, layout, off, plen, req);
        req->r_file_layout = *layout;  /* keep a copy */

        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
                req->r_request->hdr.data_len = cpu_to_le32(*plen);
                op->payload_len = cpu_to_le32(*plen);
        }
        op->extent.truncate_size = cpu_to_le64(truncate_size);
        op->extent.truncate_seq = cpu_to_le32(truncate_seq);

        /* fill in oid */
        head->object_len = cpu_to_le32(req->r_oid_len);
        memcpy(p, req->r_oid, req->r_oid_len);
        p += req->r_oid_len;

        if (do_sync) {
                op++;
                op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
        }
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
                head->num_snaps = cpu_to_le32(snapc->num_snaps);
                for (i = 0; i < snapc->num_snaps; i++) {
                        put_unaligned_le64(snapc->snaps[i], p);
                        p += sizeof(u64);
                }
        }

        /* everything we wrote must fit inside the front we sized above */
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
        return req;
}
293
294 /*
295  * We keep osd requests in an rbtree, sorted by ->r_tid.
296  */
297 static void __insert_request(struct ceph_osd_client *osdc,
298                              struct ceph_osd_request *new)
299 {
300         struct rb_node **p = &osdc->requests.rb_node;
301         struct rb_node *parent = NULL;
302         struct ceph_osd_request *req = NULL;
303
304         while (*p) {
305                 parent = *p;
306                 req = rb_entry(parent, struct ceph_osd_request, r_node);
307                 if (new->r_tid < req->r_tid)
308                         p = &(*p)->rb_left;
309                 else if (new->r_tid > req->r_tid)
310                         p = &(*p)->rb_right;
311                 else
312                         BUG();
313         }
314
315         rb_link_node(&new->r_node, parent, p);
316         rb_insert_color(&new->r_node, &osdc->requests);
317 }
318
319 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
320                                                  u64 tid)
321 {
322         struct ceph_osd_request *req;
323         struct rb_node *n = osdc->requests.rb_node;
324
325         while (n) {
326                 req = rb_entry(n, struct ceph_osd_request, r_node);
327                 if (tid < req->r_tid)
328                         n = n->rb_left;
329                 else if (tid > req->r_tid)
330                         n = n->rb_right;
331                 else
332                         return req;
333         }
334         return NULL;
335 }
336
337 static struct ceph_osd_request *
338 __lookup_request_ge(struct ceph_osd_client *osdc,
339                     u64 tid)
340 {
341         struct ceph_osd_request *req;
342         struct rb_node *n = osdc->requests.rb_node;
343
344         while (n) {
345                 req = rb_entry(n, struct ceph_osd_request, r_node);
346                 if (tid < req->r_tid) {
347                         if (!n->rb_left)
348                                 return req;
349                         n = n->rb_left;
350                 } else if (tid > req->r_tid) {
351                         n = n->rb_right;
352                 } else {
353                         return req;
354                 }
355         }
356         return NULL;
357 }
358
359
360 /*
361  * If the osd connection drops, we need to resubmit all requests.
362  */
363 static void osd_reset(struct ceph_connection *con)
364 {
365         struct ceph_osd *osd = con->private;
366         struct ceph_osd_client *osdc;
367
368         if (!osd)
369                 return;
370         dout("osd_reset osd%d\n", osd->o_osd);
371         osdc = osd->o_osdc;
372         osd->o_incarnation++;
373         down_read(&osdc->map_sem);
374         kick_requests(osdc, osd);
375         up_read(&osdc->map_sem);
376 }
377
378 /*
379  * Track open sessions with osds.
380  */
381 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
382 {
383         struct ceph_osd *osd;
384
385         osd = kzalloc(sizeof(*osd), GFP_NOFS);
386         if (!osd)
387                 return NULL;
388
389         atomic_set(&osd->o_ref, 1);
390         osd->o_osdc = osdc;
391         INIT_LIST_HEAD(&osd->o_requests);
392         INIT_LIST_HEAD(&osd->o_osd_lru);
393         osd->o_incarnation = 1;
394
395         ceph_con_init(osdc->client->msgr, &osd->o_con);
396         osd->o_con.private = osd;
397         osd->o_con.ops = &osd_con_ops;
398         osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
399
400         return osd;
401 }
402
403 static struct ceph_osd *get_osd(struct ceph_osd *osd)
404 {
405         if (atomic_inc_not_zero(&osd->o_ref)) {
406                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
407                      atomic_read(&osd->o_ref));
408                 return osd;
409         } else {
410                 dout("get_osd %p FAIL\n", osd);
411                 return NULL;
412         }
413 }
414
415 static void put_osd(struct ceph_osd *osd)
416 {
417         dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
418              atomic_read(&osd->o_ref) - 1);
419         if (atomic_dec_and_test(&osd->o_ref))
420                 kfree(osd);
421 }
422
423 /*
424  * remove an osd from our map
425  */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));  /* must be idle */
        rb_erase(&osd->o_node, &osdc->osds);
        list_del_init(&osd->o_osd_lru);
        ceph_con_close(&osd->o_con);
        put_osd(osd);                           /* drop the tree's ref */
}
435
436 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
437                               struct ceph_osd *osd)
438 {
439         dout("__move_osd_to_lru %p\n", osd);
440         BUG_ON(!list_empty(&osd->o_osd_lru));
441         list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
442         osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
443 }
444
445 static void __remove_osd_from_lru(struct ceph_osd *osd)
446 {
447         dout("__remove_osd_from_lru %p\n", osd);
448         if (!list_empty(&osd->o_osd_lru))
449                 list_del_init(&osd->o_osd_lru);
450 }
451
452 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
453 {
454         struct ceph_osd *osd, *nosd;
455
456         dout("__remove_old_osds %p\n", osdc);
457         mutex_lock(&osdc->request_mutex);
458         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
459                 if (!remove_all && time_before(jiffies, osd->lru_ttl))
460                         break;
461                 __remove_osd(osdc, osd);
462         }
463         mutex_unlock(&osdc->request_mutex);
464 }
465
466 /*
467  * reset osd connect
468  */
469 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
470 {
471         int ret = 0;
472
473         dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
474         if (list_empty(&osd->o_requests)) {
475                 __remove_osd(osdc, osd);
476         } else {
477                 ceph_con_close(&osd->o_con);
478                 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
479                 osd->o_incarnation++;
480         }
481         return ret;
482 }
483
484 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
485 {
486         struct rb_node **p = &osdc->osds.rb_node;
487         struct rb_node *parent = NULL;
488         struct ceph_osd *osd = NULL;
489
490         while (*p) {
491                 parent = *p;
492                 osd = rb_entry(parent, struct ceph_osd, o_node);
493                 if (new->o_osd < osd->o_osd)
494                         p = &(*p)->rb_left;
495                 else if (new->o_osd > osd->o_osd)
496                         p = &(*p)->rb_right;
497                 else
498                         BUG();
499         }
500
501         rb_link_node(&new->o_node, parent, p);
502         rb_insert_color(&new->o_node, &osdc->osds);
503 }
504
505 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
506 {
507         struct ceph_osd *osd;
508         struct rb_node *n = osdc->osds.rb_node;
509
510         while (n) {
511                 osd = rb_entry(n, struct ceph_osd, o_node);
512                 if (o < osd->o_osd)
513                         n = n->rb_left;
514                 else if (o > osd->o_osd)
515                         n = n->rb_right;
516                 else
517                         return osd;
518         }
519         return NULL;
520 }
521
522
523 /*
524  * Register request, assign tid.  If this is the first request, set up
525  * the timeout event.
526  */
static void register_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *req)
{
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;          /* tids are never reused */
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);

        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);             /* ref held by the rbtree */
        osdc->num_requests++;

        req->r_timeout_stamp =
                jiffies + osdc->client->mount_args->osd_timeout*HZ;

        /* the first in-flight request arms the timeout work */
        if (osdc->num_requests == 1) {
                osdc->timeout_tid = req->r_tid;
                dout("  timeout on tid %llu at %lu\n", req->r_tid,
                     req->r_timeout_stamp);
                schedule_delayed_work(&osdc->timeout_work,
                      round_jiffies_relative(req->r_timeout_stamp - jiffies));
        }
        mutex_unlock(&osdc->request_mutex);
}
551
552 /*
553  * called under osdc->request_mutex
554  */
555 static void __unregister_request(struct ceph_osd_client *osdc,
556                                  struct ceph_osd_request *req)
557 {
558         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
559         rb_erase(&req->r_node, &osdc->requests);
560         osdc->num_requests--;
561
562         if (req->r_osd) {
563                 /* make sure the original request isn't in flight. */
564                 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
565
566                 list_del_init(&req->r_osd_item);
567                 if (list_empty(&req->r_osd->o_requests))
568                         __move_osd_to_lru(osdc, req->r_osd);
569                 req->r_osd = NULL;
570         }
571
572         ceph_osdc_put_request(req);
573
574         if (req->r_tid == osdc->timeout_tid) {
575                 if (osdc->num_requests == 0) {
576                         dout("no requests, canceling timeout\n");
577                         osdc->timeout_tid = 0;
578                         cancel_delayed_work(&osdc->timeout_work);
579                 } else {
580                         req = rb_entry(rb_first(&osdc->requests),
581                                        struct ceph_osd_request, r_node);
582                         osdc->timeout_tid = req->r_tid;
583                         dout("rescheduled timeout on tid %llu at %lu\n",
584                              req->r_tid, req->r_timeout_stamp);
585                         schedule_delayed_work(&osdc->timeout_work,
586                               round_jiffies_relative(req->r_timeout_stamp -
587                                                      jiffies));
588                 }
589         }
590 }
591
592 /*
593  * Cancel a previously queued request message
594  */
595 static void __cancel_request(struct ceph_osd_request *req)
596 {
597         if (req->r_sent) {
598                 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
599                 req->r_sent = 0;
600         }
601 }
602
603 /*
604  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
605  * (as needed), and set the request r_osd appropriately.  If there is
606  * no up osd, set r_osd to NULL.
607  *
608  * Return 0 if unchanged, 1 if changed, or negative on error.
609  *
610  * Caller should hold map_sem for read and request_mutex.
611  */
static int __map_osds(struct ceph_osd_client *osdc,
                      struct ceph_osd_request *req)
{
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
        int o = -1;
        int err;
        struct ceph_osd *newosd = NULL;         /* candidate for reuse */

        dout("map_osds %p tid %lld\n", req, req->r_tid);
        /* recompute the object -> pg mapping against the current osdmap */
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
        if (err)
                return err;
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;

        /* o == -1 means no up osd serves this pg right now */
        o = ceph_calc_pg_primary(osdc->osdmap, pgid);

        /* unchanged if the same osd serves the pg and our last send went
         * to the current incarnation of that session */
        if ((req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation) ||
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */

        dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);

        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests)) {
                        /* try to re-use r_osd if possible */
                        newosd = get_osd(req->r_osd);
                        __remove_osd(osdc, newosd);
                }
                req->r_osd = NULL;
        }

        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                /* no session for this osd yet: recycle the one we just
                 * detached, or create a fresh one */
                if (newosd) {
                        req->r_osd = newosd;
                        newosd = NULL;
                } else {
                        err = -ENOMEM;
                        req->r_osd = create_osd(osdc);
                        if (!req->r_osd)
                                goto out;
                }

                dout("map_osds osd %p is osd%d\n", req->r_osd, o);
                req->r_osd->o_osd = o;
                req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);

                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
        }

        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
        }
        err = 1;   /* osd changed */

out:
        /* drop the unused recycling candidate, if any */
        if (newosd)
                put_osd(newosd);
        return err;
}
682
683 /*
684  * caller should hold map_sem (for read) and request_mutex
685  */
static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req)
{
        struct ceph_osd_request_head *reqhead;
        int err;

        /* (re)map the request to the current primary osd for its pg */
        err = __map_osds(osdc, req);
        if (err < 0)
                return err;
        if (req->r_osd == NULL) {
                /* nowhere to send it; wait for a newer osdmap */
                dout("send_request %p no up osds in pg\n", req);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
                return 0;
        }

        dout("send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags);

        /* refresh the wire header for this (re)send */
        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;

        req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;

        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
        /* record which session incarnation this send went to */
        req->r_sent = req->r_osd->o_incarnation;
        return 0;
}
716
717 /*
718  * Timeout callback, called every N seconds when 1 or more osd
719  * requests has been active for more than N seconds.  When this
720  * happens, we ping all OSDs with requests who have timed out to
721  * ensure any communications channel reset is detected.  Reset the
722  * request timeouts another N seconds in the future as we go.
723  * Reschedule the timeout event another N seconds in future (unless
724  * there are no open requests).
725  */
static void handle_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
        unsigned long next_timeout = timeout + jiffies;
        struct rb_node *p;

        dout("timeout\n");
        down_read(&osdc->map_sem);

        /* stuck requests may mean our osdmap is stale */
        ceph_monc_request_next_osdmap(&osdc->client->monc);

        mutex_lock(&osdc->request_mutex);
        /* pass 1: retry any request whose previous send attempt failed */
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);

                if (req->r_resend) {
                        int err;

                        dout("osdc resending prev failed %lld\n", req->r_tid);
                        err = __send_request(osdc, req);
                        if (err)
                                dout("osdc failed again on %lld\n", req->r_tid);
                        else
                                req->r_resend = false;
                        continue;
                }
        }
        /* pass 2: keepalive any osd whose oldest request timed out, so a
         * dead connection is detected and reset by the messenger */
        for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
                osd = rb_entry(p, struct ceph_osd, o_node);
                if (list_empty(&osd->o_requests))
                        continue;
                /* oldest request on this osd; if it hasn't timed out,
                 * none of the newer ones have either */
                req = list_first_entry(&osd->o_requests,
                                       struct ceph_osd_request, r_osd_item);
                if (time_before(jiffies, req->r_timeout_stamp))
                        continue;

                dout(" tid %llu (at least) timed out on osd%d\n",
                     req->r_tid, osd->o_osd);
                req->r_timeout_stamp = next_timeout;
                ceph_con_keepalive(&osd->o_con);
        }

        /* keep the timer running while any request owns the timeout */
        if (osdc->timeout_tid)
                schedule_delayed_work(&osdc->timeout_work,
                                      round_jiffies_relative(timeout));

        mutex_unlock(&osdc->request_mutex);

        up_read(&osdc->map_sem);
}
780
781 static void handle_osds_timeout(struct work_struct *work)
782 {
783         struct ceph_osd_client *osdc =
784                 container_of(work, struct ceph_osd_client,
785                              osds_timeout_work.work);
786         unsigned long delay =
787                 osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
788
789         dout("osds timeout\n");
790         down_read(&osdc->map_sem);
791         remove_old_osds(osdc, 0);
792         up_read(&osdc->map_sem);
793
794         schedule_delayed_work(&osdc->osds_timeout_work,
795                               round_jiffies_relative(delay));
796 }
797
/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
{
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
        u64 tid;
        int numops, object_len, flags;

        tid = le64_to_cpu(msg->hdr.tid);
        /* validate the front length against the op/object counts the
         * header claims before trusting any of the payload */
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
        numops = le32_to_cpu(rhead->num_ops);
        object_len = le32_to_cpu(rhead->object_len);
        if (msg->front.iov_len != sizeof(*rhead) + object_len +
            numops * sizeof(struct ceph_osd_op))
                goto bad;
        dout("handle_reply %p tid %llu\n", msg, tid);

        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
                /* request already completed and was unregistered */
                dout("handle_reply tid %llu dne\n", tid);
                mutex_unlock(&osdc->request_mutex);
                return;
        }
        /* hold a ref across the callbacks below; dropped at "done" */
        ceph_osdc_get_request(req);
        flags = le32_to_cpu(rhead->flags);

        /*
         * if this connection filled our message, drop our reference now, to
         * avoid a (safe but slower) revoke later.
         */
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
                dout(" got pages, dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
                ceph_con_put(con);
        }

        if (req->r_reply) {
                /*
                 * once we see the message has been received, we don't
                 * need a ref (which is only needed for revoking
                 * pages)
                 */
                ceph_msg_put(req->r_reply);
                req->r_reply = NULL;
        }

        if (!req->r_got_reply) {
                /* first reply for this request: record the result */
                unsigned bytes;

                req->r_result = le32_to_cpu(rhead->result);
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                /* success: report the number of bytes transferred */
                if (req->r_result == 0)
                        req->r_result = bytes;

                /* in case this is a write and we need to replay, */
                req->r_reassert_version = rhead->reassert_version;

                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                /* we already saw an ack and this is not the on-disk commit */
                dout("handle_reply tid %llu dup ack\n", tid);
                mutex_unlock(&osdc->request_mutex);
                goto done;
        }

        dout("handle_reply tid %llu flags %d\n", tid, flags);

        /* either this is a read, or we got the safe response */
        if ((flags & CEPH_OSD_FLAG_ONDISK) ||
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);

        mutex_unlock(&osdc->request_mutex);

        /* completion/callbacks run without request_mutex held */
        if (req->r_callback)
                req->r_callback(req, msg);
        else
                complete(&req->r_completion);

        if (flags & CEPH_OSD_FLAG_ONDISK) {
                if (req->r_safe_callback)
                        req->r_safe_callback(req, msg);
                complete(&req->r_safe_completion);  /* fsync waiter */
        }

done:
        ceph_osdc_put_request(req);
        return;

bad:
        pr_err("corrupt osd_op_reply got %d %d expected %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
               (int)sizeof(*rhead));
        ceph_msg_dump(msg);
}
901
902
/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read.
 * NOTE(review): the original header also said "and request_mutex", but
 * this function takes request_mutex itself below — confirm callers do
 * NOT hold it, or this would self-deadlock.
 */
static void kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
{
        struct ceph_osd_request *req;
        struct rb_node *p, *n;
        int needmap = 0;
        int err;

        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
        mutex_lock(&osdc->request_mutex);
        if (!kickosd) {
                /* no specific osd: reset any osd that is down or whose
                 * address no longer matches the current osdmap */
                for (p = rb_first(&osdc->osds); p; p = n) {
                        struct ceph_osd *osd =
                                rb_entry(p, struct ceph_osd, o_node);

                        /* save next now: __reset_osd may unlink osd */
                        n = rb_next(p);
                        if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
                            memcmp(&osd->o_con.peer_addr,
                                   ceph_osd_addr(osdc->osdmap,
                                                 osd->o_osd),
                                   sizeof(struct ceph_entity_addr)) != 0)
                                __reset_osd(osdc, osd);
                }
        }

        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);

                if (req->r_resend) {
                        dout(" r_resend set on tid %llu\n", req->r_tid);
                        __cancel_request(req);
                        goto kick;
                }
                if (req->r_osd && kickosd == req->r_osd) {
                        __cancel_request(req);
                        goto kick;
                }

                /* remap the request onto an osd per the current map */
                err = __map_osds(osdc, req);
                if (err == 0)
                        continue;  /* no change */
                if (err < 0) {
                        /*
                         * FIXME: really, we should set the request
                         * error and fail if this isn't a 'nofail'
                         * request, but that's a fair bit more
                         * complicated to do.  So retry!
                         */
                        dout(" setting r_resend on %llu\n", req->r_tid);
                        req->r_resend = true;
                        continue;
                }
                if (req->r_osd == NULL) {
                        dout("tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
                        continue;
                }

kick:
                dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
                     req->r_osd->o_osd);
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
                err = __send_request(osdc, req);
                if (err) {
                        /* send failed; flag for a later resend attempt */
                        dout(" setting r_resend on %llu\n", req->r_tid);
                        req->r_resend = true;
                }
        }
        mutex_unlock(&osdc->request_mutex);

        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
}
989
990 /*
991  * Process updated osd map.
992  *
993  * The message contains any number of incremental and full maps, normally
994  * indicating some sort of topology change in the cluster.  Kick requests
995  * off to different OSDs as needed.
996  */
997 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
998 {
999         void *p, *end, *next;
1000         u32 nr_maps, maplen;
1001         u32 epoch;
1002         struct ceph_osdmap *newmap = NULL, *oldmap;
1003         int err;
1004         struct ceph_fsid fsid;
1005
1006         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1007         p = msg->front.iov_base;
1008         end = p + msg->front.iov_len;
1009
1010         /* verify fsid */
1011         ceph_decode_need(&p, end, sizeof(fsid), bad);
1012         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1013         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1014                 return;
1015
1016         down_write(&osdc->map_sem);
1017
1018         /* incremental maps */
1019         ceph_decode_32_safe(&p, end, nr_maps, bad);
1020         dout(" %d inc maps\n", nr_maps);
1021         while (nr_maps > 0) {
1022                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1023                 epoch = ceph_decode_32(&p);
1024                 maplen = ceph_decode_32(&p);
1025                 ceph_decode_need(&p, end, maplen, bad);
1026                 next = p + maplen;
1027                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1028                         dout("applying incremental map %u len %d\n",
1029                              epoch, maplen);
1030                         newmap = osdmap_apply_incremental(&p, next,
1031                                                           osdc->osdmap,
1032                                                           osdc->client->msgr);
1033                         if (IS_ERR(newmap)) {
1034                                 err = PTR_ERR(newmap);
1035                                 goto bad;
1036                         }
1037                         BUG_ON(!newmap);
1038                         if (newmap != osdc->osdmap) {
1039                                 ceph_osdmap_destroy(osdc->osdmap);
1040                                 osdc->osdmap = newmap;
1041                         }
1042                 } else {
1043                         dout("ignoring incremental map %u len %d\n",
1044                              epoch, maplen);
1045                 }
1046                 p = next;
1047                 nr_maps--;
1048         }
1049         if (newmap)
1050                 goto done;
1051
1052         /* full maps */
1053         ceph_decode_32_safe(&p, end, nr_maps, bad);
1054         dout(" %d full maps\n", nr_maps);
1055         while (nr_maps) {
1056                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1057                 epoch = ceph_decode_32(&p);
1058                 maplen = ceph_decode_32(&p);
1059                 ceph_decode_need(&p, end, maplen, bad);
1060                 if (nr_maps > 1) {
1061                         dout("skipping non-latest full map %u len %d\n",
1062                              epoch, maplen);
1063                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1064                         dout("skipping full map %u len %d, "
1065                              "older than our %u\n", epoch, maplen,
1066                              osdc->osdmap->epoch);
1067                 } else {
1068                         dout("taking full map %u len %d\n", epoch, maplen);
1069                         newmap = osdmap_decode(&p, p+maplen);
1070                         if (IS_ERR(newmap)) {
1071                                 err = PTR_ERR(newmap);
1072                                 goto bad;
1073                         }
1074                         BUG_ON(!newmap);
1075                         oldmap = osdc->osdmap;
1076                         osdc->osdmap = newmap;
1077                         if (oldmap)
1078                                 ceph_osdmap_destroy(oldmap);
1079                 }
1080                 p += maplen;
1081                 nr_maps--;
1082         }
1083
1084 done:
1085         downgrade_write(&osdc->map_sem);
1086         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1087         if (newmap)
1088                 kick_requests(osdc, NULL);
1089         up_read(&osdc->map_sem);
1090         return;
1091
1092 bad:
1093         pr_err("osdc handle_map corrupt msg\n");
1094         ceph_msg_dump(msg);
1095         up_write(&osdc->map_sem);
1096         return;
1097 }
1098
1099
/*
 * A read request prepares specific pages that data is to be read into.
 * When a message is being read off the wire, we call prepare_pages to
 * find those pages.
 *  0 = success, -1 failure.
 *
 * Called from alloc_msg() with osdc->request_mutex held.
 */
static int __prepare_pages(struct ceph_connection *con,
                         struct ceph_msg_header *hdr,
                         struct ceph_osd_request *req,
                         u64 tid,
                         struct ceph_msg *m)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;
        int ret = -1;
        int data_len = le32_to_cpu(hdr->data_len);
        unsigned data_off = le16_to_cpu(hdr->data_off);

        /* pages the incoming data will span, accounting for the
         * data's offset within the first page */
        int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);

        if (!osd)
                return -1;

        osdc = osd->o_osdc;

        dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
             tid, req->r_num_pages, want);
        if (unlikely(req->r_num_pages < want))
                goto out;
        /* point the incoming message at the request's preallocated pages */
        m->pages = req->r_pages;
        m->nr_pages = req->r_num_pages;
        ret = 0; /* success */
out:
        /* NOTE(review): a too-small page vector trips this BUG_ON rather
         * than returning -1 gracefully; the ret < 0 return below is
         * effectively unreachable.  Confirm this is intentional. */
        BUG_ON(ret < 0 || m->nr_pages < want);

        return ret;
}
1137
/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
{
        int rc = 0;

        /* attach the request's data pages to the outgoing message */
        req->r_request->pages = req->r_pages;
        req->r_request->nr_pages = req->r_num_pages;

        register_request(osdc, req);

        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        /*
         * a racing kick_requests() may have sent the message for us
         * while we dropped request_mutex above, so only send now if
         * the request still hasn't been touched yet.
         */
        if (req->r_sent == 0) {
                rc = __send_request(osdc, req);
                if (rc) {
                        if (nofail) {
                                /* caller can't tolerate failure: mark for
                                 * resend instead of reporting an error */
                                dout("osdc_start_request failed send, "
                                     " marking %lld\n", req->r_tid);
                                req->r_resend = true;
                                rc = 0;
                        } else {
                                __unregister_request(osdc, req);
                        }
                }
        }
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
        return rc;
}
1176
1177 /*
1178  * wait for a request to complete
1179  */
1180 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1181                            struct ceph_osd_request *req)
1182 {
1183         int rc;
1184
1185         rc = wait_for_completion_interruptible(&req->r_completion);
1186         if (rc < 0) {
1187                 mutex_lock(&osdc->request_mutex);
1188                 __cancel_request(req);
1189                 __unregister_request(osdc, req);
1190                 mutex_unlock(&osdc->request_mutex);
1191                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1192                 return rc;
1193         }
1194
1195         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1196         return req->r_result;
1197 }
1198
/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
        struct ceph_osd_request *req;
        u64 last_tid, next_tid = 0;

        mutex_lock(&osdc->request_mutex);
        /* only wait for requests registered as of now; anything submitted
         * later is ignored, which bounds the loop (no starvation) */
        last_tid = osdc->last_tid;
        while (1) {
                req = __lookup_request_ge(osdc, next_tid);
                if (!req)
                        break;
                if (req->r_tid > last_tid)
                        break;

                next_tid = req->r_tid + 1;
                /* reads have nothing to flush */
                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
                        continue;

                /* take a ref so the request survives while we sleep with
                 * request_mutex dropped */
                ceph_osdc_get_request(req);
                mutex_unlock(&osdc->request_mutex);
                dout("sync waiting on tid %llu (last is %llu)\n",
                     req->r_tid, last_tid);
                wait_for_completion(&req->r_safe_completion);
                mutex_lock(&osdc->request_mutex);
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
        dout("sync done (thru tid %llu)\n", last_tid);
}
1231
1232 /*
1233  * init, shutdown
1234  */
1235 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1236 {
1237         int err;
1238
1239         dout("init\n");
1240         osdc->client = client;
1241         osdc->osdmap = NULL;
1242         init_rwsem(&osdc->map_sem);
1243         init_completion(&osdc->map_waiters);
1244         osdc->last_requested_map = 0;
1245         mutex_init(&osdc->request_mutex);
1246         osdc->timeout_tid = 0;
1247         osdc->last_tid = 0;
1248         osdc->osds = RB_ROOT;
1249         INIT_LIST_HEAD(&osdc->osd_lru);
1250         osdc->requests = RB_ROOT;
1251         osdc->num_requests = 0;
1252         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1253         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1254
1255         schedule_delayed_work(&osdc->osds_timeout_work,
1256            round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
1257
1258         err = -ENOMEM;
1259         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1260                                         sizeof(struct ceph_osd_request));
1261         if (!osdc->req_mempool)
1262                 goto out;
1263
1264         err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
1265         if (err < 0)
1266                 goto out_mempool;
1267         return 0;
1268
1269 out_mempool:
1270         mempool_destroy(osdc->req_mempool);
1271 out:
1272         return err;
1273 }
1274
1275 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1276 {
1277         cancel_delayed_work_sync(&osdc->timeout_work);
1278         cancel_delayed_work_sync(&osdc->osds_timeout_work);
1279         if (osdc->osdmap) {
1280                 ceph_osdmap_destroy(osdc->osdmap);
1281                 osdc->osdmap = NULL;
1282         }
1283         remove_old_osds(osdc, 1);
1284         mempool_destroy(osdc->req_mempool);
1285         ceph_msgpool_destroy(&osdc->msgpool_op);
1286 }
1287
1288 /*
1289  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1290  * *plen.  Return number of bytes read, or error.
1291  */
1292 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1293                         struct ceph_vino vino, struct ceph_file_layout *layout,
1294                         u64 off, u64 *plen,
1295                         u32 truncate_seq, u64 truncate_size,
1296                         struct page **pages, int num_pages)
1297 {
1298         struct ceph_osd_request *req;
1299         int rc = 0;
1300
1301         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1302              vino.snap, off, *plen);
1303         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1304                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1305                                     NULL, 0, truncate_seq, truncate_size, NULL,
1306                                     false, 1);
1307         if (IS_ERR(req))
1308                 return PTR_ERR(req);
1309
1310         /* it may be a short read due to an object boundary */
1311         req->r_pages = pages;
1312         num_pages = calc_pages_for(off, *plen);
1313         req->r_num_pages = num_pages;
1314
1315         dout("readpages  final extent is %llu~%llu (%d pages)\n",
1316              off, *plen, req->r_num_pages);
1317
1318         rc = ceph_osdc_start_request(osdc, req, false);
1319         if (!rc)
1320                 rc = ceph_osdc_wait_request(osdc, req);
1321
1322         ceph_osdc_put_request(req);
1323         dout("readpages result %d\n", rc);
1324         return rc;
1325 }
1326
1327 /*
1328  * do a synchronous write on N pages
1329  */
1330 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1331                          struct ceph_file_layout *layout,
1332                          struct ceph_snap_context *snapc,
1333                          u64 off, u64 len,
1334                          u32 truncate_seq, u64 truncate_size,
1335                          struct timespec *mtime,
1336                          struct page **pages, int num_pages,
1337                          int flags, int do_sync, bool nofail)
1338 {
1339         struct ceph_osd_request *req;
1340         int rc = 0;
1341
1342         BUG_ON(vino.snap != CEPH_NOSNAP);
1343         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1344                                     CEPH_OSD_OP_WRITE,
1345                                     flags | CEPH_OSD_FLAG_ONDISK |
1346                                             CEPH_OSD_FLAG_WRITE,
1347                                     snapc, do_sync,
1348                                     truncate_seq, truncate_size, mtime,
1349                                     nofail, 1);
1350         if (IS_ERR(req))
1351                 return PTR_ERR(req);
1352
1353         /* it may be a short write due to an object boundary */
1354         req->r_pages = pages;
1355         req->r_num_pages = calc_pages_for(off, len);
1356         dout("writepages %llu~%llu (%d pages)\n", off, len,
1357              req->r_num_pages);
1358
1359         rc = ceph_osdc_start_request(osdc, req, nofail);
1360         if (!rc)
1361                 rc = ceph_osdc_wait_request(osdc, req);
1362
1363         ceph_osdc_put_request(req);
1364         if (rc == 0)
1365                 rc = len;
1366         dout("writepages result %d\n", rc);
1367         return rc;
1368 }
1369
1370 /*
1371  * handle incoming message
1372  */
1373 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1374 {
1375         struct ceph_osd *osd = con->private;
1376         struct ceph_osd_client *osdc;
1377         int type = le16_to_cpu(msg->hdr.type);
1378
1379         if (!osd)
1380                 return;
1381         osdc = osd->o_osdc;
1382
1383         switch (type) {
1384         case CEPH_MSG_OSD_MAP:
1385                 ceph_osdc_handle_map(osdc, msg);
1386                 break;
1387         case CEPH_MSG_OSD_OPREPLY:
1388                 handle_reply(osdc, msg, con);
1389                 break;
1390
1391         default:
1392                 pr_err("received unknown message type %d %s\n", type,
1393                        ceph_msg_type_name(type));
1394         }
1395         ceph_msg_put(msg);
1396 }
1397
/*
 * Allocate or reuse a message for an incoming reply, based on the header
 * the messenger has read off the wire.  Sets *skip when the reply should
 * be discarded (unknown tid or no buffer); may return an ERR_PTR if the
 * request's page vector cannot hold the data.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
        struct ceph_msg *m;
        struct ceph_osd_request *req;
        u64 tid;
        int err;

        *skip = 0;
        /* we only special-case op replies; NULL lets the caller use its
         * default allocation path for everything else */
        if (type != CEPH_MSG_OSD_OPREPLY)
                return NULL;

        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (!req) {
                /* reply for a request we no longer track: drop it */
                *skip = 1;
                m = NULL;
                dout("alloc_msg unknown tid %llu\n", tid);
                goto out;
        }
        m = __get_next_reply(con, req, front);
        if (!m || IS_ERR(m)) {
                *skip = 1;
                goto out;
        }

        if (data_len > 0) {
                /* point the message's page vector at the request's pages */
                err = __prepare_pages(con, hdr, req, tid, m);
                if (err < 0) {
                        *skip = 1;
                        ceph_msg_put(m);
                        m = ERR_PTR(err);
                }
        }

out:
        mutex_unlock(&osdc->request_mutex);

        return m;
}
1445
1446 /*
1447  * Wrappers to refcount containing ceph_osd struct
1448  */
1449 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1450 {
1451         struct ceph_osd *osd = con->private;
1452         if (get_osd(osd))
1453                 return con;
1454         return NULL;
1455 }
1456
1457 static void put_osd_con(struct ceph_connection *con)
1458 {
1459         struct ceph_osd *osd = con->private;
1460         put_osd(osd);
1461 }
1462
1463 /*
1464  * authentication
1465  */
1466 static int get_authorizer(struct ceph_connection *con,
1467                           void **buf, int *len, int *proto,
1468                           void **reply_buf, int *reply_len, int force_new)
1469 {
1470         struct ceph_osd *o = con->private;
1471         struct ceph_osd_client *osdc = o->o_osdc;
1472         struct ceph_auth_client *ac = osdc->client->monc.auth;
1473         int ret = 0;
1474
1475         if (force_new && o->o_authorizer) {
1476                 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1477                 o->o_authorizer = NULL;
1478         }
1479         if (o->o_authorizer == NULL) {
1480                 ret = ac->ops->create_authorizer(
1481                         ac, CEPH_ENTITY_TYPE_OSD,
1482                         &o->o_authorizer,
1483                         &o->o_authorizer_buf,
1484                         &o->o_authorizer_buf_len,
1485                         &o->o_authorizer_reply_buf,
1486                         &o->o_authorizer_reply_buf_len);
1487                 if (ret)
1488                 return ret;
1489         }
1490
1491         *proto = ac->protocol;
1492         *buf = o->o_authorizer_buf;
1493         *len = o->o_authorizer_buf_len;
1494         *reply_buf = o->o_authorizer_reply_buf;
1495         *reply_len = o->o_authorizer_reply_buf_len;
1496         return 0;
1497 }
1498
1499
1500 static int verify_authorizer_reply(struct ceph_connection *con, int len)
1501 {
1502         struct ceph_osd *o = con->private;
1503         struct ceph_osd_client *osdc = o->o_osdc;
1504         struct ceph_auth_client *ac = osdc->client->monc.auth;
1505
1506         return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1507 }
1508
1509 static int invalidate_authorizer(struct ceph_connection *con)
1510 {
1511         struct ceph_osd *o = con->private;
1512         struct ceph_osd_client *osdc = o->o_osdc;
1513         struct ceph_auth_client *ac = osdc->client->monc.auth;
1514
1515         if (ac->ops->invalidate_authorizer)
1516                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1517
1518         return ceph_monc_validate_auth(&osdc->client->monc);
1519 }
1520
1521 const static struct ceph_connection_operations osd_con_ops = {
1522         .get = get_osd_con,
1523         .put = put_osd_con,
1524         .dispatch = dispatch,
1525         .get_authorizer = get_authorizer,
1526         .verify_authorizer_reply = verify_authorizer_reply,
1527         .invalidate_authorizer = invalidate_authorizer,
1528         .alloc_msg = alloc_msg,
1529         .fault = osd_reset,
1530 };