xref: /illumos-kvm-cmd/block/rbd.c (revision 68396ea9)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010 Christian Brunner <chb@muc.de>
5  *
6  * This work is licensed under the terms of the GNU GPL, version 2.  See
7  * the COPYING file in the top-level directory.
8  *
9  */
10 
11 #include "qemu-common.h"
12 #include "qemu-error.h"
13 
14 #include "rbd_types.h"
15 #include "block_int.h"
16 
17 #include <rados/librados.h>
18 
19 
20 
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/devicename[@snapshotname]
 *
 * poolname must be the name of an existing rados pool.
 *
 * devicename is the name of the image; it is used to locate the
 * metadata object that describes the emulated raw device.
 *
 * snapshotname is optional; when it is given the image is opened
 * read-only at that snapshot.
 *
 * Metadata information (image size, ...) is stored in an
 * object with the name "devicename.rbd".
 *
 * The raw device is split into 4MB sized objects by default.
 * The sequence number is encoded in a 12 byte long hex-string,
 * and is appended to the block name stored in the image header,
 * separated by a dot, e.g. "blockname.1234567890ab".
 *
 */
40 
/*
 * Size in bytes of one object at the default object order.
 * Uses RBD_DEFAULT_OBJ_ORDER, the same constant rbd_create() takes as its
 * default order.
 */
#define OBJ_MAX_SIZE (1UL << RBD_DEFAULT_OBJ_ORDER)
42 
43 typedef struct RBDAIOCB {
44     BlockDriverAIOCB common;
45     QEMUBH *bh;
46     int ret;
47     QEMUIOVector *qiov;
48     char *bounce;
49     int write;
50     int64_t sector_num;
51     int aiocnt;
52     int error;
53     struct BDRVRBDState *s;
54     int cancelled;
55 } RBDAIOCB;
56 
57 typedef struct RADOSCB {
58     int rcbid;
59     RBDAIOCB *acb;
60     struct BDRVRBDState *s;
61     int done;
62     int64_t segsize;
63     char *buf;
64     int ret;
65 } RADOSCB;
66 
67 #define RBD_FD_READ 0
68 #define RBD_FD_WRITE 1
69 
70 typedef struct BDRVRBDState {
71     int fds[2];
72     rados_pool_t pool;
73     rados_pool_t header_pool;
74     char name[RBD_MAX_OBJ_NAME_SIZE];
75     char block_name[RBD_MAX_BLOCK_NAME_SIZE];
76     uint64_t size;
77     uint64_t objsize;
78     int qemu_aio_count;
79     int event_reader_pos;
80     RADOSCB *event_rcb;
81 } BDRVRBDState;
82 
/* Short alias for the on-disk image header layout. */
typedef struct rbd_obj_header_ondisk RbdHeader1;

/* Defined further down; rbd_complete_aio() needs it to schedule the bh. */
static void rbd_aio_bh_cb(void *opaque);
86 
/*
 * Extract the leading token of @src into @dst.
 *
 * When @delim is not '\0' and occurs in @src, the token ends at the first
 * occurrence: the delimiter is overwritten with '\0' (so @src is modified)
 * and *@p is set to the character after it.  Otherwise the token is all of
 * @src and *@p is set to NULL.
 *
 * @name is only used in the error messages.
 *
 * Returns 0 on success, -EINVAL if the token is empty or does not fit
 * into @dst_len bytes including the terminator.
 */
static int rbd_next_tok(char *dst, int dst_len,
                        char *src, char delim,
                        const char *name,
                        char **p)
{
    char *sep = NULL;
    int tok_len;

    if (delim != '\0') {
        sep = strchr(src, delim);
    }

    if (sep != NULL) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    tok_len = strlen(src);
    if (tok_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (tok_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);
    return 0;
}
117 
/*
 * Split a filename of the form "rbd:pool/name[@snap]" into its parts.
 *
 * @pool and @name receive the mandatory components.  @snap receives the
 * snapshot name, or is set to the empty string when the "@snap" part is
 * absent.  Each output buffer has the size given by the matching *_len
 * argument.
 *
 * Returns 0 on success and a negative errno value otherwise.
 */
static int rbd_parsename(const char *filename,
                         char *pool, int pool_len,
                         char *snap, int snap_len,
                         char *name, int name_len)
{
    const char *spec;
    char *copy, *cursor;
    int ret;

    if (!strstart(filename, "rbd:", &spec)) {
        return -EINVAL;
    }

    /* rbd_next_tok() writes into its input, so work on a private copy */
    copy = qemu_strdup(spec);
    cursor = copy;

    ret = rbd_next_tok(pool, pool_len, cursor, '/', "pool name", &cursor);
    if (ret < 0 || cursor == NULL) {
        /* a missing '/' means there is no image name at all */
        ret = -EINVAL;
    } else {
        ret = rbd_next_tok(name, name_len, cursor, '@', "object name",
                           &cursor);
        if (ret >= 0) {
            if (cursor == NULL) {
                *snap = '\0';
            } else {
                ret = rbd_next_tok(snap, snap_len, cursor, '\0',
                                   "snap name", &cursor);
            }
        }
    }

    qemu_free(copy);
    return ret;
}
154 
/*
 * Build an encoded tmap operation for the key @name.
 *
 * Layout: one byte @op, the key as a 32-bit little-endian length followed
 * by the key bytes, then an empty value encoded as a 32-bit zero length.
 *
 * The buffer is allocated here and returned through @tmap_desc; release it
 * with free_tmap_op().  Returns the number of bytes in the buffer.
 */
static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
{
    uint32_t name_len = strlen(name);
    uint32_t name_len_le = cpu_to_le32(name_len);
    uint32_t empty_len = 0; /* zero needs no endian conversion */
    /* op byte + (length prefix + name) + length prefix of the empty value */
    uint32_t total_len = 1 + (sizeof(uint32_t) + name_len) + sizeof(uint32_t);
    uint8_t *out = qemu_malloc(total_len);
    uint32_t pos = 0;

    out[pos] = op;
    pos += 1;

    memcpy(out + pos, &name_len_le, sizeof(name_len_le));
    pos += sizeof(name_len_le);

    memcpy(out + pos, name, name_len);
    pos += name_len;

    memcpy(out + pos, &empty_len, sizeof(empty_len));
    pos += sizeof(empty_len);

    *tmap_desc = (char *)out;
    return pos;
}
179 
/* Release a descriptor obtained from create_tmap_op(). */
static void free_tmap_op(char *tmap_desc)
{
    qemu_free(tmap_desc);
}
184 
/*
 * Add the image @name to the RBD directory object of @pool by applying a
 * CEPH_OSD_TMAP_SET operation to it.
 *
 * Returns the result of rados_tmap_update(), or a negative value if the
 * operation descriptor could not be built.
 */
static int rbd_register_image(rados_pool_t pool, const char *name)
{
    char *desc;
    int desc_len;
    int ret;

    desc_len = create_tmap_op(CEPH_OSD_TMAP_SET, name, &desc);
    if (desc_len < 0) {
        return desc_len;
    }

    ret = rados_tmap_update(pool, RBD_DIRECTORY, desc, desc_len);
    free_tmap_op(desc);

    return ret;
}
201 
/*
 * Issue a zero-length write to the object @info_oid in @pool.
 *
 * Returns 0 on success or the negative value returned by rados_write().
 */
static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
{
    int written = rados_write(pool, info_oid, 0, NULL, 0);

    return written < 0 ? written : 0;
}
210 
/*
 * Obtain a block id by executing the "assign_bid" method of the "rbd"
 * class on the RBD_INFO object of @pool.
 *
 * *@id is set to 0 up front and to the id (converted from little endian)
 * on success.  Returns 0 on success or a negative error value.
 */
static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
{
    uint64_t raw_id;
    int r;

    *id = 0;

    /* touch the info object first, then run the class method on it */
    r = touch_rbd_info(pool, RBD_INFO);
    if (r < 0) {
        return r;
    }

    r = rados_exec(pool, RBD_INFO, "rbd", "assign_bid", NULL,
                   0, (char *)&raw_id, sizeof(raw_id));
    if (r < 0) {
        return r;
    }

    *id = le64_to_cpu(raw_id);
    return 0;
}
234 
/*
 * Create a new RBD image.
 *
 * @filename has the form "rbd:pool/name[@snap]"; a snapshot part is parsed
 * but not used for creation.  @options may carry BLOCK_OPT_SIZE (image
 * size in bytes) and BLOCK_OPT_CLUSTER_SIZE (object size in bytes, a power
 * of two of at least 4096).
 *
 * Returns 0 on success, -EINVAL for a bad filename or object size,
 * -EEXIST if the header object already exists, -EIO if the library, the
 * pool or the block id could not be set up, or the negative result of the
 * header write / directory registration.
 */
static int rbd_create(const char *filename, QEMUOptionParameter *options)
{
    int64_t bytes = 0;
    int64_t objsize;
    uint64_t size;
    time_t mtime;
    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
    char pool[RBD_MAX_SEG_NAME_SIZE];
    char n[RBD_MAX_SEG_NAME_SIZE];
    char name[RBD_MAX_OBJ_NAME_SIZE];
    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
    RbdHeader1 header;
    rados_pool_t p;
    uint64_t bid;
    uint32_t hi, lo;
    int ret;

    if (rbd_parsename(filename,
                      pool, sizeof(pool),
                      snap_buf, sizeof(snap_buf),
                      name, sizeof(name)) < 0) {
        return -EINVAL;
    }

    /* name of the header object */
    snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX);

    /* Read out options */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            bytes = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
            if (options->value.n) {
                objsize = options->value.n;
                if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    error_report("obj size needs to be power of 2");
                    return -EINVAL;
                }
                if (objsize < 4096) {
                    error_report("obj size too small");
                    return -EINVAL;
                }
                /*
                 * log2 of the object size.  Computed on the full 64-bit
                 * value; ffs() only looks at an int and would truncate
                 * sizes of 2 GiB and above.
                 */
                obj_order = 0;
                while ((objsize >> obj_order) > 1) {
                    obj_order++;
                }
            }
        }
        options++;
    }

    memset(&header, 0, sizeof(header));
    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
    header.image_size = cpu_to_le64(bytes);
    header.options.order = obj_order;
    header.options.crypt_type = RBD_CRYPT_NONE;
    header.options.comp_type = RBD_COMP_NONE;
    header.snap_seq = 0;
    header.snap_count = 0;

    if (rados_initialize(0, NULL) < 0) {
        error_report("error initializing");
        return -EIO;
    }

    if (rados_open_pool(pool, &p)) {
        error_report("error opening pool %s", pool);
        rados_deinitialize();
        return -EIO;
    }

    /* check for existing rbd header file */
    ret = rados_stat(p, n, &size, &mtime);
    if (ret == 0) {
        ret = -EEXIST;
        goto done;
    }

    ret = rbd_assign_bid(p, &bid);
    if (ret < 0) {
        error_report("failed assigning block id");
        ret = -EIO;
        goto done;      /* the pool handle must be closed on this path too */
    }
    hi = bid >> 32;
    lo = bid & 0xFFFFFFFF;
    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);

    /* create header file */
    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
    if (ret < 0) {
        goto done;
    }

    ret = rbd_register_image(p, name);
done:
    rados_close_pool(p);
    rados_deinitialize();

    return ret;
}
338 
339 /*
340  * This aio completion is being called from rbd_aio_event_reader() and
341  * runs in qemu context. It schedules a bh, but just in case the aio
342  * was not cancelled before.
343  */
rbd_complete_aio(RADOSCB * rcb)344 static void rbd_complete_aio(RADOSCB *rcb)
345 {
346     RBDAIOCB *acb = rcb->acb;
347     int64_t r;
348 
349     acb->aiocnt--;
350 
351     if (acb->cancelled) {
352         if (!acb->aiocnt) {
353             qemu_vfree(acb->bounce);
354             qemu_aio_release(acb);
355         }
356         goto done;
357     }
358 
359     r = rcb->ret;
360 
361     if (acb->write) {
362         if (r < 0) {
363             acb->ret = r;
364             acb->error = 1;
365         } else if (!acb->error) {
366             acb->ret += rcb->segsize;
367         }
368     } else {
369         if (r == -ENOENT) {
370             memset(rcb->buf, 0, rcb->segsize);
371             if (!acb->error) {
372                 acb->ret += rcb->segsize;
373             }
374         } else if (r < 0) {
375 	    memset(rcb->buf, 0, rcb->segsize);
376             acb->ret = r;
377             acb->error = 1;
378         } else if (r < rcb->segsize) {
379             memset(rcb->buf + r, 0, rcb->segsize - r);
380             if (!acb->error) {
381                 acb->ret += rcb->segsize;
382             }
383         } else if (!acb->error) {
384             acb->ret += r;
385         }
386     }
387     /* Note that acb->bh can be NULL in case where the aio was cancelled */
388     if (!acb->aiocnt) {
389         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
390         qemu_bh_schedule(acb->bh);
391     }
392 done:
393     qemu_free(rcb);
394 }
395 
396 /*
397  * aio fd read handler. It runs in the qemu context and calls the
398  * completion handling of completed rados aio operations.
399  */
rbd_aio_event_reader(void * opaque)400 static void rbd_aio_event_reader(void *opaque)
401 {
402     BDRVRBDState *s = opaque;
403 
404     ssize_t ret;
405 
406     do {
407         char *p = (char *)&s->event_rcb;
408 
409         /* now read the rcb pointer that was sent from a non qemu thread */
410         if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
411                         sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
412             if (ret > 0) {
413                 s->event_reader_pos += ret;
414                 if (s->event_reader_pos == sizeof(s->event_rcb)) {
415                     s->event_reader_pos = 0;
416                     rbd_complete_aio(s->event_rcb);
417                     s->qemu_aio_count --;
418                 }
419             }
420         }
421     } while (ret < 0 && errno == EINTR);
422 }
423 
rbd_aio_flush_cb(void * opaque)424 static int rbd_aio_flush_cb(void *opaque)
425 {
426     BDRVRBDState *s = opaque;
427 
428     return (s->qemu_aio_count > 0);
429 }
430 
431 
/*
 * Install the snapshot context described by @header on @pool and select
 * the snapshot to read from.
 *
 * @snap is the name of the snapshot to select, or NULL for none (snap id
 * 0 is then passed to rados_set_snap()).  The snapshot names are expected
 * directly after the snapshot id array in @header, as consecutive
 * NUL-terminated strings occupying snap_names_len bytes.
 *
 * Returns the result of rados_set_snap_context(), -ENOENT if @snap is not
 * found, or -EINVAL if the name list does not fit into snap_names_len.
 *
 * NOTE(review): the size of the buffer behind @header is not known here,
 * so snap_count and snap_names_len themselves cannot be validated against
 * it.
 */
static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
{
    uint32_t snap_count = le32_to_cpu(header->snap_count);
    uint64_t snap_names_len = le64_to_cpu(header->snap_names_len);
    rados_snap_t *snaps = NULL;
    rados_snap_t snapid = 0;
    rados_snap_t seq;
    uint32_t i;
    int r;

    if (snap_count) {
        const char *header_snap = (const char *)&header->snaps[snap_count];
        const char *end = header_snap + snap_names_len;

        /* size the array with the converted count, not the raw LE field */
        snaps = qemu_malloc(sizeof(rados_snap_t) * snap_count);

        for (i = 0; i < snap_count; i++) {
            const char *nul = NULL;

            /* the current name must terminate inside the names area */
            if (header_snap < end) {
                nul = memchr(header_snap, '\0', end - header_snap);
            }
            if (!nul) {
                error_report("bad header, snapshot list broken");
                qemu_free(snaps);
                return -EINVAL;
            }

            snaps[i] = le64_to_cpu(header->snaps[i].id);

            if (snap && strcmp(snap, header_snap) == 0) {
                snapid = snaps[i];
            }

            header_snap = nul + 1;
        }
    }

    if (snap && !snapid) {
        error_report("snapshot not found");
        qemu_free(snaps);
        return -ENOENT;
    }

    /* NOTE(review): converted as 32 bit; confirm the width of snap_seq */
    seq = le32_to_cpu(header->snap_seq);

    r = rados_set_snap_context(pool, seq, snaps, snap_count);

    rados_set_snap(pool, snapid);

    qemu_free(snaps);

    return r;
}
476 
477 #define BUF_READ_START_LEN    4096
478 
rbd_read_header(BDRVRBDState * s,char ** hbuf)479 static int rbd_read_header(BDRVRBDState *s, char **hbuf)
480 {
481     char *buf = NULL;
482     char n[RBD_MAX_SEG_NAME_SIZE];
483     uint64_t len = BUF_READ_START_LEN;
484     int r;
485 
486     snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
487 
488     buf = qemu_malloc(len);
489 
490     r = rados_read(s->header_pool, n, 0, buf, len);
491     if (r < 0) {
492         goto failed;
493     }
494 
495     if (r < len) {
496         goto done;
497     }
498 
499     qemu_free(buf);
500     buf = qemu_malloc(len);
501 
502     r = rados_stat(s->header_pool, n, &len, NULL);
503     if (r < 0) {
504         goto failed;
505     }
506 
507     r = rados_read(s->header_pool, n, 0, buf, len);
508     if (r < 0) {
509         goto failed;
510     }
511 
512 done:
513     *hbuf = buf;
514     return 0;
515 
516 failed:
517     qemu_free(buf);
518     return r;
519 }
520 
/*
 * Open the image named by @filename ("rbd:pool/name[@snap]").
 *
 * Opens two handles on the pool, reads and checks the image header, sets
 * up the snapshot context and creates the non-blocking pipe through which
 * finished RADOS operations are reported to the qemu thread.  The image
 * is marked read-only when a snapshot was named.
 *
 * Returns 0 on success.  On failure everything acquired so far is
 * released and an error value is returned.
 */
static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
{
    BDRVRBDState *s = bs->opaque;
    RbdHeader1 *header;
    char pool[RBD_MAX_SEG_NAME_SIZE];
    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
    char *snap = NULL;
    char *hbuf = NULL;
    int r;

    if (rbd_parsename(filename, pool, sizeof(pool),
                      snap_buf, sizeof(snap_buf),
                      s->name, sizeof(s->name)) < 0) {
        return -EINVAL;
    }
    if (snap_buf[0] != '\0') {
        snap = snap_buf;
    }

    if ((r = rados_initialize(0, NULL)) < 0) {
        error_report("error initializing");
        return r;
    }

    if ((r = rados_open_pool(pool, &s->pool))) {
        error_report("error opening pool %s", pool);
        goto failed_init;
    }

    if ((r = rados_open_pool(pool, &s->header_pool))) {
        error_report("error opening pool %s", pool);
        goto failed_pool;   /* s->pool is open and has to be closed */
    }

    if ((r = rbd_read_header(s, &hbuf)) < 0) {
        error_report("error reading header from %s", s->name);
        goto failed;
    }

    /* signature and version are compared at fixed header offsets */
    if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
        error_report("Invalid header signature");
        r = -EMEDIUMTYPE;
        goto failed;
    }

    if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
        error_report("Unknown image version");
        r = -EMEDIUMTYPE;
        goto failed;
    }

    header = (RbdHeader1 *) hbuf;
    s->size = le64_to_cpu(header->image_size);
    s->objsize = 1ULL << header->options.order;
    memcpy(s->block_name, header->block_name, sizeof(header->block_name));

    r = rbd_set_snapc(s->pool, snap, header);
    if (r < 0) {
        error_report("failed setting snap context: %s", strerror(-r));
        goto failed;
    }

    bs->read_only = (snap != NULL);

    s->event_reader_pos = 0;
    r = qemu_pipe(s->fds);
    if (r < 0) {
        error_report("error opening eventfd");
        goto failed;
    }
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL,
        rbd_aio_flush_cb, NULL, s);

    qemu_free(hbuf);

    return 0;

failed:
    qemu_free(hbuf);
    rados_close_pool(s->header_pool);
failed_pool:
    rados_close_pool(s->pool);
failed_init:
    rados_deinitialize();
    return r;
}
610 
/*
 * Close the image: remove the pipe read handler, close both pipe ends,
 * close both pool handles and shut the library down.
 */
static void rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    /*
     * Unregister the handler while the descriptor is still open, so the
     * handler is never left registered for a descriptor number that has
     * already been closed.
     */
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
        NULL);
    close(s->fds[0]);
    close(s->fds[1]);

    rados_close_pool(s->header_pool);
    rados_close_pool(s->pool);
    rados_deinitialize();
}
624 
625 /*
626  * Cancel aio. Since we don't reference acb in a non qemu threads,
627  * it is safe to access it here.
628  */
rbd_aio_cancel(BlockDriverAIOCB * blockacb)629 static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
630 {
631     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
632     acb->cancelled = 1;
633 }
634 
635 static AIOPool rbd_aio_pool = {
636     .aiocb_size = sizeof(RBDAIOCB),
637     .cancel = rbd_aio_cancel,
638 };
639 
640 /*
641  * This is the callback function for rados_aio_read and _write
642  *
643  * Note: this function is being called from a non qemu thread so
644  * we need to be careful about what we do here. Generally we only
645  * write to the block notification pipe, and do the rest of the
646  * io completion handling from rbd_aio_event_reader() which
647  * runs in a qemu context.
648  */
rbd_finish_aiocb(rados_completion_t c,RADOSCB * rcb)649 static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
650 {
651     int ret;
652     rcb->ret = rados_aio_get_return_value(c);
653     rados_aio_release(c);
654     while (1) {
655         fd_set wfd;
656         int fd = rcb->s->fds[RBD_FD_WRITE];
657 
658         /* send the rcb pointer to the qemu thread that is responsible
659            for the aio completion. Must do it in a qemu thread context */
660         ret = write(fd, (void *)&rcb, sizeof(rcb));
661         if (ret >= 0) {
662             break;
663         }
664         if (errno == EINTR) {
665             continue;
666 	}
667         if (errno != EAGAIN) {
668             break;
669 	}
670 
671         FD_ZERO(&wfd);
672         FD_SET(fd, &wfd);
673         do {
674             ret = select(fd + 1, NULL, &wfd, NULL, NULL);
675         } while (ret < 0 && errno == EINTR);
676     }
677 
678     if (ret < 0) {
679         error_report("failed writing to acb->s->fds\n");
680         qemu_free(rcb);
681     }
682 }
683 
684 /* Callback when all queued rados_aio requests are complete */
685 
rbd_aio_bh_cb(void * opaque)686 static void rbd_aio_bh_cb(void *opaque)
687 {
688     RBDAIOCB *acb = opaque;
689 
690     if (!acb->write) {
691         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
692     }
693     qemu_vfree(acb->bounce);
694     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
695     qemu_bh_delete(acb->bh);
696     acb->bh = NULL;
697 
698     qemu_aio_release(acb);
699 }
700 
rbd_aio_rw_vector(BlockDriverState * bs,int64_t sector_num,QEMUIOVector * qiov,int nb_sectors,BlockDriverCompletionFunc * cb,void * opaque,int write)701 static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
702                                            int64_t sector_num,
703                                            QEMUIOVector *qiov,
704                                            int nb_sectors,
705                                            BlockDriverCompletionFunc *cb,
706                                            void *opaque, int write)
707 {
708     RBDAIOCB *acb;
709     RADOSCB *rcb;
710     rados_completion_t c;
711     char n[RBD_MAX_SEG_NAME_SIZE];
712     int64_t segnr, segoffs, segsize, last_segnr;
713     int64_t off, size;
714     char *buf;
715 
716     BDRVRBDState *s = bs->opaque;
717 
718     acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
719     acb->write = write;
720     acb->qiov = qiov;
721     acb->bounce = qemu_blockalign(bs, qiov->size);
722     acb->aiocnt = 0;
723     acb->ret = 0;
724     acb->error = 0;
725     acb->s = s;
726     acb->cancelled = 0;
727     acb->bh = NULL;
728 
729     if (write) {
730         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
731     }
732 
733     buf = acb->bounce;
734 
735     off = sector_num * BDRV_SECTOR_SIZE;
736     size = nb_sectors * BDRV_SECTOR_SIZE;
737     segnr = off / s->objsize;
738     segoffs = off % s->objsize;
739     segsize = s->objsize - segoffs;
740 
741     last_segnr = ((off + size - 1) / s->objsize);
742     acb->aiocnt = (last_segnr - segnr) + 1;
743 
744     s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
745 
746     while (size > 0) {
747         if (size < segsize) {
748             segsize = size;
749         }
750 
751         snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name,
752                  segnr);
753 
754         rcb = qemu_malloc(sizeof(RADOSCB));
755         rcb->done = 0;
756         rcb->acb = acb;
757         rcb->segsize = segsize;
758         rcb->buf = buf;
759         rcb->s = acb->s;
760 
761         if (write) {
762             rados_aio_create_completion(rcb, NULL,
763                                         (rados_callback_t) rbd_finish_aiocb,
764                                         &c);
765             rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
766         } else {
767             rados_aio_create_completion(rcb,
768                                         (rados_callback_t) rbd_finish_aiocb,
769                                         NULL, &c);
770             rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
771         }
772 
773         buf += segsize;
774         size -= segsize;
775         segoffs = 0;
776         segsize = s->objsize;
777         segnr++;
778     }
779 
780     return &acb->common;
781 }
782 
rbd_aio_readv(BlockDriverState * bs,int64_t sector_num,QEMUIOVector * qiov,int nb_sectors,BlockDriverCompletionFunc * cb,void * opaque)783 static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
784                                        int64_t sector_num, QEMUIOVector * qiov,
785                                        int nb_sectors,
786                                        BlockDriverCompletionFunc * cb,
787                                        void *opaque)
788 {
789     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
790 }
791 
rbd_aio_writev(BlockDriverState * bs,int64_t sector_num,QEMUIOVector * qiov,int nb_sectors,BlockDriverCompletionFunc * cb,void * opaque)792 static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
793                                         int64_t sector_num, QEMUIOVector * qiov,
794                                         int nb_sectors,
795                                         BlockDriverCompletionFunc * cb,
796                                         void *opaque)
797 {
798     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
799 }
800 
/* Report the object size of the image as its cluster size.  Returns 0. */
static int rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    const BDRVRBDState *state = bs->opaque;

    bdi->cluster_size = state->objsize;

    return 0;
}
807 
/* Return the image size in bytes as recorded when the image was opened. */
static int64_t rbd_getlength(BlockDriverState *bs)
{
    const BDRVRBDState *state = bs->opaque;
    int64_t length = state->size;

    return length;
}
814 
/*
 * Create a snapshot of the image named after @sn_info->name.
 *
 * The snapshot name doubles as the qemu snapshot id: a caller-supplied
 * id_str must be empty or equal to the name, and on success id_str is set
 * to the name.  After the snapshot has been added to the header, the
 * header is read again and the snapshot context is set from it, selecting
 * the snapshot just created.
 *
 * Returns 0 on success, -EINVAL / -ERANGE for unusable names, or the
 * negative error of the failing RADOS operation.
 */
static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    char inbuf[512], outbuf[128];
    char n[RBD_MAX_SEG_NAME_SIZE];
    char *p = inbuf;
    char *hbuf = NULL;
    uint64_t snap_id, snap_id_le;
    uint32_t name_len, name_len_le;
    int r;

    if (sn_info->name[0] == '\0') {
        return -EINVAL; /* we need a name for rbd snapshots */
    }

    /*
     * rbd snapshots are using the name as the user controlled unique identifier
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (sn_info->id_str[0] != '\0' &&
        strcmp(sn_info->id_str, sn_info->name) != 0) {
        return -EINVAL;
    }

    name_len = strlen(sn_info->name);
    if (name_len >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    /*
     * Make sure the encoded request fits before a snapshot id is
     * allocated, so that a rejected request does not consume an id.
     */
    if (sizeof(name_len_le) + name_len + sizeof(snap_id_le) > sizeof(inbuf)) {
        error_report("invalid input parameter");
        return -EINVAL;
    }

    r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
    if (r < 0) {
        error_report("failed to create snap id: %s", strerror(-r));
        return r;
    }

    /*
     * Request layout: 32-bit LE name length, name bytes, 64-bit LE snap
     * id; the rest of the buffer is zero.  memcpy() is used because the
     * snap id lands at an arbitrary offset inside the buffer.
     */
    memset(inbuf, 0, sizeof(inbuf));
    name_len_le = cpu_to_le32(name_len);
    memcpy(p, &name_len_le, sizeof(name_len_le));
    p += sizeof(name_len_le);
    memcpy(p, sn_info->name, name_len);
    p += name_len;
    snap_id_le = cpu_to_le64(snap_id);
    memcpy(p, &snap_id_le, sizeof(snap_id_le));

    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);

    r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
                   sizeof(inbuf), outbuf, sizeof(outbuf));
    if (r < 0) {
        error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
        return r;
    }

    /* cannot truncate: the name length was checked against id_str above */
    pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), sn_info->name);

    r = rbd_read_header(s, &hbuf);
    if (r < 0) {
        error_report("failed reading header: %s", strerror(-r));
        return r;
    }

    r = rbd_set_snapc(s->pool, sn_info->name, (RbdHeader1 *) hbuf);
    if (r < 0) {
        error_report("failed setting snap context: %s", strerror(-r));
    }

    /* the header copy is released on success as well as on failure */
    qemu_free(hbuf);
    return r < 0 ? r : 0;
}
893 
/*
 * Decode a little-endian 32-bit value from the buffer position *@p.
 *
 * @end points one past the last valid byte.  On success the value is
 * stored in *@v in host byte order, *@p is advanced by 4 and 0 is
 * returned.  Returns -ERANGE, leaving *@p alone, if fewer than 4 bytes
 * remain.
 */
static int decode32(char **p, const char *end, uint32_t *v)
{
    /* compare remaining bytes instead of forming *p + 4 past the buffer */
    if (end - *p < (ptrdiff_t)sizeof(*v)) {
        return -ERANGE;
    }

    /* the position may be unaligned, so copy instead of dereferencing */
    memcpy(v, *p, sizeof(*v));
    le32_to_cpus(v);
    *p += sizeof(*v);
    return 0;
}
905 
/*
 * Decode a little-endian 64-bit value from the buffer position *@p.
 *
 * @end points one past the last valid byte.  On success the value is
 * stored in *@v in host byte order, *@p is advanced by 8 and 0 is
 * returned.  Returns -ERANGE, leaving *@p alone, if fewer than 8 bytes
 * remain.
 */
static int decode64(char **p, const char *end, uint64_t *v)
{
    /* compare remaining bytes instead of forming *p + 8 past the buffer */
    if (end - *p < (ptrdiff_t)sizeof(*v)) {
        return -ERANGE;
    }

    /* the position may be unaligned, so copy instead of dereferencing */
    memcpy(v, *p, sizeof(*v));
    le64_to_cpus(v);
    *p += sizeof(*v);
    return 0;
}
917 
/*
 * Decode a length-prefixed string (32-bit little-endian length followed
 * by that many bytes) from the buffer position *@p.
 *
 * @end points one past the last valid byte.  On success *@s receives a
 * newly allocated NUL-terminated copy that the caller must release with
 * qemu_free(), *@p is advanced past the string and the string length is
 * returned.  Returns -ERANGE, without allocating, if the length prefix
 * or the string body does not fit into the remaining buffer.
 */
static int decode_str(char **p, const char *end, char **s)
{
    uint32_t len;
    int r;

    if ((r = decode32(p, end, &len)) < 0) {
        return r;
    }

    /* the length comes from the buffer; never copy beyond its end */
    if (len > (size_t)(end - *p)) {
        return -ERANGE;
    }

    *s = qemu_malloc(len + 1);
    memcpy(*s, *p, len);
    *p += len;
    (*s)[len] = '\0';

    return len;
}
934 
/*
 * List the snapshots of the image.
 *
 * Executes the "snap_list" method of the "rbd" class on the header object
 * and decodes the reply: a 64-bit snapshot sequence, a 32-bit count, and
 * per snapshot a 64-bit id, a 64-bit image size and a length-prefixed
 * name.  The name is used both as id_str and as name of the qemu snapshot
 * entry; the image size is reported as vm_state_size.
 *
 * On success *@psn_tab receives a newly allocated array (to be released
 * by the caller) and the number of entries is returned.  On failure a
 * negative error value is returned and *@psn_tab is left untouched.
 */
static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    char n[RBD_MAX_SEG_NAME_SIZE];
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
    RbdHeader1 *header;
    char *hbuf = NULL;
    char *outbuf = NULL, *end, *buf;
    uint64_t len;
    uint64_t snap_seq;
    uint32_t snap_count;
    uint32_t i;
    int r;

    /* read header to estimate how much space we need to read the snap
     * list */
    if ((r = rbd_read_header(s, &hbuf)) < 0) {
        goto done_err;
    }
    header = (RbdHeader1 *)hbuf;
    len = le64_to_cpu(header->snap_names_len);
    len += 1024; /* should have already been enough, but new snapshots might
                    have been created since we read the header. just allocate
                    a bit more, so that in most cases it'll suffice anyway */
    qemu_free(hbuf);

    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
    while (1) {
        qemu_free(outbuf);
        outbuf = qemu_malloc(len);

        r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0,
                       outbuf, len);
        if (r < 0) {
            error_report("rbd.snap_list execution failed failed: %s", strerror(-r));
            goto done_err;
        }
        if ((uint64_t)r != len) {
            break;
        }

        /* if we're here, we probably raced with some snaps creation */
        len *= 2;
    }

    /*
     * Only the bytes that were actually returned may be decoded; the rest
     * of the allocation is uninitialized.
     * NOTE(review): relies on rados_exec() returning the reply length, as
     * the loop above already assumes.
     */
    buf = outbuf;
    end = buf + r;

    if ((r = decode64(&buf, end, &snap_seq)) < 0) {
        goto done_err;
    }
    if ((r = decode32(&buf, end, &snap_count)) < 0) {
        goto done_err;
    }

    /*
     * Every entry takes at least two 64-bit values plus a 32-bit string
     * length.  Reject counts the reply cannot possibly hold before they
     * are used to size an allocation.
     */
    if (snap_count > (size_t)(end - buf) /
                     (2 * sizeof(uint64_t) + sizeof(uint32_t))) {
        r = -ERANGE;
        goto done_err;
    }

    sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
    for (i = 0; i < snap_count; i++) {
        uint64_t id, image_size;
        char *snap_name;

        if ((r = decode64(&buf, end, &id)) < 0) {
            goto done_err;
        }
        if ((r = decode64(&buf, end, &image_size)) < 0) {
            goto done_err;
        }
        if ((r = decode_str(&buf, end, &snap_name)) < 0) {
            goto done_err;
        }

        sn_info = sn_tab + i;
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
        qemu_free(snap_name);

        sn_info->vm_state_size = image_size;
        sn_info->date_sec = 0;
        sn_info->date_nsec = 0;
        sn_info->vm_clock_nsec = 0;
    }
    *psn_tab = sn_tab;
    qemu_free(outbuf);
    return snap_count;
done_err:
    qemu_free(sn_tab);
    qemu_free(outbuf);
    return r;
}
1021 
1022 static QEMUOptionParameter rbd_create_options[] = {
1023     {
1024      .name = BLOCK_OPT_SIZE,
1025      .type = OPT_SIZE,
1026      .help = "Virtual disk size"
1027     },
1028     {
1029      .name = BLOCK_OPT_CLUSTER_SIZE,
1030      .type = OPT_SIZE,
1031      .help = "RBD object size"
1032     },
1033     {NULL}
1034 };
1035 
1036 static BlockDriver bdrv_rbd = {
1037     .format_name        = "rbd",
1038     .instance_size      = sizeof(BDRVRBDState),
1039     .bdrv_file_open     = rbd_open,
1040     .bdrv_close         = rbd_close,
1041     .bdrv_create        = rbd_create,
1042     .bdrv_get_info      = rbd_getinfo,
1043     .create_options     = rbd_create_options,
1044     .bdrv_getlength     = rbd_getlength,
1045     .protocol_name      = "rbd",
1046 
1047     .bdrv_aio_readv     = rbd_aio_readv,
1048     .bdrv_aio_writev    = rbd_aio_writev,
1049 
1050     .bdrv_snapshot_create = rbd_snap_create,
1051     .bdrv_snapshot_list = rbd_snap_list,
1052 };
1053 
bdrv_rbd_init(void)1054 static void bdrv_rbd_init(void)
1055 {
1056     bdrv_register(&bdrv_rbd);
1057 }
1058 
1059 block_init(bdrv_rbd_init);
1060