xref: /illumos-kvm-cmd/posix-aio-compat.c (revision 1291c762)
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13 
14 #include <sys/ioctl.h>
15 #include <sys/types.h>
16 #include <pthread.h>
17 #include <unistd.h>
18 #include <errno.h>
19 #include <time.h>
20 #include <signal.h>
21 #include <string.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 
25 #include "qemu-queue.h"
26 #include "osdep.h"
27 #include "sysemu.h"
28 #include "qemu-common.h"
29 #include "trace.h"
30 #include "block_int.h"
31 #include "compatfd.h"
32 
33 #include "block/raw-posix-aio.h"
34 
35 
/*
 * One asynchronous I/O request.  Lives on request_list while queued for a
 * worker thread, and on PosixAioState.first_aio (via ->next) until the
 * completion callback has run.
 */
struct qemu_paiocb {
    BlockDriverAIOCB common;        /* generic AIOCB header (bs, cb, opaque) */
    int aio_fildes;                 /* fd the request operates on */
    union {
        struct iovec *aio_iov;      /* scatter/gather list for READ/WRITE */
        void *aio_ioctl_buf;        /* argument buffer for QEMU_AIO_IOCTL */
    };
    int aio_niov;                   /* number of entries in aio_iov */
    size_t aio_nbytes;              /* total byte count of the request */
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;                   /* signal kill()ed to self on completion */
    off_t aio_offset;               /* starting file offset */

    QTAILQ_ENTRY(qemu_paiocb) node; /* linkage on the pending request_list */
    int aio_type;                   /* QEMU_AIO_* type and modifier flags */
    ssize_t ret;                    /* result; -EINPROGRESS while pending */
    int active;                     /* set once a worker has dequeued it */
    struct qemu_paiocb *next;       /* completion-list link (first_aio chain) */

    int async_context_id;           /* context that issued the request */
};
57 
/* Per-process AIO state: the signalfd read by the main loop and the list of
 * submitted-but-not-yet-completed requests. */
typedef struct PosixAioState {
    int fd;                         /* signalfd carrying completion signals */
    struct qemu_paiocb *first_aio;  /* head of the outstanding-request chain */
} PosixAioState;
62 
63 
/* lock/cond protect request_list and the thread counters below. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;         /* id of the most recently spawned worker */
static pthread_attr_t attr;         /* worker attributes (detached state) */
static int max_threads = 64;        /* cap on concurrent worker threads */
static int cur_threads = 0;         /* workers currently alive */
static int idle_threads = 0;        /* workers waiting for a request */
static QTAILQ_HEAD(, qemu_paiocb) request_list; /* pending requests, FIFO */

/* Cleared at runtime if preadv/pwritev turns out to be unsupported. */
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
78 
/* Report a fatal failure of `what` with error code `err`, then abort. */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
84 
/* Fatal-error helper for calls that report failure through errno. */
static void die(const char *what)
{
    int saved_errno = errno;

    die2(saved_errno, what);
}
89 
/* pthread_mutex_lock that treats any failure as fatal. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
95 
/* pthread_mutex_unlock that treats any failure as fatal. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_unlock");
    }
}
101 
/*
 * pthread_cond_timedwait wrapper: a timeout is a normal outcome and is
 * returned to the caller (ETIMEDOUT); every other failure is fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int err = pthread_cond_timedwait(cond, mutex, ts);

    if (err != 0 && err != ETIMEDOUT) {
        die2(err, "pthread_cond_timedwait");
    }
    return err;
}
109 
/* pthread_cond_signal that treats any failure as fatal. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err != 0) {
        die2(err, "pthread_cond_signal");
    }
}
115 
/* pthread_create that treats any failure as fatal. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err != 0) {
        die2(err, "pthread_create");
    }
}
122 
handle_aiocb_ioctl(struct qemu_paiocb * aiocb)123 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
124 {
125     int ret;
126 
127     ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
128     if (ret == -1)
129         return -errno;
130 
131     /*
132      * This looks weird, but the aio code only consideres a request
133      * successful if it has written the number full number of bytes.
134      *
135      * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
136      * so in fact we return the ioctl command here to make posix_aio_read()
137      * happy..
138      */
139     return aiocb->aio_nbytes;
140 }
141 
handle_aiocb_flush(struct qemu_paiocb * aiocb)142 static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
143 {
144     int ret;
145 
146     ret = qemu_fdatasync(aiocb->aio_fildes);
147     if (ret == -1)
148         return -errno;
149     return 0;
150 }
151 
#ifdef CONFIG_PREADV

/* Direct wrappers around the host's preadv/pwritev syscalls. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

/* Stubs for hosts without preadv/pwritev.  The -ENOSYS return value makes
 * handle_aiocb_rw() clear preadv_present and fall back to the linearizing
 * single-buffer path. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif
181 
handle_aiocb_rw_vector(struct qemu_paiocb * aiocb)182 static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
183 {
184     size_t offset = 0;
185     ssize_t len;
186 
187     do {
188         if (aiocb->aio_type & QEMU_AIO_WRITE)
189             len = qemu_pwritev(aiocb->aio_fildes,
190                                aiocb->aio_iov,
191                                aiocb->aio_niov,
192                                aiocb->aio_offset + offset);
193          else
194             len = qemu_preadv(aiocb->aio_fildes,
195                               aiocb->aio_iov,
196                               aiocb->aio_niov,
197                               aiocb->aio_offset + offset);
198     } while (len == -1 && errno == EINTR);
199 
200     if (len == -1)
201         return -errno;
202     return len;
203 }
204 
/*
 * Transfer the request's byte range through the single linear buffer `buf`,
 * restarting on EINTR and continuing after short transfers.
 *
 * Returns the number of bytes transferred — short if EOF (len == 0) was
 * reached — or -errno on failure.  Note `offset` doubles as the return
 * value, so an error discards any record of partial progress.
 */
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    /* offset is only negative immediately before the break below, so this
     * signed/unsigned comparison never sees a negative left side. */
    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             /* encode the error in the return value and give up */
             offset = -errno;
             break;
         } else if (len == 0)
             /* EOF: return the short count */
             break;

         offset += len;
    }

    return offset;
}
235 
/*
 * Top-level read/write dispatcher for a request.
 *
 * Strategy, in order of preference:
 *   1. aligned single iovec  -> plain pread/pwrite on it directly;
 *   2. aligned multi-iovec   -> preadv/pwritev, if available;
 *   3. otherwise             -> copy through one aligned bounce buffer.
 *
 * Returns bytes transferred or a negative errno value.
 */
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            /* full transfer: done */
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            /* real error (anything but "syscall missing"): report it */
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            /* -ENOSYS: remember the syscall is unavailable from now on */
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the reminder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        /* gather: flatten the iovec contents into the bounce buffer */
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        /* scatter: copy the bounce buffer back out to the iovecs; `count`
         * caps the copy at the full request size even on a short read */
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}
305 
/*
 * Worker thread body: dequeue requests from request_list, execute them
 * synchronously, publish the result, and notify the main thread by sending
 * the request's signal to our own process (picked up via signalfd).
 *
 * A worker retires itself after roughly 10 seconds of idleness.  Note the
 * `break` below exits the loop with `lock` still held, so the counter
 * bookkeeping at the bottom runs under the mutex before the final unlock.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* absolute deadline for the idle timeout: now + 10s */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* wait for work until the deadline passes */
        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* timed out with nothing queued: retire this worker
         * (lock intentionally stays held across this break) */
        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        /* past this point paio_cancel() can no longer dequeue the request */
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        /* publish the result under the lock, then poke the main thread */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
369 
spawn_thread(void)370 static void spawn_thread(void)
371 {
372     sigset_t set, oldset;
373 
374     cur_threads++;
375     idle_threads++;
376 
377     /* block all signals */
378     if (sigfillset(&set)) die("sigfillset");
379     if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
380 
381     thread_create(&thread_id, &attr, aio_thread, NULL);
382 
383     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
384 }
385 
qemu_paio_submit(struct qemu_paiocb * aiocb)386 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
387 {
388     aiocb->ret = -EINPROGRESS;
389     aiocb->active = 0;
390     mutex_lock(&lock);
391     if (idle_threads == 0 && cur_threads < max_threads)
392         spawn_thread();
393     QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
394     mutex_unlock(&lock);
395     cond_signal(&cond);
396 }
397 
qemu_paio_return(struct qemu_paiocb * aiocb)398 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
399 {
400     ssize_t ret;
401 
402     mutex_lock(&lock);
403     ret = aiocb->ret;
404     mutex_unlock(&lock);
405 
406     return ret;
407 }
408 
/* Return the request's error as a positive errno value, or 0 on success.
 * (Pending requests yield EINPROGRESS since ret is -EINPROGRESS.) */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t status = qemu_paio_return(aiocb);

    if (status >= 0) {
        return 0;
    }
    return -status;
}
420 
/*
 * Walk the outstanding-request chain and retire every finished request that
 * belongs to the current async context.
 *
 * Returns non-zero if at least one request was retired.  Cancelled requests
 * are released without invoking their callback.  After each completion
 * callback the outer loop restarts the walk from the head, since the
 * callback may have submitted or cancelled requests and changed the chain.
 */
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        /* pacb points at the link that references acb, so unlinking is
         * just "*pacb = acb->next" */
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    /* success only counts if the full byte count (or, for
                     * ioctls, the overloaded command value) came back */
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }
                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                /* restart the walk: the callback may have altered the list */
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}
474 
posix_aio_read(void * opaque)475 static void posix_aio_read(void *opaque)
476 {
477     PosixAioState *s = opaque;
478     union {
479         struct qemu_signalfd_siginfo siginfo;
480         char buf[128];
481     } sig;
482     size_t offset;
483 
484     /* try to read from signalfd, don't freak out if we can't read anything */
485     offset = 0;
486     while (offset < 128) {
487         ssize_t len;
488 
489         len = read(s->fd, sig.buf + offset, 128 - offset);
490         if (len == -1 && errno == EINTR)
491             continue;
492         if (len == -1 && errno == EAGAIN) {
493             /* there is no natural reason for this to happen,
494              * so we'll spin hard until we get everything just
495              * to be on the safe side. */
496             if (offset > 0)
497                 continue;
498         }
499 
500         offset += len;
501     }
502 
503     posix_aio_process_queue(s);
504 }
505 
posix_aio_flush(void * opaque)506 static int posix_aio_flush(void *opaque)
507 {
508     PosixAioState *s = opaque;
509     return !!s->first_aio;
510 }
511 
/* Singleton AIO state; non-NULL once paio_init() has completed. */
static PosixAioState *posix_aio_state;
513 
paio_remove(struct qemu_paiocb * acb)514 static void paio_remove(struct qemu_paiocb *acb)
515 {
516     struct qemu_paiocb **pacb;
517 
518     /* remove the callback from the queue */
519     pacb = &posix_aio_state->first_aio;
520     for(;;) {
521         if (*pacb == NULL) {
522             fprintf(stderr, "paio_remove: aio request not found!\n");
523             break;
524         } else if (*pacb == acb) {
525             *pacb = acb->next;
526             qemu_aio_release(acb);
527             break;
528         }
529         pacb = &(*pacb)->next;
530     }
531 }
532 
/*
 * Cancel a request.  If it is still queued (no worker has picked it up),
 * it is dequeued and marked cancelled.  If a worker is already executing
 * it, we spin until the worker publishes a result — a fail-safe rather
 * than true cancellation.  Either way the request is unlinked and released
 * without its callback being invoked.
 */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    mutex_lock(&lock);
    if (!acb->active) {
        /* still queued: safe to pull it off the pending list */
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        /* a worker owns it right now */
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}
556 
/* AIOCB pool descriptor: allocation size and cancel hook for these requests. */
static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};
561 
paio_submit(BlockDriverState * bs,int fd,int64_t sector_num,QEMUIOVector * qiov,int nb_sectors,BlockDriverCompletionFunc * cb,void * opaque,int type)562 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
563         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
564         BlockDriverCompletionFunc *cb, void *opaque, int type)
565 {
566     struct qemu_paiocb *acb;
567 
568     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
569     if (!acb)
570         return NULL;
571     acb->aio_type = type;
572     acb->aio_fildes = fd;
573     acb->ev_signo = SIGUSR2;
574     acb->async_context_id = get_async_context_id();
575 
576     if (qiov) {
577         acb->aio_iov = qiov->iov;
578         acb->aio_niov = qiov->niov;
579     }
580     acb->aio_nbytes = nb_sectors * 512;
581     acb->aio_offset = sector_num * 512;
582 
583     acb->next = posix_aio_state->first_aio;
584     posix_aio_state->first_aio = acb;
585 
586     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
587     qemu_paio_submit(acb);
588     return &acb->common;
589 }
590 
paio_ioctl(BlockDriverState * bs,int fd,unsigned long int req,void * buf,BlockDriverCompletionFunc * cb,void * opaque)591 BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
592         unsigned long int req, void *buf,
593         BlockDriverCompletionFunc *cb, void *opaque)
594 {
595     struct qemu_paiocb *acb;
596 
597     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
598     if (!acb)
599         return NULL;
600     acb->aio_type = QEMU_AIO_IOCTL;
601     acb->aio_fildes = fd;
602     acb->ev_signo = SIGUSR2;
603     acb->async_context_id = get_async_context_id();
604     acb->aio_offset = 0;
605     acb->aio_ioctl_buf = buf;
606     acb->aio_ioctl_cmd = req;
607 
608     acb->next = posix_aio_state->first_aio;
609     posix_aio_state->first_aio = acb;
610 
611     qemu_paio_submit(acb);
612     return &acb->common;
613 }
614 
paio_init(void)615 int paio_init(void)
616 {
617     sigset_t mask;
618     PosixAioState *s;
619     int ret;
620 
621     if (posix_aio_state)
622         return 0;
623 
624     s = qemu_malloc(sizeof(PosixAioState));
625 
626     /* Make sure to block AIO signal */
627     sigemptyset(&mask);
628     sigaddset(&mask, SIGUSR2);
629     sigprocmask(SIG_BLOCK, &mask, NULL);
630 
631     s->first_aio = NULL;
632     s->fd = qemu_signalfd(&mask);
633     if (s->fd == -1) {
634         fprintf(stderr, "failed to create signalfd\n");
635         return -1;
636     }
637 
638     fcntl(s->fd, F_SETFL, O_NONBLOCK);
639 
640     qemu_aio_set_fd_handler(s->fd, posix_aio_read, NULL, posix_aio_flush,
641         posix_aio_process_queue, s);
642 
643     ret = pthread_attr_init(&attr);
644     if (ret)
645         die2(ret, "pthread_attr_init");
646 
647     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
648     if (ret)
649         die2(ret, "pthread_attr_setdetachstate");
650 
651     QTAILQ_INIT(&request_list);
652 
653     posix_aio_state = s;
654     return 0;
655 }
656