]> bbs.cooldavid.org Git - net-next-2.6.git/blob - drivers/block/rbd.c
c42f3058abc79de903b979b2a1fe9ea6dbc74ed0
[net-next-2.6.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    Instructions for use
25    --------------------
26
27    1) Map a Linux block device to an existing rbd image.
28
29       Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30
31       $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32
33       The snapshot name can be "-" or omitted to map the image read/write.
34
35    2) List all active blkdev<->object mappings.
36
37       In this example, we have performed step #1 twice, creating two blkdevs,
38       mapped to two separate rados objects in the rados rbd pool
39
40       $ cat /sys/class/rbd/list
41       #id     major   client_name     pool    name    snap    KB
42       0       254     client4143      rbd     foo     -      1024000
43
44       The columns, in order, are:
45       - blkdev unique id
46       - blkdev assigned major
47       - rados client id
48       - rados pool name
49       - rados block device name
50       - mapped snapshot ("-" if none)
51       - device size in KB
52
53
54    3) Create a snapshot.
55
56       Usage: <blkdev id> <snapname>
57
58       $ echo "0 mysnap" > /sys/class/rbd/snap_create
59
60
61    4) Listing a snapshot.
62
63       $ cat /sys/class/rbd/snaps_list
64       #id     snap    KB
65       0       -       1024000 (*)
66       0       foo     1024000
67
68       The columns, in order, are:
69       - blkdev unique id
70       - snapshot name, '-' means none (active read/write version)
71       - size of device at time of snapshot
72       - the (*) indicates this is the active version
73
74    5) Rollback to snapshot.
75
76       Usage: <blkdev id> <snapname>
77
78       $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79
80
81    6) Mapping an image using snapshot.
82
83       A snapshot mapping is read-only. This is being done by passing
84       snap=<snapname> to the options when adding a device.
85
86       $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87
88
89    7) Remove an active blkdev<->rbd image mapping.
90
91       In this example, we remove the mapping with blkdev unique id 1.
92
93       $ echo 1 > /sys/class/rbd/remove
94
95
96    NOTE:  The actual creation and deletion of rados objects is outside the scope
97    of this driver.
98
99  */
100
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
105
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
111
112 #include "rbd_types.h"
113
114 #define DRV_NAME "rbd"
115 #define DRV_NAME_LONG "rbd (rados block device)"
116
117 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
118
119 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
120 #define RBD_MAX_POOL_NAME_LEN   64
121 #define RBD_MAX_SNAP_NAME_LEN   32
122 #define RBD_MAX_OPT_LEN         1024
123
124 #define RBD_SNAP_HEAD_NAME      "-"
125
126 #define DEV_NAME_LEN            32
127
128 /*
129  * block device image metadata (in-memory version)
130  */
131 struct rbd_image_header {
132         u64 image_size;
133         char block_name[32];
134         __u8 obj_order;
135         __u8 crypt_type;
136         __u8 comp_type;
137         struct rw_semaphore snap_rwsem;
138         struct ceph_snap_context *snapc;
139         size_t snap_names_len;
140         u64 snap_seq;
141         u32 total_snaps;
142
143         char *snap_names;
144         u64 *snap_sizes;
145 };
146
147 /*
148  * an instance of the client.  multiple devices may share a client.
149  */
150 struct rbd_client {
151         struct ceph_client      *client;
152         struct kref             kref;
153         struct list_head        node;
154 };
155
156 /*
157  * a single io request
158  */
159 struct rbd_request {
160         struct request          *rq;            /* blk layer request */
161         struct bio              *bio;           /* cloned bio */
162         struct page             **pages;        /* list of used pages */
163         u64                     len;
164 };
165
166 /*
167  * a single device
168  */
169 struct rbd_device {
170         int                     id;             /* blkdev unique id */
171
172         int                     major;          /* blkdev assigned major */
173         struct gendisk          *disk;          /* blkdev's gendisk and rq */
174         struct request_queue    *q;
175
176         struct ceph_client      *client;
177         struct rbd_client       *rbd_client;
178
179         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
180
181         spinlock_t              lock;           /* queue lock */
182
183         struct rbd_image_header header;
184         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
185         int                     obj_len;
186         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
188         int                     poolid;
189
190         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
191         u32 cur_snap;   /* index+1 of current snapshot within snap context
192                            0 - for the head */
193         int read_only;
194
195         struct list_head        node;
196 };
197
198 static spinlock_t node_lock;      /* protects client get/put */
199
200 static struct class *class_rbd;   /* /sys/class/rbd */
201 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
202 static LIST_HEAD(rbd_dev_list);    /* devices */
203 static LIST_HEAD(rbd_client_list);      /* clients */
204
205
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
207 {
208         struct gendisk *disk = bdev->bd_disk;
209         struct rbd_device *rbd_dev = disk->private_data;
210
211         set_device_ro(bdev, rbd_dev->read_only);
212
213         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214                 return -EROFS;
215
216         return 0;
217 }
218
219 static const struct block_device_operations rbd_bd_ops = {
220         .owner                  = THIS_MODULE,
221         .open                   = rbd_open,
222 };
223
224 /*
225  * Initialize an rbd client instance.
226  * We own *opt.
227  */
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229 {
230         struct rbd_client *rbdc;
231         int ret = -ENOMEM;
232
233         dout("rbd_client_create\n");
234         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235         if (!rbdc)
236                 goto out_opt;
237
238         kref_init(&rbdc->kref);
239         INIT_LIST_HEAD(&rbdc->node);
240
241         rbdc->client = ceph_create_client(opt, rbdc);
242         if (IS_ERR(rbdc->client))
243                 goto out_rbdc;
244
245         ret = ceph_open_session(rbdc->client);
246         if (ret < 0)
247                 goto out_err;
248
249         spin_lock(&node_lock);
250         list_add_tail(&rbdc->node, &rbd_client_list);
251         spin_unlock(&node_lock);
252
253         dout("rbd_client_create created %p\n", rbdc);
254         return rbdc;
255
256 out_err:
257         ceph_destroy_client(rbdc->client);
258         return ERR_PTR(ret);
259
260 out_rbdc:
261         kfree(rbdc);
262 out_opt:
263         ceph_destroy_options(opt);
264         return ERR_PTR(-ENOMEM);
265 }
266
267 /*
268  * Find a ceph client with specific addr and configuration.
269  */
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271 {
272         struct rbd_client *client_node;
273
274         if (opt->flags & CEPH_OPT_NOSHARE)
275                 return NULL;
276
277         list_for_each_entry(client_node, &rbd_client_list, node)
278                 if (ceph_compare_options(opt, client_node->client) == 0)
279                         return client_node;
280         return NULL;
281 }
282
283 /*
284  * Get a ceph client with specific addr and configuration, if one does
285  * not exist create it.
286  */
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288                           char *options)
289 {
290         struct rbd_client *rbdc;
291         struct ceph_options *opt;
292         int ret;
293
294         ret = ceph_parse_options(&opt, options, mon_addr,
295                                  mon_addr + strlen(mon_addr), NULL, NULL);
296         if (ret < 0)
297                 return ret;
298
299         spin_lock(&node_lock);
300         rbdc = __rbd_client_find(opt);
301         if (rbdc) {
302                 ceph_destroy_options(opt);
303
304                 /* using an existing client */
305                 kref_get(&rbdc->kref);
306                 rbd_dev->rbd_client = rbdc;
307                 rbd_dev->client = rbdc->client;
308                 spin_unlock(&node_lock);
309                 return 0;
310         }
311         spin_unlock(&node_lock);
312
313         rbdc = rbd_client_create(opt);
314         if (IS_ERR(rbdc))
315                 return PTR_ERR(rbdc);
316
317         rbd_dev->rbd_client = rbdc;
318         rbd_dev->client = rbdc->client;
319         return 0;
320 }
321
322 /*
323  * Destroy ceph client
324  */
325 static void rbd_client_release(struct kref *kref)
326 {
327         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328
329         dout("rbd_release_client %p\n", rbdc);
330         spin_lock(&node_lock);
331         list_del(&rbdc->node);
332         spin_unlock(&node_lock);
333
334         ceph_destroy_client(rbdc->client);
335         kfree(rbdc);
336 }
337
338 /*
339  * Drop reference to ceph client node. If it's not referenced anymore, release
340  * it.
341  */
342 static void rbd_put_client(struct rbd_device *rbd_dev)
343 {
344         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345         rbd_dev->rbd_client = NULL;
346         rbd_dev->client = NULL;
347 }
348
349
350 /*
351  * Create a new header structure, translate header format from the on-disk
352  * header.
353  */
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355                                  struct rbd_image_header_ondisk *ondisk,
356                                  int allocated_snaps,
357                                  gfp_t gfp_flags)
358 {
359         int i;
360         u32 snap_count = le32_to_cpu(ondisk->snap_count);
361         int ret = -ENOMEM;
362
363         init_rwsem(&header->snap_rwsem);
364
365         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367                                 snap_count *
368                                  sizeof(struct rbd_image_snap_ondisk),
369                                 gfp_flags);
370         if (!header->snapc)
371                 return -ENOMEM;
372         if (snap_count) {
373                 header->snap_names = kmalloc(header->snap_names_len,
374                                              GFP_KERNEL);
375                 if (!header->snap_names)
376                         goto err_snapc;
377                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378                                              GFP_KERNEL);
379                 if (!header->snap_sizes)
380                         goto err_names;
381         } else {
382                 header->snap_names = NULL;
383                 header->snap_sizes = NULL;
384         }
385         memcpy(header->block_name, ondisk->block_name,
386                sizeof(ondisk->block_name));
387
388         header->image_size = le64_to_cpu(ondisk->image_size);
389         header->obj_order = ondisk->options.order;
390         header->crypt_type = ondisk->options.crypt_type;
391         header->comp_type = ondisk->options.comp_type;
392
393         atomic_set(&header->snapc->nref, 1);
394         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395         header->snapc->num_snaps = snap_count;
396         header->total_snaps = snap_count;
397
398         if (snap_count &&
399             allocated_snaps == snap_count) {
400                 for (i = 0; i < snap_count; i++) {
401                         header->snapc->snaps[i] =
402                                 le64_to_cpu(ondisk->snaps[i].id);
403                         header->snap_sizes[i] =
404                                 le64_to_cpu(ondisk->snaps[i].image_size);
405                 }
406
407                 /* copy snapshot names */
408                 memcpy(header->snap_names, &ondisk->snaps[i],
409                         header->snap_names_len);
410         }
411
412         return 0;
413
414 err_names:
415         kfree(header->snap_names);
416 err_snapc:
417         kfree(header->snapc);
418         return ret;
419 }
420
421 static int snap_index(struct rbd_image_header *header, int snap_num)
422 {
423         return header->total_snaps - snap_num;
424 }
425
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
427 {
428         struct rbd_image_header *header = &rbd_dev->header;
429
430         if (!rbd_dev->cur_snap)
431                 return 0;
432
433         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434 }
435
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437                         u64 *seq, u64 *size)
438 {
439         int i;
440         char *p = header->snap_names;
441
442         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443                 if (strcmp(snap_name, p) == 0)
444                         break;
445         }
446         if (i == header->total_snaps)
447                 return -ENOENT;
448         if (seq)
449                 *seq = header->snapc->snaps[i];
450
451         if (size)
452                 *size = header->snap_sizes[i];
453
454         return i;
455 }
456
457 static int rbd_header_set_snap(struct rbd_device *dev,
458                                const char *snap_name,
459                                u64 *size)
460 {
461         struct rbd_image_header *header = &dev->header;
462         struct ceph_snap_context *snapc = header->snapc;
463         int ret = -ENOENT;
464
465         down_write(&header->snap_rwsem);
466
467         if (!snap_name ||
468             !*snap_name ||
469             strcmp(snap_name, "-") == 0 ||
470             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471                 if (header->total_snaps)
472                         snapc->seq = header->snap_seq;
473                 else
474                         snapc->seq = 0;
475                 dev->cur_snap = 0;
476                 dev->read_only = 0;
477                 if (size)
478                         *size = header->image_size;
479         } else {
480                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481                 if (ret < 0)
482                         goto done;
483
484                 dev->cur_snap = header->total_snaps - ret;
485                 dev->read_only = 1;
486         }
487
488         ret = 0;
489 done:
490         up_write(&header->snap_rwsem);
491         return ret;
492 }
493
494 static void rbd_header_free(struct rbd_image_header *header)
495 {
496         kfree(header->snapc);
497         kfree(header->snap_names);
498         kfree(header->snap_sizes);
499 }
500
501 /*
502  * get the actual striped segment name, offset and length
503  */
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505                            const char *block_name,
506                            u64 ofs, u64 len,
507                            char *seg_name, u64 *segofs)
508 {
509         u64 seg = ofs >> header->obj_order;
510
511         if (seg_name)
512                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513                          "%s.%012llx", block_name, seg);
514
515         ofs = ofs & ((1 << header->obj_order) - 1);
516         len = min_t(u64, len, (1 << header->obj_order) - ofs);
517
518         if (segofs)
519                 *segofs = ofs;
520
521         return len;
522 }
523
524 /*
525  * bio helpers
526  */
527
528 static void bio_chain_put(struct bio *chain)
529 {
530         struct bio *tmp;
531
532         while (chain) {
533                 tmp = chain;
534                 chain = chain->bi_next;
535                 bio_put(tmp);
536         }
537 }
538
539 /*
540  * zeros a bio chain, starting at specific offset
541  */
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
543 {
544         struct bio_vec *bv;
545         unsigned long flags;
546         void *buf;
547         int i;
548         int pos = 0;
549
550         while (chain) {
551                 bio_for_each_segment(bv, chain, i) {
552                         if (pos + bv->bv_len > start_ofs) {
553                                 int remainder = max(start_ofs - pos, 0);
554                                 buf = bvec_kmap_irq(bv, &flags);
555                                 memset(buf + remainder, 0,
556                                        bv->bv_len - remainder);
557                                 bvec_kunmap_irq(bv, &flags);
558                         }
559                         pos += bv->bv_len;
560                 }
561
562                 chain = chain->bi_next;
563         }
564 }
565
566 /*
567  * bio_chain_clone - clone a chain of bios up to a certain length.
568  * might return a bio_pair that will need to be released.
569  */
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571                                    struct bio_pair **bp,
572                                    int len, gfp_t gfpmask)
573 {
574         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575         int total = 0;
576
577         if (*bp) {
578                 bio_pair_release(*bp);
579                 *bp = NULL;
580         }
581
582         while (old_chain && (total < len)) {
583                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584                 if (!tmp)
585                         goto err_out;
586
587                 if (total + old_chain->bi_size > len) {
588                         struct bio_pair *bp;
589
590                         /*
591                          * this split can only happen with a single paged bio,
592                          * split_bio will BUG_ON if this is not the case
593                          */
594                         dout("bio_chain_clone split! total=%d remaining=%d"
595                              "bi_size=%d\n",
596                              (int)total, (int)len-total,
597                              (int)old_chain->bi_size);
598
599                         /* split the bio. We'll release it either in the next
600                            call, or it will have to be released outside */
601                         bp = bio_split(old_chain, (len - total) / 512ULL);
602                         if (!bp)
603                                 goto err_out;
604
605                         __bio_clone(tmp, &bp->bio1);
606
607                         *next = &bp->bio2;
608                 } else {
609                         __bio_clone(tmp, old_chain);
610                         *next = old_chain->bi_next;
611                 }
612
613                 tmp->bi_bdev = NULL;
614                 gfpmask &= ~__GFP_WAIT;
615                 tmp->bi_next = NULL;
616
617                 if (!new_chain) {
618                         new_chain = tail = tmp;
619                 } else {
620                         tail->bi_next = tmp;
621                         tail = tmp;
622                 }
623                 old_chain = old_chain->bi_next;
624
625                 total += tmp->bi_size;
626         }
627
628         BUG_ON(total < len);
629
630         if (tail)
631                 tail->bi_next = NULL;
632
633         *old = old_chain;
634
635         return new_chain;
636
637 err_out:
638         dout("bio_chain_clone with err\n");
639         bio_chain_put(new_chain);
640         return NULL;
641 }
642
643 /*
644  * helpers for osd request op vectors.
645  */
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647                             int num_ops,
648                             int opcode,
649                             u32 payload_len)
650 {
651         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652                        GFP_NOIO);
653         if (!*ops)
654                 return -ENOMEM;
655         (*ops)[0].op = opcode;
656         /*
657          * op extent offset and length will be set later on
658          * in calc_raw_layout()
659          */
660         (*ops)[0].payload_len = payload_len;
661         return 0;
662 }
663
664 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
665 {
666         kfree(ops);
667 }
668
669 /*
670  * Send ceph osd request
671  */
672 static int rbd_do_request(struct request *rq,
673                           struct rbd_device *dev,
674                           struct ceph_snap_context *snapc,
675                           u64 snapid,
676                           const char *obj, u64 ofs, u64 len,
677                           struct bio *bio,
678                           struct page **pages,
679                           int num_pages,
680                           int flags,
681                           struct ceph_osd_req_op *ops,
682                           int num_reply,
683                           void (*rbd_cb)(struct ceph_osd_request *req,
684                                          struct ceph_msg *msg))
685 {
686         struct ceph_osd_request *req;
687         struct ceph_file_layout *layout;
688         int ret;
689         u64 bno;
690         struct timespec mtime = CURRENT_TIME;
691         struct rbd_request *req_data;
692         struct ceph_osd_request_head *reqhead;
693         struct rbd_image_header *header = &dev->header;
694
695         ret = -ENOMEM;
696         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697         if (!req_data)
698                 goto done;
699
700         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701
702         down_read(&header->snap_rwsem);
703
704         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705                                       snapc,
706                                       ops,
707                                       false,
708                                       GFP_NOIO, pages, bio);
709         if (IS_ERR(req)) {
710                 up_read(&header->snap_rwsem);
711                 ret = PTR_ERR(req);
712                 goto done_pages;
713         }
714
715         req->r_callback = rbd_cb;
716
717         req_data->rq = rq;
718         req_data->bio = bio;
719         req_data->pages = pages;
720         req_data->len = len;
721
722         req->r_priv = req_data;
723
724         reqhead = req->r_request->front.iov_base;
725         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726
727         strncpy(req->r_oid, obj, sizeof(req->r_oid));
728         req->r_oid_len = strlen(req->r_oid);
729
730         layout = &req->r_file_layout;
731         memset(layout, 0, sizeof(*layout));
732         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733         layout->fl_stripe_count = cpu_to_le32(1);
734         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735         layout->fl_pg_preferred = cpu_to_le32(-1);
736         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738                              ofs, &len, &bno, req, ops);
739
740         ceph_osdc_build_request(req, ofs, &len,
741                                 ops,
742                                 snapc,
743                                 &mtime,
744                                 req->r_oid, req->r_oid_len);
745         up_read(&header->snap_rwsem);
746
747         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748         if (ret < 0)
749                 goto done_err;
750
751         if (!rbd_cb) {
752                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753                 ceph_osdc_put_request(req);
754         }
755         return ret;
756
757 done_err:
758         bio_chain_put(req_data->bio);
759         ceph_osdc_put_request(req);
760 done_pages:
761         kfree(req_data);
762 done:
763         if (rq)
764                 blk_end_request(rq, ret, len);
765         return ret;
766 }
767
768 /*
769  * Ceph osd op callback
770  */
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772 {
773         struct rbd_request *req_data = req->r_priv;
774         struct ceph_osd_reply_head *replyhead;
775         struct ceph_osd_op *op;
776         __s32 rc;
777         u64 bytes;
778         int read_op;
779
780         /* parse reply */
781         replyhead = msg->front.iov_base;
782         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783         op = (void *)(replyhead + 1);
784         rc = le32_to_cpu(replyhead->result);
785         bytes = le64_to_cpu(op->extent.length);
786         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787
788         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789
790         if (rc == -ENOENT && read_op) {
791                 zero_bio_chain(req_data->bio, 0);
792                 rc = 0;
793         } else if (rc == 0 && read_op && bytes < req_data->len) {
794                 zero_bio_chain(req_data->bio, bytes);
795                 bytes = req_data->len;
796         }
797
798         blk_end_request(req_data->rq, rc, bytes);
799
800         if (req_data->bio)
801                 bio_chain_put(req_data->bio);
802
803         ceph_osdc_put_request(req);
804         kfree(req_data);
805 }
806
807 /*
808  * Do a synchronous ceph osd operation
809  */
810 static int rbd_req_sync_op(struct rbd_device *dev,
811                            struct ceph_snap_context *snapc,
812                            u64 snapid,
813                            int opcode,
814                            int flags,
815                            struct ceph_osd_req_op *orig_ops,
816                            int num_reply,
817                            const char *obj,
818                            u64 ofs, u64 len,
819                            char *buf)
820 {
821         int ret;
822         struct page **pages;
823         int num_pages;
824         struct ceph_osd_req_op *ops = orig_ops;
825         u32 payload_len;
826
827         num_pages = calc_pages_for(ofs , len);
828         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829         if (!pages)
830                 return -ENOMEM;
831
832         if (!orig_ops) {
833                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835                 if (ret < 0)
836                         goto done;
837
838                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840                         if (ret < 0)
841                                 goto done_ops;
842                 }
843         }
844
845         ret = rbd_do_request(NULL, dev, snapc, snapid,
846                           obj, ofs, len, NULL,
847                           pages, num_pages,
848                           flags,
849                           ops,
850                           2,
851                           NULL);
852         if (ret < 0)
853                 goto done_ops;
854
855         if ((flags & CEPH_OSD_FLAG_READ) && buf)
856                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857
858 done_ops:
859         if (!orig_ops)
860                 rbd_destroy_ops(ops);
861 done:
862         ceph_release_page_vector(pages, num_pages);
863         return ret;
864 }
865
866 /*
867  * Do an asynchronous ceph osd operation
868  */
869 static int rbd_do_op(struct request *rq,
870                      struct rbd_device *rbd_dev ,
871                      struct ceph_snap_context *snapc,
872                      u64 snapid,
873                      int opcode, int flags, int num_reply,
874                      u64 ofs, u64 len,
875                      struct bio *bio)
876 {
877         char *seg_name;
878         u64 seg_ofs;
879         u64 seg_len;
880         int ret;
881         struct ceph_osd_req_op *ops;
882         u32 payload_len;
883
884         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885         if (!seg_name)
886                 return -ENOMEM;
887
888         seg_len = rbd_get_segment(&rbd_dev->header,
889                                   rbd_dev->header.block_name,
890                                   ofs, len,
891                                   seg_name, &seg_ofs);
892         if (seg_len < 0)
893                 return seg_len;
894
895         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
896
897         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
898         if (ret < 0)
899                 goto done;
900
901         /* we've taken care of segment sizes earlier when we
902            cloned the bios. We should never have a segment
903            truncated at this point */
904         BUG_ON(seg_len < len);
905
906         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
907                              seg_name, seg_ofs, seg_len,
908                              bio,
909                              NULL, 0,
910                              flags,
911                              ops,
912                              num_reply,
913                              rbd_req_cb);
914 done:
915         kfree(seg_name);
916         return ret;
917 }
918
919 /*
920  * Request async osd write
921  */
922 static int rbd_req_write(struct request *rq,
923                          struct rbd_device *rbd_dev,
924                          struct ceph_snap_context *snapc,
925                          u64 ofs, u64 len,
926                          struct bio *bio)
927 {
928         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
929                          CEPH_OSD_OP_WRITE,
930                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
931                          2,
932                          ofs, len, bio);
933 }
934
935 /*
936  * Request async osd read
937  */
938 static int rbd_req_read(struct request *rq,
939                          struct rbd_device *rbd_dev,
940                          u64 snapid,
941                          u64 ofs, u64 len,
942                          struct bio *bio)
943 {
944         return rbd_do_op(rq, rbd_dev, NULL,
945                          (snapid ? snapid : CEPH_NOSNAP),
946                          CEPH_OSD_OP_READ,
947                          CEPH_OSD_FLAG_READ,
948                          2,
949                          ofs, len, bio);
950 }
951
952 /*
953  * Request sync osd read
954  */
955 static int rbd_req_sync_read(struct rbd_device *dev,
956                           struct ceph_snap_context *snapc,
957                           u64 snapid,
958                           const char *obj,
959                           u64 ofs, u64 len,
960                           char *buf)
961 {
962         return rbd_req_sync_op(dev, NULL,
963                                (snapid ? snapid : CEPH_NOSNAP),
964                                CEPH_OSD_OP_READ,
965                                CEPH_OSD_FLAG_READ,
966                                NULL,
967                                1, obj, ofs, len, buf);
968 }
969
970 /*
971  * Request sync osd read
972  */
973 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
974                                      u64 snapid,
975                                      const char *obj)
976 {
977         struct ceph_osd_req_op *ops;
978         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
979         if (ret < 0)
980                 return ret;
981
982         ops[0].snap.snapid = snapid;
983
984         ret = rbd_req_sync_op(dev, NULL,
985                                CEPH_NOSNAP,
986                                0,
987                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
988                                ops,
989                                1, obj, 0, 0, NULL);
990
991         rbd_destroy_ops(ops);
992
993         if (ret < 0)
994                 return ret;
995
996         return ret;
997 }
998
999 /*
1000  * Request sync osd read
1001  */
1002 static int rbd_req_sync_exec(struct rbd_device *dev,
1003                              const char *obj,
1004                              const char *cls,
1005                              const char *method,
1006                              const char *data,
1007                              int len)
1008 {
1009         struct ceph_osd_req_op *ops;
1010         int cls_len = strlen(cls);
1011         int method_len = strlen(method);
1012         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1013                                     cls_len + method_len + len);
1014         if (ret < 0)
1015                 return ret;
1016
1017         ops[0].cls.class_name = cls;
1018         ops[0].cls.class_len = (__u8)cls_len;
1019         ops[0].cls.method_name = method;
1020         ops[0].cls.method_len = (__u8)method_len;
1021         ops[0].cls.argc = 0;
1022         ops[0].cls.indata = data;
1023         ops[0].cls.indata_len = len;
1024
1025         ret = rbd_req_sync_op(dev, NULL,
1026                                CEPH_NOSNAP,
1027                                0,
1028                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1029                                ops,
1030                                1, obj, 0, 0, NULL);
1031
1032         rbd_destroy_ops(ops);
1033
1034         dout("cls_exec returned %d\n", ret);
1035         return ret;
1036 }
1037
1038 /*
1039  * block device queue callback
1040  */
1041 static void rbd_rq_fn(struct request_queue *q)
1042 {
1043         struct rbd_device *rbd_dev = q->queuedata;
1044         struct request *rq;
1045         struct bio_pair *bp = NULL;
1046
1047         rq = blk_fetch_request(q);
1048
1049         while (1) {
1050                 struct bio *bio;
1051                 struct bio *rq_bio, *next_bio = NULL;
1052                 bool do_write;
1053                 int size, op_size = 0;
1054                 u64 ofs;
1055
1056                 /* peek at request from block layer */
1057                 if (!rq)
1058                         break;
1059
1060                 dout("fetched request\n");
1061
1062                 /* filter out block requests we don't understand */
1063                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1064                         __blk_end_request_all(rq, 0);
1065                         goto next;
1066                 }
1067
1068                 /* deduce our operation (read, write) */
1069                 do_write = (rq_data_dir(rq) == WRITE);
1070
1071                 size = blk_rq_bytes(rq);
1072                 ofs = blk_rq_pos(rq) * 512ULL;
1073                 rq_bio = rq->bio;
1074                 if (do_write && rbd_dev->read_only) {
1075                         __blk_end_request_all(rq, -EROFS);
1076                         goto next;
1077                 }
1078
1079                 spin_unlock_irq(q->queue_lock);
1080
1081                 dout("%s 0x%x bytes at 0x%llx\n",
1082                      do_write ? "write" : "read",
1083                      size, blk_rq_pos(rq) * 512ULL);
1084
1085                 do {
1086                         /* a bio clone to be passed down to OSD req */
1087                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1088                         op_size = rbd_get_segment(&rbd_dev->header,
1089                                                   rbd_dev->header.block_name,
1090                                                   ofs, size,
1091                                                   NULL, NULL);
1092                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1093                                               op_size, GFP_ATOMIC);
1094                         if (!bio) {
1095                                 spin_lock_irq(q->queue_lock);
1096                                 __blk_end_request_all(rq, -ENOMEM);
1097                                 goto next;
1098                         }
1099
1100                         /* init OSD command: write or read */
1101                         if (do_write)
1102                                 rbd_req_write(rq, rbd_dev,
1103                                               rbd_dev->header.snapc,
1104                                               ofs,
1105                                               op_size, bio);
1106                         else
1107                                 rbd_req_read(rq, rbd_dev,
1108                                              cur_snap_id(rbd_dev),
1109                                              ofs,
1110                                              op_size, bio);
1111
1112                         size -= op_size;
1113                         ofs += op_size;
1114
1115                         rq_bio = next_bio;
1116                 } while (size > 0);
1117
1118                 if (bp)
1119                         bio_pair_release(bp);
1120
1121                 spin_lock_irq(q->queue_lock);
1122 next:
1123                 rq = blk_fetch_request(q);
1124         }
1125 }
1126
1127 /*
1128  * a queue callback. Makes sure that we don't create a bio that spans across
1129  * multiple osd objects. One exception would be with a single page bios,
1130  * which we handle later at bio_chain_clone
1131  */
1132 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1133                           struct bio_vec *bvec)
1134 {
1135         struct rbd_device *rbd_dev = q->queuedata;
1136         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1137         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1138         unsigned int bio_sectors = bmd->bi_size >> 9;
1139         int max;
1140
1141         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1142                                  + bio_sectors)) << 9;
1143         if (max < 0)
1144                 max = 0; /* bio_add cannot handle a negative return */
1145         if (max <= bvec->bv_len && bio_sectors == 0)
1146                 return bvec->bv_len;
1147         return max;
1148 }
1149
1150 static void rbd_free_disk(struct rbd_device *rbd_dev)
1151 {
1152         struct gendisk *disk = rbd_dev->disk;
1153
1154         if (!disk)
1155                 return;
1156
1157         rbd_header_free(&rbd_dev->header);
1158
1159         if (disk->flags & GENHD_FL_UP)
1160                 del_gendisk(disk);
1161         if (disk->queue)
1162                 blk_cleanup_queue(disk->queue);
1163         put_disk(disk);
1164 }
1165
1166 /*
1167  * reload the ondisk the header 
1168  */
1169 static int rbd_read_header(struct rbd_device *rbd_dev,
1170                            struct rbd_image_header *header)
1171 {
1172         ssize_t rc;
1173         struct rbd_image_header_ondisk *dh;
1174         int snap_count = 0;
1175         u64 snap_names_len = 0;
1176
1177         while (1) {
1178                 int len = sizeof(*dh) +
1179                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1180                           snap_names_len;
1181
1182                 rc = -ENOMEM;
1183                 dh = kmalloc(len, GFP_KERNEL);
1184                 if (!dh)
1185                         return -ENOMEM;
1186
1187                 rc = rbd_req_sync_read(rbd_dev,
1188                                        NULL, CEPH_NOSNAP,
1189                                        rbd_dev->obj_md_name,
1190                                        0, len,
1191                                        (char *)dh);
1192                 if (rc < 0)
1193                         goto out_dh;
1194
1195                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1196                 if (rc < 0)
1197                         goto out_dh;
1198
1199                 if (snap_count != header->total_snaps) {
1200                         snap_count = header->total_snaps;
1201                         snap_names_len = header->snap_names_len;
1202                         rbd_header_free(header);
1203                         kfree(dh);
1204                         continue;
1205                 }
1206                 break;
1207         }
1208
1209 out_dh:
1210         kfree(dh);
1211         return rc;
1212 }
1213
1214 /*
1215  * create a snapshot
1216  */
1217 static int rbd_header_add_snap(struct rbd_device *dev,
1218                                const char *snap_name,
1219                                gfp_t gfp_flags)
1220 {
1221         int name_len = strlen(snap_name);
1222         u64 new_snapid;
1223         int ret;
1224         void *data, *data_start, *data_end;
1225
1226         /* we should create a snapshot only if we're pointing at the head */
1227         if (dev->cur_snap)
1228                 return -EINVAL;
1229
1230         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1231                                       &new_snapid);
1232         dout("created snapid=%lld\n", new_snapid);
1233         if (ret < 0)
1234                 return ret;
1235
1236         data = kmalloc(name_len + 16, gfp_flags);
1237         if (!data)
1238                 return -ENOMEM;
1239
1240         data_start = data;
1241         data_end = data + name_len + 16;
1242
1243         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1244         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1245
1246         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1247                                 data_start, data - data_start);
1248
1249         kfree(data_start);
1250
1251         if (ret < 0)
1252                 return ret;
1253
1254         dev->header.snapc->seq =  new_snapid;
1255
1256         return 0;
1257 bad:
1258         return -ERANGE;
1259 }
1260
1261 /*
1262  * only read the first part of the ondisk header, without the snaps info
1263  */
1264 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1265 {
1266         int ret;
1267         struct rbd_image_header h;
1268         u64 snap_seq;
1269
1270         ret = rbd_read_header(rbd_dev, &h);
1271         if (ret < 0)
1272                 return ret;
1273
1274         down_write(&rbd_dev->header.snap_rwsem);
1275
1276         snap_seq = rbd_dev->header.snapc->seq;
1277
1278         kfree(rbd_dev->header.snapc);
1279         kfree(rbd_dev->header.snap_names);
1280         kfree(rbd_dev->header.snap_sizes);
1281
1282         rbd_dev->header.total_snaps = h.total_snaps;
1283         rbd_dev->header.snapc = h.snapc;
1284         rbd_dev->header.snap_names = h.snap_names;
1285         rbd_dev->header.snap_sizes = h.snap_sizes;
1286         rbd_dev->header.snapc->seq = snap_seq;
1287
1288         up_write(&rbd_dev->header.snap_rwsem);
1289
1290         return 0;
1291 }
1292
1293 static int rbd_init_disk(struct rbd_device *rbd_dev)
1294 {
1295         struct gendisk *disk;
1296         struct request_queue *q;
1297         int rc;
1298         u64 total_size = 0;
1299
1300         /* contact OSD, request size info about the object being mapped */
1301         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1302         if (rc)
1303                 return rc;
1304
1305         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1306         if (rc)
1307                 return rc;
1308
1309         /* create gendisk info */
1310         rc = -ENOMEM;
1311         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1312         if (!disk)
1313                 goto out;
1314
1315         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1316         disk->major = rbd_dev->major;
1317         disk->first_minor = 0;
1318         disk->fops = &rbd_bd_ops;
1319         disk->private_data = rbd_dev;
1320
1321         /* init rq */
1322         rc = -ENOMEM;
1323         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1324         if (!q)
1325                 goto out_disk;
1326         blk_queue_merge_bvec(q, rbd_merge_bvec);
1327         disk->queue = q;
1328
1329         q->queuedata = rbd_dev;
1330
1331         rbd_dev->disk = disk;
1332         rbd_dev->q = q;
1333
1334         /* finally, announce the disk to the world */
1335         set_capacity(disk, total_size / 512ULL);
1336         add_disk(disk);
1337
1338         pr_info("%s: added with size 0x%llx\n",
1339                 disk->disk_name, (unsigned long long)total_size);
1340         return 0;
1341
1342 out_disk:
1343         put_disk(disk);
1344 out:
1345         return rc;
1346 }
1347
1348 /********************************************************************
1349  * /sys/class/rbd/
1350  *                   add        map rados objects to blkdev
1351  *                   remove     unmap rados objects
1352  *                   list       show mappings
1353  *******************************************************************/
1354
1355 static void class_rbd_release(struct class *cls)
1356 {
1357         kfree(cls);
1358 }
1359
1360 static ssize_t class_rbd_list(struct class *c,
1361                               struct class_attribute *attr,
1362                               char *data)
1363 {
1364         int n = 0;
1365         struct list_head *tmp;
1366         int max = PAGE_SIZE;
1367
1368         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1369
1370         n += snprintf(data, max,
1371                       "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1372
1373         list_for_each(tmp, &rbd_dev_list) {
1374                 struct rbd_device *rbd_dev;
1375
1376                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1377                 n += snprintf(data+n, max-n,
1378                               "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1379                               rbd_dev->id,
1380                               rbd_dev->major,
1381                               ceph_client_id(rbd_dev->client),
1382                               rbd_dev->pool_name,
1383                               rbd_dev->obj, rbd_dev->snap_name,
1384                               rbd_dev->header.image_size >> 10);
1385                 if (n == max)
1386                         break;
1387         }
1388
1389         mutex_unlock(&ctl_mutex);
1390         return n;
1391 }
1392
1393 static ssize_t class_rbd_add(struct class *c,
1394                              struct class_attribute *attr,
1395                              const char *buf, size_t count)
1396 {
1397         struct ceph_osd_client *osdc;
1398         struct rbd_device *rbd_dev;
1399         ssize_t rc = -ENOMEM;
1400         int irc, new_id = 0;
1401         struct list_head *tmp;
1402         char *mon_dev_name;
1403         char *options;
1404
1405         if (!try_module_get(THIS_MODULE))
1406                 return -ENODEV;
1407
1408         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1409         if (!mon_dev_name)
1410                 goto err_out_mod;
1411
1412         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1413         if (!options)
1414                 goto err_mon_dev;
1415
1416         /* new rbd_device object */
1417         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1418         if (!rbd_dev)
1419                 goto err_out_opt;
1420
1421         /* static rbd_device initialization */
1422         spin_lock_init(&rbd_dev->lock);
1423         INIT_LIST_HEAD(&rbd_dev->node);
1424
1425         /* generate unique id: find highest unique id, add one */
1426         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1427
1428         list_for_each(tmp, &rbd_dev_list) {
1429                 struct rbd_device *rbd_dev;
1430
1431                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1432                 if (rbd_dev->id >= new_id)
1433                         new_id = rbd_dev->id + 1;
1434         }
1435
1436         rbd_dev->id = new_id;
1437
1438         /* add to global list */
1439         list_add_tail(&rbd_dev->node, &rbd_dev_list);
1440
1441         /* parse add command */
1442         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1443                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
1444                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1445                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1446                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1447                    mon_dev_name, options, rbd_dev->pool_name,
1448                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
1449                 rc = -EINVAL;
1450                 goto err_out_slot;
1451         }
1452
1453         if (rbd_dev->snap_name[0] == 0)
1454                 rbd_dev->snap_name[0] = '-';
1455
1456         rbd_dev->obj_len = strlen(rbd_dev->obj);
1457         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1458                  rbd_dev->obj, RBD_SUFFIX);
1459
1460         /* initialize rest of new object */
1461         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1462         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1463         if (rc < 0)
1464                 goto err_out_slot;
1465
1466         mutex_unlock(&ctl_mutex);
1467
1468         /* pick the pool */
1469         osdc = &rbd_dev->client->osdc;
1470         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1471         if (rc < 0)
1472                 goto err_out_client;
1473         rbd_dev->poolid = rc;
1474
1475         /* register our block device */
1476         irc = register_blkdev(0, rbd_dev->name);
1477         if (irc < 0) {
1478                 rc = irc;
1479                 goto err_out_client;
1480         }
1481         rbd_dev->major = irc;
1482
1483         /* set up and announce blkdev mapping */
1484         rc = rbd_init_disk(rbd_dev);
1485         if (rc)
1486                 goto err_out_blkdev;
1487
1488         return count;
1489
1490 err_out_blkdev:
1491         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1492 err_out_client:
1493         rbd_put_client(rbd_dev);
1494         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1495 err_out_slot:
1496         list_del_init(&rbd_dev->node);
1497         mutex_unlock(&ctl_mutex);
1498
1499         kfree(rbd_dev);
1500 err_out_opt:
1501         kfree(options);
1502 err_mon_dev:
1503         kfree(mon_dev_name);
1504 err_out_mod:
1505         dout("Error adding device %s\n", buf);
1506         module_put(THIS_MODULE);
1507         return rc;
1508 }
1509
1510 static struct rbd_device *__rbd_get_dev(unsigned long id)
1511 {
1512         struct list_head *tmp;
1513         struct rbd_device *rbd_dev;
1514
1515         list_for_each(tmp, &rbd_dev_list) {
1516                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1517                 if (rbd_dev->id == id)
1518                         return rbd_dev;
1519         }
1520         return NULL;
1521 }
1522
1523 static ssize_t class_rbd_remove(struct class *c,
1524                                 struct class_attribute *attr,
1525                                 const char *buf,
1526                                 size_t count)
1527 {
1528         struct rbd_device *rbd_dev = NULL;
1529         int target_id, rc;
1530         unsigned long ul;
1531
1532         rc = strict_strtoul(buf, 10, &ul);
1533         if (rc)
1534                 return rc;
1535
1536         /* convert to int; abort if we lost anything in the conversion */
1537         target_id = (int) ul;
1538         if (target_id != ul)
1539                 return -EINVAL;
1540
1541         /* remove object from list immediately */
1542         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1543
1544         rbd_dev = __rbd_get_dev(target_id);
1545         if (rbd_dev)
1546                 list_del_init(&rbd_dev->node);
1547
1548         mutex_unlock(&ctl_mutex);
1549
1550         if (!rbd_dev)
1551                 return -ENOENT;
1552
1553         rbd_put_client(rbd_dev);
1554
1555         /* clean up and free blkdev */
1556         rbd_free_disk(rbd_dev);
1557         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1558         kfree(rbd_dev);
1559
1560         /* release module ref */
1561         module_put(THIS_MODULE);
1562
1563         return count;
1564 }
1565
1566 static ssize_t class_rbd_snaps_list(struct class *c,
1567                               struct class_attribute *attr,
1568                               char *data)
1569 {
1570         struct rbd_device *rbd_dev = NULL;
1571         struct list_head *tmp;
1572         struct rbd_image_header *header;
1573         int i, n = 0, max = PAGE_SIZE;
1574         int ret;
1575
1576         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1577
1578         n += snprintf(data, max, "#id\tsnap\tKB\n");
1579
1580         list_for_each(tmp, &rbd_dev_list) {
1581                 char *names, *p;
1582                 struct ceph_snap_context *snapc;
1583
1584                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1585                 header = &rbd_dev->header;
1586
1587                 down_read(&header->snap_rwsem);
1588
1589                 names = header->snap_names;
1590                 snapc = header->snapc;
1591
1592                 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1593                               rbd_dev->id, RBD_SNAP_HEAD_NAME,
1594                               header->image_size >> 10,
1595                               (!rbd_dev->cur_snap ? " (*)" : ""));
1596                 if (n == max)
1597                         break;
1598
1599                 p = names;
1600                 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1601                         n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1602                               rbd_dev->id, p, header->snap_sizes[i] >> 10,
1603                               (rbd_dev->cur_snap &&
1604                                (snap_index(header, i) == rbd_dev->cur_snap) ?
1605                                " (*)" : ""));
1606                         if (n == max)
1607                                 break;
1608                 }
1609
1610                 up_read(&header->snap_rwsem);
1611         }
1612
1613
1614         ret = n;
1615         mutex_unlock(&ctl_mutex);
1616         return ret;
1617 }
1618
1619 static ssize_t class_rbd_snaps_refresh(struct class *c,
1620                                 struct class_attribute *attr,
1621                                 const char *buf,
1622                                 size_t count)
1623 {
1624         struct rbd_device *rbd_dev = NULL;
1625         int target_id, rc;
1626         unsigned long ul;
1627         int ret = count;
1628
1629         rc = strict_strtoul(buf, 10, &ul);
1630         if (rc)
1631                 return rc;
1632
1633         /* convert to int; abort if we lost anything in the conversion */
1634         target_id = (int) ul;
1635         if (target_id != ul)
1636                 return -EINVAL;
1637
1638         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1639
1640         rbd_dev = __rbd_get_dev(target_id);
1641         if (!rbd_dev) {
1642                 ret = -ENOENT;
1643                 goto done;
1644         }
1645
1646         rc = rbd_update_snaps(rbd_dev);
1647         if (rc < 0)
1648                 ret = rc;
1649
1650 done:
1651         mutex_unlock(&ctl_mutex);
1652         return ret;
1653 }
1654
1655 static ssize_t class_rbd_snap_create(struct class *c,
1656                                 struct class_attribute *attr,
1657                                 const char *buf,
1658                                 size_t count)
1659 {
1660         struct rbd_device *rbd_dev = NULL;
1661         int target_id, ret;
1662         char *name;
1663
1664         name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1665         if (!name)
1666                 return -ENOMEM;
1667
1668         /* parse snaps add command */
1669         if (sscanf(buf, "%d "
1670                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1671                    &target_id,
1672                    name) != 2) {
1673                 ret = -EINVAL;
1674                 goto done;
1675         }
1676
1677         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1678
1679         rbd_dev = __rbd_get_dev(target_id);
1680         if (!rbd_dev) {
1681                 ret = -ENOENT;
1682                 goto done_unlock;
1683         }
1684
1685         ret = rbd_header_add_snap(rbd_dev,
1686                                   name, GFP_KERNEL);
1687         if (ret < 0)
1688                 goto done_unlock;
1689
1690         ret = rbd_update_snaps(rbd_dev);
1691         if (ret < 0)
1692                 goto done_unlock;
1693
1694         ret = count;
1695 done_unlock:
1696         mutex_unlock(&ctl_mutex);
1697 done:
1698         kfree(name);
1699         return ret;
1700 }
1701
1702 static ssize_t class_rbd_rollback(struct class *c,
1703                                 struct class_attribute *attr,
1704                                 const char *buf,
1705                                 size_t count)
1706 {
1707         struct rbd_device *rbd_dev = NULL;
1708         int target_id, ret;
1709         u64 snapid;
1710         char snap_name[RBD_MAX_SNAP_NAME_LEN];
1711         u64 cur_ofs;
1712         char *seg_name;
1713
1714         /* parse snaps add command */
1715         if (sscanf(buf, "%d "
1716                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1717                    &target_id,
1718                    snap_name) != 2) {
1719                 return -EINVAL;
1720         }
1721
1722         ret = -ENOMEM;
1723         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1724         if (!seg_name)
1725                 return ret;
1726
1727         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1728
1729         rbd_dev = __rbd_get_dev(target_id);
1730         if (!rbd_dev) {
1731                 ret = -ENOENT;
1732                 goto done_unlock;
1733         }
1734
1735         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1736         if (ret < 0)
1737                 goto done_unlock;
1738
1739         dout("snapid=%lld\n", snapid);
1740
1741         cur_ofs = 0;
1742         while (cur_ofs < rbd_dev->header.image_size) {
1743                 cur_ofs += rbd_get_segment(&rbd_dev->header,
1744                                            rbd_dev->obj,
1745                                            cur_ofs, (u64)-1,
1746                                            seg_name, NULL);
1747                 dout("seg_name=%s\n", seg_name);
1748
1749                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1750                 if (ret < 0)
1751                         pr_warning("could not roll back obj %s err=%d\n",
1752                                    seg_name, ret);
1753         }
1754
1755         ret = rbd_update_snaps(rbd_dev);
1756         if (ret < 0)
1757                 goto done_unlock;
1758
1759         ret = count;
1760
1761 done_unlock:
1762         mutex_unlock(&ctl_mutex);
1763         kfree(seg_name);
1764
1765         return ret;
1766 }
1767
1768 static struct class_attribute class_rbd_attrs[] = {
1769         __ATTR(add,             0200, NULL, class_rbd_add),
1770         __ATTR(remove,          0200, NULL, class_rbd_remove),
1771         __ATTR(list,            0444, class_rbd_list, NULL),
1772         __ATTR(snaps_refresh,   0200, NULL, class_rbd_snaps_refresh),
1773         __ATTR(snap_create,     0200, NULL, class_rbd_snap_create),
1774         __ATTR(snaps_list,      0444, class_rbd_snaps_list, NULL),
1775         __ATTR(snap_rollback,   0200, NULL, class_rbd_rollback),
1776         __ATTR_NULL
1777 };
1778
1779 /*
1780  * create control files in sysfs
1781  * /sys/class/rbd/...
1782  */
1783 static int rbd_sysfs_init(void)
1784 {
1785         int ret = -ENOMEM;
1786
1787         class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1788         if (!class_rbd)
1789                 goto out;
1790
1791         class_rbd->name = DRV_NAME;
1792         class_rbd->owner = THIS_MODULE;
1793         class_rbd->class_release = class_rbd_release;
1794         class_rbd->class_attrs = class_rbd_attrs;
1795
1796         ret = class_register(class_rbd);
1797         if (ret)
1798                 goto out_class;
1799         return 0;
1800
1801 out_class:
1802         kfree(class_rbd);
1803         class_rbd = NULL;
1804         pr_err(DRV_NAME ": failed to create class rbd\n");
1805 out:
1806         return ret;
1807 }
1808
1809 static void rbd_sysfs_cleanup(void)
1810 {
1811         if (class_rbd)
1812                 class_destroy(class_rbd);
1813         class_rbd = NULL;
1814 }
1815
1816 int __init rbd_init(void)
1817 {
1818         int rc;
1819
1820         rc = rbd_sysfs_init();
1821         if (rc)
1822                 return rc;
1823         spin_lock_init(&node_lock);
1824         pr_info("loaded " DRV_NAME_LONG "\n");
1825         return 0;
1826 }
1827
1828 void __exit rbd_exit(void)
1829 {
1830         rbd_sysfs_cleanup();
1831 }
1832
1833 module_init(rbd_init);
1834 module_exit(rbd_exit);
1835
1836 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1837 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1838 MODULE_DESCRIPTION("rados block device");
1839
1840 /* following authorship retained from original osdblk.c */
1841 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1842
1843 MODULE_LICENSE("GPL");