• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Data Structures
  • Files
  • Directories
  • File List
  • Globals

io_wait.h

00001 /*
00002  * $Id$
00003  *
00004  * Copyright (C) 2005 iptelorg GmbH
00005  *
00006  * Permission to use, copy, modify, and distribute this software for any
00007  * purpose with or without fee is hereby granted, provided that the above
00008  * copyright notice and this permission notice appear in all copies.
00009  *
00010  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
00011  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00012  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
00013  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00014  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00015  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00016  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00017  */
00018 /*
00019  * tcp io wait common stuff used by tcp_main.c & tcp_read.c
00020  * All the functions are inline because of speed reasons and because they are
00021  * used only from 2 places.
00022  * You also have to define:
00023  *     int handle_io(struct fd_map* fm, short events, int idx) (see below)
00024  *     (this could be trivially replaced by a callback pointer entry attached
00025  *      to the io_wait handler if more flexibility rather than performance
00026  *      is needed)
00027  *      fd_type - define to some enum of your choice and define also
00028  *                FD_TYPE_DEFINED (if you don't do it fd_type will be defined
00029  *                to int). 0 has a special not set/not init. meaning
00030  *                (a lot of sanity checks and the sigio_rt code are based on
00031  *                 this assumption)
00032  *     local_malloc (defaults to pkg_malloc)
00033  *     local_free   (defaults to pkg_free)
00034  *
00035  */
00036 /*
00037  * History:
00038  * --------
00039  *  2005-06-13  created by andrei
00040  *  2005-06-26  added kqueue (andrei)
00041  *  2005-07-01  added /dev/poll (andrei)
00042  *  2006-05-30  sigio 64 bit workaround enabled for kernels < 2.6.5 (andrei)
00043  *  2007-11-22  when handle_io() is called in a loop check & stop if the fd was
00044  *               removed inside handle_io() (andrei)
00045  *  2007-11-29  support for write (POLLOUT); added io_watch_chg() (andrei)
00046  *  2008-02-04  POLLRDHUP & EPOLLRDHUP support (automatically enabled if POLLIN
00047  *               is set) (andrei)
00048  *  2010-06-17  re-enabled & enhanced the EV_ERROR for kqueue (andrei)
00049  */
00050 
00051 
00052 
00053 #ifndef _io_wait_h
00054 #define _io_wait_h
00055 
00056 #include <errno.h>
00057 #include <string.h>
00058 #ifdef HAVE_SIGIO_RT
00059 #define __USE_GNU /* or else F_SETSIG won't be included */
00060 #include <sys/types.h> /* recv */
00061 #include <sys/socket.h> /* recv */
00062 #include <signal.h> /* sigprocmask, sigwait a.s.o */
00063 #endif
00064 
00065 #define _GNU_SOURCE  /* for POLLRDHUP on linux */
00066 #include <sys/poll.h>
00067 #include <fcntl.h>
00068 
00069 #ifdef HAVE_EPOLL
00070 #include <sys/epoll.h>
00071 #endif
00072 #ifdef HAVE_KQUEUE
00073 #include <sys/types.h> /* needed on freebsd */
00074 #include <sys/event.h>
00075 #include <sys/time.h>
00076 #endif
00077 #ifdef HAVE_DEVPOLL
00078 #include <sys/devpoll.h>
00079 #endif
00080 #ifdef HAVE_SELECT
00081 /* needed on openbsd for select*/
00082 #include <sys/time.h>
00083 #include <sys/types.h>
00084 #include <unistd.h>
00085 /* needed according to POSIX for select*/
00086 #include <sys/select.h>
00087 #endif
00088 
00089 #include "dprint.h"
00090 
00091 #include "poll_types.h" /* poll_types*/
00092 #ifdef HAVE_SIGIO_RT
00093 #include "pt.h" /* mypid() */
00094 #endif
00095 
00096 #include "compiler_opt.h"
00097 
00098 
#ifdef HAVE_EPOLL
/* fix defines for EPOLL: when POLLRDHUP is available but EPOLLRDHUP is not,
 * use the POLLRDHUP value for EPOLLRDHUP */
#if defined(POLLRDHUP) && !defined(EPOLLRDHUP)
#define EPOLLRDHUP POLLRDHUP  /* should work on all linuxes */
#endif /* POLLRDHUP && !EPOLLRDHUP */
#endif /* HAVE_EPOLL */


/* os version number, needed to select bug workarounds */
extern int _os_ver;
00108 
00109 
#if 0
/* this should be defined from the including file, see tcp_main.c for an
 * example; 0 has a special meaning: not used/empty */
enum fd_types;
#endif

/* fd_type falls back to int unless the including file already provided
 * its own definition (and defined FD_TYPE_DEFINED) */
#ifndef FD_TYPE_DEFINED
typedef int fd_type;
#define FD_TYPE_DEFINED
#endif

/* maps a fd to some other structure; used in almost all cases
 * except epoll and maybe kqueue or /dev/poll */
struct fd_map {
	int fd;         /* fd no */
	fd_type type;   /* "data" type; 0 means not used/empty */
	void *data;     /* pointer to the corresponding structure */
	short events;   /* events we are interested in */
};
00129 
00130 
#ifdef HAVE_KQUEUE
/* default size of the kqueue changes array; can be overridden by defining
 * KQ_CHANGES_ARRAY_SIZE before this point */
#ifndef KQ_CHANGES_ARRAY_SIZE
#define KQ_CHANGES_ARRAY_SIZE 256
#endif

/* cast applied to the udata argument passed to EV_SET() (intptr_t on
 * netbsd, nothing elsewhere).
 * It is defined independently of KQ_CHANGES_ARRAY_SIZE: kq_ev_change()
 * needs it even when the array size was overridden by the build. */
#ifndef KEV_UDATA_CAST
#ifdef __OS_netbsd
#define KEV_UDATA_CAST (intptr_t)
#else
#define KEV_UDATA_CAST
#endif
#endif

#endif /* HAVE_KQUEUE */
00143 
00144 
00145 /* handler structure */
00146 struct io_wait_handler{
00147         enum poll_types poll_method;
00148         int flags;
00149         struct fd_map* fd_hash;
00150         int fd_no; /*  current index used in fd_array and the passed size for
00151                                    ep_array (for kq_array at least
00152                                     max(twice the size, kq_changes_size) should be
00153                                    be passed). */
00154         int max_fd_no; /* maximum fd no, is also the size of fd_array,
00155                                                        fd_hash  and ep_array*/
00156         /* common stuff for POLL, SIGIO_RT and SELECT
00157          * since poll support is always compiled => this will always be compiled */
00158         struct pollfd* fd_array; /* used also by devpoll as devpoll array */
00159         int crt_fd_array_idx; /*  crt idx for which handle_io is called
00160                                                          (updated also by del -> internal optimization) */
00161         /* end of common stuff */
00162 #ifdef HAVE_EPOLL
00163         int epfd; /* epoll ctrl fd */
00164         struct epoll_event* ep_array;
00165 #endif
00166 #ifdef HAVE_SIGIO_RT
00167         sigset_t sset; /* signal mask for sigio & sigrtmin */
00168         int signo;     /* real time signal used */
00169 #endif
00170 #ifdef HAVE_KQUEUE
00171         int kq_fd;
00172         struct kevent* kq_array;   /* used for the eventlist*/
00173         struct kevent* kq_changes; /* used for the changelist */
00174         size_t kq_nchanges;
00175         size_t kq_array_size;   /* array size */
00176         size_t kq_changes_size; /* size of the changes array */
00177 #endif
00178 #ifdef HAVE_DEVPOLL
00179         int dpoll_fd;
00180 #endif
00181 #ifdef HAVE_SELECT
00182         fd_set master_rset; /* read set */
00183         fd_set master_wset; /* write set */
00184         int max_fd_select; /* maximum select used fd */
00185 #endif
00186 };
00187 
00188 typedef struct io_wait_handler io_wait_h;
00189 
00190 
/* get the fd_map entry for fd: the fd is used directly as index into the
 * handler's fd_hash array */
#define get_fd_map(h, fd)	(&((h)->fd_hash[(fd)]))

/* remove a fd_map structure from the hash (marks the entry as unused);
 * the pointer must be one returned by get_fd_map or hash_fd_map */
#define unhash_fd_map(pfm) \
	do{ \
		(pfm)->fd=-1; \
		(pfm)->type=0 /*F_NONE */; \
	}while(0)
00200 
00201 /* add a fd_map structure to the fd hash */
00202 static inline struct fd_map* hash_fd_map(       io_wait_h* h,
00203                                                                                         int fd,
00204                                                                                         short events,
00205                                                                                         fd_type type,
00206                                                                                         void* data)
00207 {
00208         h->fd_hash[fd].fd=fd;
00209         h->fd_hash[fd].events=events;
00210         h->fd_hash[fd].type=type;
00211         h->fd_hash[fd].data=data;
00212         return &h->fd_hash[fd];
00213 }
00214 
00215 
00216 
/* generic handle io routine, this must be defined in the including file
 * (faster than registering a callback pointer); it is declared
 * static inline when HANDLE_IO_INLINE is defined and as a normal external
 * function otherwise.
 *
 * params:  fm     - pointer to a fd hash entry
 *          events - combinations of POLLIN, POLLOUT, POLLERR & POLLHUP
 *          idx    - index in the fd_array (or -1 if not known)
 * return: -1 on error
 *          0 on EAGAIN or when by some other way it is known that no more
 *            io events are queued on the fd (the receive buffer is empty).
 *            Useful to detect when there are no more io events queued for
 *            sigio_rt, epoll_et, kqueue.
 *         >0 on successful read from the fd (when there might be more io
 *            queued -- the receive buffer might still be non-empty)
 */
#ifdef HANDLE_IO_INLINE
static inline int handle_io(struct fd_map *fm, short events, int idx);
#else
int handle_io(struct fd_map *fm, short events, int idx);
#endif
00236 
00237 
00238 
#ifdef HAVE_KQUEUE
/*
 * kqueue specific function: register a change
 * (adds a change to the kevent change array, and if full flushes it first)
 *
 * params:  h      - io_wait handler (uses kq_fd, kq_changes, kq_nchanges
 *                   and kq_changes_size)
 *          fd     - file descriptor the change refers to
 *          filter - kevent filter (e.g. EVFILT_READ, EVFILT_WRITE)
 *          flag   - kevent flags (e.g. EV_ADD, EV_DELETE)
 *          data   - value stored as the kevent udata
 *
 * When the changes array is full, all the pending changes are first
 * submitted in one kevent() call; if that fails, they are re-submitted
 * one by one and the failing ones are skipped. In both cases the array is
 * considered empty afterwards.
 *
 * TODO: check if the event already exists in the change list or if it's
 *       complementary to an event in the list (e.g. EVFILT_WRITE, EV_DELETE
 *       and EVFILT_WRITE, EV_ADD for the same fd).
 * returns: 0 on success (flush errors are only logged, so the code below
 *          has no path that returns -1; callers still check for -1)
 */
static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
								void* data)
{
	int n;
	size_t r; /* same type as kq_nchanges => no signed/unsigned compare */
	struct timespec tspec;

	if (h->kq_nchanges>=h->kq_changes_size){
		/* changes array full ! */
		LOG(L_WARN, "WARNING: kq_ev_change: kqueue changes array full"
					" trying to flush...\n");
		/* zero timeout: only submit the changes, don't wait for events */
		tspec.tv_sec=0;
		tspec.tv_nsec=0;
		/* submit all the pending changes at once, retrying on EINTR */
		do{
			n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
		}while(unlikely((n==-1) && (errno==EINTR)));
		if (unlikely(n == -1)){
			/* for a detailed explanation of what follows see
			   io_wait_loop_kqueue EV_ERROR case */
			if (unlikely(!(errno == EBADF || errno == ENOENT))){
				BUG("kq_ev_change: kevent flush changes failed"
						" (unexpected error): %s [%d]\n",
						strerror(errno), errno);
			}
			/* ignore error even if it's not a EBADF/ENOENT */
			/* one of the file descriptors is bad, probably already
			   closed => try to apply changes one-by-one */
			for (r = 0; r < h->kq_nchanges; r++) {
				do{
					n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
				}while(unlikely((n==-1) && (errno==EINTR)));
				if (n==-1) {
					/* for a detailed explanation of what follows see
					   io_wait_loop_kqueue EV_ERROR case */
					if (unlikely(!(errno == EBADF || errno == ENOENT))){
						BUG("kq_ev_change: kevent flush changes failed:"
								" (unexpected error) %s [%d] (%d/%lu)\n",
									strerror(errno), errno,
									(int)r, (unsigned long)h->kq_nchanges);
					}
					/* skip over the failed change */
				}
			}
		}
		h->kq_nchanges=0; /* changes array is empty */
	}
	/* queue the new change; it will be submitted on a later kevent() call */
	EV_SET(&h->kq_changes[h->kq_nchanges], fd, filter, flag, 0, 0,
			KEV_UDATA_CAST data);
	h->kq_nchanges++;
	return 0;
}
#endif
00302 
00303 
00304 
00305 /* generic io_watch_add function
00306  * Params:
00307  *     h      - pointer to initialized io_wait handle
00308  *     fd     - fd to watch
00309  *     events - bitmap with the fd events for which the fd should be watched
00310  *              (combination of POLLIN and POLLOUT)
00311  *     type   - fd type (non 0 value, returned in the call to handle_io)
00312  *     data   - pointer/private data returned in the handle_io call
00313  * returns 0 on success, -1 on error
00314  *
00315  * WARNING: handle_io() can be called immediately (from io_watch_add()) so
00316  *  make sure that any dependent init. (e.g. data stuff) is made before
00317  *  calling io_watch_add
00318  *
00319  * this version should be faster than pointers to poll_method specific
00320  * functions (it avoids functions calls, the overhead being only an extra
00321  *  switch())*/
00322 inline static int io_watch_add( io_wait_h* h,
00323                                                                 int fd,
00324                                                                 short events,
00325                                                                 fd_type type,
00326                                                                 void* data)
00327 {
00328 
00329         /* helper macros */
00330 #define fd_array_setup(ev) \
00331         do{ \
00332                 h->fd_array[h->fd_no].fd=fd; \
00333                 h->fd_array[h->fd_no].events=(ev); /* useless for select */ \
00334                 h->fd_array[h->fd_no].revents=0;     /* useless for select */ \
00335         }while(0)
00336         
00337 #define set_fd_flags(f) \
00338         do{ \
00339                         flags=fcntl(fd, F_GETFL); \
00340                         if (flags==-1){ \
00341                                 LOG(L_ERR, "ERROR: io_watch_add: fnctl: GETFL failed:" \
00342                                                 " %s [%d]\n", strerror(errno), errno); \
00343                                 goto error; \
00344                         } \
00345                         if (fcntl(fd, F_SETFL, flags|(f))==-1){ \
00346                                 LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETFL" \
00347                                                         " failed: %s [%d]\n", strerror(errno), errno); \
00348                                 goto error; \
00349                         } \
00350         }while(0)
00351         
00352         
00353         struct fd_map* e;
00354         int flags;
00355 #ifdef HAVE_EPOLL
00356         struct epoll_event ep_event;
00357 #endif
00358 #ifdef HAVE_DEVPOLL
00359         struct pollfd pfd;
00360 #endif
00361 #if defined(HAVE_SIGIO_RT) || defined (HAVE_EPOLL)
00362         int n;
00363 #endif
00364 #if defined(HAVE_SIGIO_RT)
00365         int idx;
00366         int check_io;
00367         struct pollfd pf;
00368         
00369         check_io=0; /* set to 1 if we need to check for pre-existing queued
00370                                    io/data on the fd */
00371         idx=-1;
00372 #endif
00373         e=0;
00374         /* sanity checks */
00375         if (unlikely(fd==-1)){
00376                 LOG(L_CRIT, "BUG: io_watch_add: fd is -1!\n");
00377                 goto error;
00378         }
00379         if (unlikely((events&(POLLIN|POLLOUT))==0)){
00380                 LOG(L_CRIT, "BUG: io_watch_add: invalid events: 0x%0x\n", events);
00381                 goto error;
00382         }
00383         /* check if not too big */
00384         if (unlikely(h->fd_no>=h->max_fd_no)){
00385                 LOG(L_CRIT, "ERROR: io_watch_add: maximum fd number exceeded:"
00386                                 " %d/%d\n", h->fd_no, h->max_fd_no);
00387                 goto error;
00388         }
00389         DBG("DBG: io_watch_add(%p, %d, %d, %p), fd_no=%d\n",
00390                         h, fd, type, data, h->fd_no);
00391         /*  hash sanity check */
00392         e=get_fd_map(h, fd);
00393         if (unlikely(e && (e->type!=0 /*F_NONE*/))){
00394                 LOG(L_ERR, "ERROR: io_watch_add: trying to overwrite entry %d"
00395                                 " watched for %x in the hash(%d, %d, %p) with (%d, %d, %p)\n",
00396                                 fd, events, e->fd, e->type, e->data, fd, type, data);
00397                 e=0;
00398                 goto error;
00399         }
00400         
00401         if (unlikely((e=hash_fd_map(h, fd, events, type, data))==0)){
00402                 LOG(L_ERR, "ERROR: io_watch_add: failed to hash the fd %d\n", fd);
00403                 goto error;
00404         }
00405         switch(h->poll_method){ /* faster then pointer to functions */
00406                 case POLL_POLL:
00407 #ifdef POLLRDHUP
00408                         /* listen to POLLRDHUP by default (if POLLIN) */
00409                         events|=((int)!(events & POLLIN) - 1) & POLLRDHUP;
00410 #endif /* POLLRDHUP */
00411                         fd_array_setup(events);
00412                         set_fd_flags(O_NONBLOCK);
00413                         break;
00414 #ifdef HAVE_SELECT
00415                 case POLL_SELECT:
00416                         fd_array_setup(events);
00417                         if (likely(events & POLLIN))
00418                                 FD_SET(fd, &h->master_rset);
00419                         if (unlikely(events & POLLOUT))
00420                                 FD_SET(fd, &h->master_wset);
00421                         if (h->max_fd_select<fd) h->max_fd_select=fd;
00422                         break;
00423 #endif
00424 #ifdef HAVE_SIGIO_RT
00425                 case POLL_SIGIO_RT:
00426                         fd_array_setup(events);
00427                         /* re-set O_ASYNC might be needed, if not done from
00428                          * io_watch_del (or if somebody wants to add a fd which has
00429                          * already O_ASYNC/F_SETSIG set on a duplicate)
00430                          */
00431                         /* set async & signal */
00432                         if (fcntl(fd, F_SETOWN, my_pid())==-1){
00433                                 LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETOWN"
00434                                 " failed: %s [%d]\n", strerror(errno), errno);
00435                                 goto error;
00436                         }
00437                         if (fcntl(fd, F_SETSIG, h->signo)==-1){
00438                                 LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETSIG"
00439                                         " failed: %s [%d]\n", strerror(errno), errno);
00440                                 goto error;
00441                         }
00442                         /* set both non-blocking and async */
00443                         set_fd_flags(O_ASYNC| O_NONBLOCK);
00444 #ifdef EXTRA_DEBUG
00445                         DBG("io_watch_add: sigio_rt on f %d, signal %d to pid %d\n",
00446                                         fd,  h->signo, my_pid());
00447 #endif
00448                         /* empty socket receive buffer, if buffer is already full
00449                          * no more space to put packets
00450                          * => no more signals are ever generated
00451                          * also when moving fds, the freshly moved fd might have
00452                          *  already some bytes queued, we want to get them now
00453                          *  and not later -- andrei */
00454                         idx=h->fd_no;
00455                         check_io=1;
00456                         break;
00457 #endif
00458 #ifdef HAVE_EPOLL
00459                 case POLL_EPOLL_LT:
00460                         ep_event.events=
00461 #ifdef POLLRDHUP
00462                                                 /* listen for EPOLLRDHUP too */
00463                                                 ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
00464 #else /* POLLRDHUP */
00465                                                 (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
00466 #endif /* POLLRDHUP */
00467                                                 (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
00468                         ep_event.data.ptr=e;
00469 again1:
00470                         n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
00471                         if (unlikely(n==-1)){
00472                                 if (errno==EAGAIN) goto again1;
00473                                 LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
00474                                         strerror(errno), errno);
00475                                 goto error;
00476                         }
00477                         break;
00478                 case POLL_EPOLL_ET:
00479                         set_fd_flags(O_NONBLOCK);
00480                         ep_event.events=
00481 #ifdef POLLRDHUP
00482                                                 /* listen for EPOLLRDHUP too */
00483                                                 ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
00484 #else /* POLLRDHUP */
00485                                                 (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
00486 #endif /* POLLRDHUP */
00487                                                 (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
00488                                                 EPOLLET;
00489                         ep_event.data.ptr=e;
00490 again2:
00491                         n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
00492                         if (unlikely(n==-1)){
00493                                 if (errno==EAGAIN) goto again2;
00494                                 LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
00495                                         strerror(errno), errno);
00496                                 goto error;
00497                         }
00498                         break;
00499 #endif
00500 #ifdef HAVE_KQUEUE
00501                 case POLL_KQUEUE:
00502                         if (likely( events & POLLIN)){
00503                                 if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e)==-1))
00504                                         goto error;
00505                         }
00506                         if (unlikely( events & POLLOUT)){
00507                                 if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
00508                                 {
00509                                         if (likely(events & POLLIN)){
00510                                                 kq_ev_change(h, fd, EVFILT_READ, EV_DELETE, 0);
00511                                         }
00512                                         goto error;
00513                                 }
00514                         }
00515                         break;
00516 #endif
00517 #ifdef HAVE_DEVPOLL
00518                 case POLL_DEVPOLL:
00519                         pfd.fd=fd;
00520                         pfd.events=events;
00521                         pfd.revents=0;
00522 again_devpoll:
00523                         if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
00524                                 if (errno==EAGAIN) goto again_devpoll;
00525                                 LOG(L_ERR, "ERROR: io_watch_add: /dev/poll write failed:"
00526                                                         "%s [%d]\n", strerror(errno), errno);
00527                                 goto error;
00528                         }
00529                         break;
00530 #endif
00531                         
00532                 default:
00533                         LOG(L_CRIT, "BUG: io_watch_add: no support for poll method "
00534                                         " %s (%d)\n", poll_method_str[h->poll_method],
00535                                         h->poll_method);
00536                         goto error;
00537         }
00538         
00539         h->fd_no++; /* "activate" changes, for epoll/kqueue/devpoll it
00540                                    has only informative value */
00541 #if defined(HAVE_SIGIO_RT)
00542         if (check_io){
00543                 /* handle possible pre-existing events */
00544                 pf.fd=fd;
00545                 pf.events=events;
00546 check_io_again:
00547                 n=0;
00548                 while(e->type && ((n=poll(&pf, 1, 0))>0) &&
00549                                 (handle_io(e, pf.revents, idx)>0) &&
00550                                 (pf.revents & (e->events|POLLERR|POLLHUP)));
00551                 if (unlikely(e->type && (n==-1))){
00552                         if (errno==EINTR) goto check_io_again;
00553                         LOG(L_ERR, "ERROR: io_watch_add: check_io poll: %s [%d]\n",
00554                                                 strerror(errno), errno);
00555                 }
00556         }
00557 #endif
00558         return 0;
00559 error:
00560         if (e) unhash_fd_map(e);
00561         return -1;
00562 #undef fd_array_setup
00563 #undef set_fd_flags
00564 }
00565 
00566 
00567 
00568 #define IO_FD_CLOSING 16
00569 /* parameters:    h - handler
00570  *               fd - file descriptor
00571  *            index - index in the fd_array if known, -1 if not
00572  *                    (if index==-1 fd_array will be searched for the
00573  *                     corresponding fd* entry -- slower but unavoidable in
00574  *                     some cases). index is not used (no fd_array) for epoll,
00575  *                     /dev/poll and kqueue
00576  *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was
00577  *                    or will shortly be closed, in some cases we can avoid
00578  *                    extra remove operations (e.g.: epoll, kqueue, sigio)
00579  * returns 0 if ok, -1 on error */
00580 inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
00581 {
00582         
00583 #define fix_fd_array \
00584         do{\
00585                         if (unlikely(idx==-1)){ \
00586                                 /* fix idx if -1 and needed */ \
00587                                 for (idx=0; (idx<h->fd_no) && \
00588                                                         (h->fd_array[idx].fd!=fd); idx++); \
00589                         } \
00590                         if (likely(idx<h->fd_no)){ \
00591                                 memmove(&h->fd_array[idx], &h->fd_array[idx+1], \
00592                                         (h->fd_no-(idx+1))*sizeof(*(h->fd_array))); \
00593                                 if ((idx<=h->crt_fd_array_idx) && (h->crt_fd_array_idx>=0)) \
00594                                         h->crt_fd_array_idx--; \
00595                         } \
00596         }while(0)
00597         
00598         struct fd_map* e;
00599         int events;
00600 #ifdef HAVE_EPOLL
00601         int n;
00602         struct epoll_event ep_event;
00603 #endif
00604 #ifdef HAVE_DEVPOLL
00605         struct pollfd pfd;
00606 #endif
00607 #ifdef HAVE_SIGIO_RT
00608         int fd_flags;
00609 #endif
00610         
00611         if (unlikely((fd<0) || (fd>=h->max_fd_no))){
00612                 LOG(L_CRIT, "BUG: io_watch_del: invalid fd %d, not in [0, %d) \n",
00613                                                 fd, h->fd_no);
00614                 goto error;
00615         }
00616         DBG("DBG: io_watch_del (%p, %d, %d, 0x%x) fd_no=%d called\n",
00617                         h, fd, idx, flags, h->fd_no);
00618         e=get_fd_map(h, fd);
00619         /* more sanity checks */
00620         if (unlikely(e==0)){
00621                 LOG(L_CRIT, "BUG: io_watch_del: no corresponding hash entry for %d\n",
00622                                         fd);
00623                 goto error;
00624         }
00625         if (unlikely(e->type==0 /*F_NONE*/)){
00626                 LOG(L_ERR, "ERROR: io_watch_del: trying to delete already erased"
00627                                 " entry %d in the hash(%d, %d, %p) flags %x)\n",
00628                                 fd, e->fd, e->type, e->data, flags);
00629                 goto error;
00630         }
00631         events=e->events;
00632         
00633         switch(h->poll_method){
00634                 case POLL_POLL:
00635                         fix_fd_array;
00636                         break;
00637 #ifdef HAVE_SELECT
00638                 case POLL_SELECT:
00639                         if (likely(events & POLLIN))
00640                                 FD_CLR(fd, &h->master_rset);
00641                         if (unlikely(events & POLLOUT))
00642                                 FD_CLR(fd, &h->master_wset);
00643                         if (unlikely(h->max_fd_select && (h->max_fd_select==fd)))
00644                                 /* we don't know the prev. max, so we just decrement it */
00645                                 h->max_fd_select--;
00646                         fix_fd_array;
00647                         break;
00648 #endif
00649 #ifdef HAVE_SIGIO_RT
00650                 case POLL_SIGIO_RT:
00651                         /* the O_ASYNC flag must be reset all the time, the fd
00652                          *  can be changed only if  O_ASYNC is reset (if not and
00653                          *  the fd is a duplicate, you will get signals from the dup. fd
00654                          *  and not from the original, even if the dup. fd was closed
00655                          *  and the signals re-set on the original) -- andrei
00656                          */
00657                         /*if (!(flags & IO_FD_CLOSING)){*/
00658                                 /* reset ASYNC */
00659                                 fd_flags=fcntl(fd, F_GETFL);
00660                                 if (unlikely(fd_flags==-1)){
00661                                         LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:"
00662                                                         " %s [%d]\n", strerror(errno), errno);
00663                                         goto error;
00664                                 }
00665                                 if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){
00666                                         LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL"
00667                                                                 " failed: %s [%d]\n", strerror(errno), errno);
00668                                         goto error;
00669                                 }
00670                         fix_fd_array; /* only on success */
00671                         break;
00672 #endif
00673 #ifdef HAVE_EPOLL
00674                 case POLL_EPOLL_LT:
00675                 case POLL_EPOLL_ET:
00676                         /* epoll doesn't seem to automatically remove sockets,
00677                          * if the socket is a duplicate/moved and the original
00678                          * is still open. The fd is removed from the epoll set
00679                          * only when the original (and all the  copies?) is/are
00680                          * closed. This is probably a bug in epoll. --andrei */
00681 #ifdef EPOLL_NO_CLOSE_BUG
00682                         if (!(flags & IO_FD_CLOSING)){
00683 #endif
00684 again_epoll:
00685                                 n=epoll_ctl(h->epfd, EPOLL_CTL_DEL, fd, &ep_event);
00686                                 if (unlikely(n==-1)){
00687                                         if (errno==EAGAIN) goto again_epoll;
00688                                         LOG(L_ERR, "ERROR: io_watch_del: removing fd from epoll "
00689                                                         "list failed: %s [%d]\n", strerror(errno), errno);
00690                                         goto error;
00691                                 }
00692 #ifdef EPOLL_NO_CLOSE_BUG
00693                         }
00694 #endif
00695                         break;
00696 #endif
00697 #ifdef HAVE_KQUEUE
00698                 case POLL_KQUEUE:
00699                         if (!(flags & IO_FD_CLOSING)){
00700                                 if (likely(events & POLLIN)){
00701                                         if (unlikely(kq_ev_change(h, fd, EVFILT_READ,
00702                                                                                                         EV_DELETE, 0) ==-1)){
00703                                                 /* try to delete the write filter anyway */
00704                                                 if (events & POLLOUT){
00705                                                         kq_ev_change(h, fd, EVFILT_WRITE, EV_DELETE, 0);
00706                                                 }
00707                                                 goto error;
00708                                         }
00709                                 }
00710                                 if (unlikely(events & POLLOUT)){
00711                                         if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE,
00712                                                                                                         EV_DELETE, 0) ==-1))
00713                                                 goto error;
00714                                 }
00715                         }
00716                         break;
00717 #endif
00718 #ifdef HAVE_DEVPOLL
00719                 case POLL_DEVPOLL:
00720                                 /* for /dev/poll the closed fds _must_ be removed
00721                                    (they are not removed automatically on close()) */
00722                                 pfd.fd=fd;
00723                                 pfd.events=POLLREMOVE;
00724                                 pfd.revents=0;
00725 again_devpoll:
00726                                 if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
00727                                         if (errno==EINTR) goto again_devpoll;
00728                                         LOG(L_ERR, "ERROR: io_watch_del: removing fd from "
00729                                                                 "/dev/poll failed: %s [%d]\n",
00730                                                                 strerror(errno), errno);
00731                                         goto error;
00732                                 }
00733                                 break;
00734 #endif
00735                 default:
00736                         LOG(L_CRIT, "BUG: io_watch_del: no support for poll method "
00737                                         " %s (%d)\n", poll_method_str[h->poll_method],
00738                                         h->poll_method);
00739                         goto error;
00740         }
00741         unhash_fd_map(e); /* only on success */
00742         h->fd_no--;
00743         return 0;
00744 error:
00745         return -1;
00746 #undef fix_fd_array
00747 }
00748 
00749 
00750 
00751 /* parameters:    h - handler
00752  *               fd - file descriptor
00753  *           events - new events to watch for
00754  *              idx - index in the fd_array if known, -1 if not
00755  *                    (if index==-1 fd_array will be searched for the
00756  *                     corresponding fd* entry -- slower but unavoidable in
00757  *                     some cases). index is not used (no fd_array) for epoll,
00758  *                     /dev/poll and kqueue
00759  * returns 0 if ok, -1 on error */
00760 inline static int io_watch_chg(io_wait_h* h, int fd, short events, int idx )
00761 {
00762         
00763 #define fd_array_chg(ev) \
00764         do{\
00765                         if (unlikely(idx==-1)){ \
00766                                 /* fix idx if -1 and needed */ \
00767                                 for (idx=0; (idx<h->fd_no) && \
00768                                                         (h->fd_array[idx].fd!=fd); idx++); \
00769                         } \
00770                         if (likely(idx<h->fd_no)){ \
00771                                 h->fd_array[idx].events=(ev); \
00772                         } \
00773         }while(0)
00774         
00775         struct fd_map* e;
00776         int add_events;
00777         int del_events;
00778 #ifdef HAVE_DEVPOLL
00779         struct pollfd pfd;
00780 #endif
00781 #ifdef HAVE_EPOLL
00782         int n;
00783         struct epoll_event ep_event;
00784 #endif
00785         
00786         if (unlikely((fd<0) || (fd>=h->max_fd_no))){
00787                 LOG(L_CRIT, "BUG: io_watch_chg: invalid fd %d, not in [0, %d) \n",
00788                                                 fd, h->fd_no);
00789                 goto error;
00790         }
00791         if (unlikely((events&(POLLIN|POLLOUT))==0)){
00792                 LOG(L_CRIT, "BUG: io_watch_chg: invalid events: 0x%0x\n", events);
00793                 goto error;
00794         }
00795         DBG("DBG: io_watch_chg (%p, %d, 0x%x, 0x%x) fd_no=%d called\n",
00796                         h, fd, events, idx, h->fd_no);
00797         e=get_fd_map(h, fd);
00798         /* more sanity checks */
00799         if (unlikely(e==0)){
00800                 LOG(L_CRIT, "BUG: io_watch_chg: no corresponding hash entry for %d\n",
00801                                         fd);
00802                 goto error;
00803         }
00804         if (unlikely(e->type==0 /*F_NONE*/)){
00805                 LOG(L_ERR, "ERROR: io_watch_chg: trying to change an already erased"
00806                                 " entry %d in the hash(%d, %d, %p) )\n",
00807                                 fd, e->fd, e->type, e->data);
00808                 goto error;
00809         }
00810         
00811         add_events=events & ~e->events;
00812         del_events=e->events & ~events;
00813         switch(h->poll_method){
00814                 case POLL_POLL:
00815 #ifdef POLLRDHUP
00816                         fd_array_chg(events |
00817                                                         /* listen to POLLRDHUP by default (if POLLIN) */
00818                                                         (((int)!(events & POLLIN) - 1) & POLLRDHUP)
00819                                                 );
00820 #else /* POLLRDHUP */
00821                         fd_array_chg(events);
00822 #endif /* POLLRDHUP */
00823                         break;
00824 #ifdef HAVE_SELECT
00825                 case POLL_SELECT:
00826                         fd_array_chg(events);
00827                         if (unlikely(del_events & POLLIN))
00828                                 FD_CLR(fd, &h->master_rset);
00829                         else if (unlikely(add_events & POLLIN))
00830                                 FD_SET(fd, &h->master_rset);
00831                         if (likely(del_events & POLLOUT))
00832                                 FD_CLR(fd, &h->master_wset);
00833                         else if (likely(add_events & POLLOUT))
00834                                 FD_SET(fd, &h->master_wset);
00835                         break;
00836 #endif
00837 #ifdef HAVE_SIGIO_RT
00838                 case POLL_SIGIO_RT:
00839                         fd_array_chg(events);
00840                         /* no need for check_io, since SIGIO_RT listens by default for all
00841                          * the events */
00842                         break;
00843 #endif
00844 #ifdef HAVE_EPOLL
00845                 case POLL_EPOLL_LT:
00846                                 ep_event.events=
00847 #ifdef POLLRDHUP
00848                                                 /* listen for EPOLLRDHUP too */
00849                                                 ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
00850 #else /* POLLRDHUP */
00851                                                 (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
00852 #endif /* POLLRDHUP */
00853                                                 (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
00854                                 ep_event.data.ptr=e;
00855 again_epoll_lt:
00856                                 n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
00857                                 if (unlikely(n==-1)){
00858                                         if (errno==EAGAIN) goto again_epoll_lt;
00859                                         LOG(L_ERR, "ERROR: io_watch_chg: modifying epoll events"
00860                                                         " failed: %s [%d]\n", strerror(errno), errno);
00861                                         goto error;
00862                                 }
00863                         break;
00864                 case POLL_EPOLL_ET:
00865                                 ep_event.events=
00866 #ifdef POLLRDHUP
00867                                                 /* listen for EPOLLRDHUP too */
00868                                                 ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
00869 #else /* POLLRDHUP */
00870                                                 (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
00871 #endif /* POLLRDHUP */
00872                                                 (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
00873                                                 EPOLLET;
00874                                 ep_event.data.ptr=e;
00875 again_epoll_et:
00876                                 n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
00877                                 if (unlikely(n==-1)){
00878                                         if (errno==EAGAIN) goto again_epoll_et;
00879                                         LOG(L_ERR, "ERROR: io_watch_chg: modifying epoll events"
00880                                                         " failed: %s [%d]\n", strerror(errno), errno);
00881                                         goto error;
00882                                 }
00883                         break;
00884 #endif
00885 #ifdef HAVE_KQUEUE
00886                 case POLL_KQUEUE:
00887                         if (unlikely(del_events & POLLIN)){
00888                                 if (unlikely(kq_ev_change(h, fd, EVFILT_READ,
00889                                                                                                                 EV_DELETE, 0) ==-1))
00890                                                 goto error;
00891                         }else if (unlikely(add_events & POLLIN)){
00892                                 if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e) ==-1))
00893                                         goto error;
00894                         }
00895                         if (likely(del_events & POLLOUT)){
00896                                 if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE,
00897                                                                                                                 EV_DELETE, 0) ==-1))
00898                                                 goto error;
00899                         }else if (likely(add_events & POLLOUT)){
00900                                 if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
00901                                         goto error;
00902                         }
00903                         break;
00904 #endif
00905 #ifdef HAVE_DEVPOLL
00906                 case POLL_DEVPOLL:
00907                                 /* for /dev/poll the closed fds _must_ be removed
00908                                    (they are not removed automatically on close()) */
00909                                 pfd.fd=fd;
00910                                 pfd.events=POLLREMOVE;
00911                                 pfd.revents=0;
00912 again_devpoll1:
00913                                 if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
00914                                         if (errno==EINTR) goto again_devpoll1;
00915                                         LOG(L_ERR, "ERROR: io_watch_chg: removing fd from "
00916                                                                 "/dev/poll failed: %s [%d]\n",
00917                                                                 strerror(errno), errno);
00918                                         goto error;
00919                                 }
00920 again_devpoll2:
00921                                 pfd.events=events;
00922                                 pfd.revents=0;
00923                                 if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
00924                                         if (errno==EINTR) goto again_devpoll2;
00925                                         LOG(L_ERR, "ERROR: io_watch_chg: re-adding fd to "
00926                                                                 "/dev/poll failed: %s [%d]\n",
00927                                                                 strerror(errno), errno);
00928                                         /* error re-adding the fd => mark it as removed/unhash */
00929                                         unhash_fd_map(e);
00930                                         goto error;
00931                                 }
00932                                 break;
00933 #endif
00934                 default:
00935                         LOG(L_CRIT, "BUG: io_watch_chg: no support for poll method "
00936                                         " %s (%d)\n", poll_method_str[h->poll_method],
00937                                         h->poll_method);
00938                         goto error;
00939         }
00940         e->events=events; /* only on success */
00941         return 0;
00942 error:
00943         return -1;
00944 #undef fix_fd_array
00945 }
00946 
00947 
00948 
00949 /* io_wait_loop_x style function.
00950  * wait for io using poll()
00951  * params: h      - io_wait handle
00952  *         t      - timeout in s
00953  *         repeat - if !=0 handle_io will be called until it returns <=0
00954  * returns: number of IO events handled on success (can be 0), -1 on error
00955  */
00956 inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
00957 {
00958         int n, r;
00959         int ret;
00960         struct fd_map* fm;
00961         
00962 again:
00963                 ret=n=poll(h->fd_array, h->fd_no, t*1000);
00964                 if (n==-1){
00965                         if (errno==EINTR) goto again; /* signal, ignore it */
00966                         else{
00967                                 LOG(L_ERR, "ERROR:io_wait_loop_poll: poll: %s [%d]\n",
00968                                                 strerror(errno), errno);
00969                                 goto error;
00970                         }
00971                 }
00972                 for (r=0; (r<h->fd_no) && n; r++){
00973                         fm=get_fd_map(h, h->fd_array[r].fd);
00974                         if (h->fd_array[r].revents & (fm->events|POLLERR|POLLHUP)){
00975                                 n--;
00976                                 /* sanity checks */
00977                                 if (unlikely((h->fd_array[r].fd >= h->max_fd_no)||
00978                                                                 (h->fd_array[r].fd < 0))){
00979                                         LOG(L_CRIT, "BUG: io_wait_loop_poll: bad fd %d "
00980                                                         "(no in the 0 - %d range)\n",
00981                                                         h->fd_array[r].fd, h->max_fd_no);
00982                                         /* try to continue anyway */
00983                                         h->fd_array[r].events=0; /* clear the events */
00984                                         continue;
00985                                 }
00986                                 h->crt_fd_array_idx=r;
00987                                 /* repeat handle_io if repeat, fd still watched (not deleted
00988                                  *  inside handle_io), handle_io returns that there's still
00989                                  *  IO and the fd is still watched for the triggering event */
00990                                 while(fm->type &&
00991                                                 (handle_io(fm, h->fd_array[r].revents, r) > 0) &&
00992                                                 repeat && ((fm->events|POLLERR|POLLHUP) &
00993                                                                                                         h->fd_array[r].revents));
00994                                 r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
00995                                                                                   array shifting */
00996                         }
00997                 }
00998 error:
00999         return ret;
01000 }
01001 
01002 
01003 
#ifdef HAVE_SELECT
/* io_wait_loop_x style function: wait for io using select().
 * params: h      - io_wait handle
 *         t      - timeout in s
 *         repeat - if !=0 handle_io is called again for the same fd for as
 *                  long as it returns >0 and the fd is still watched for one
 *                  of the reported events
 * returns: the value returned by select() (a select() failure other than
 *          EINTR is logged and its negative value is returned)
 */
inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
{
	fd_set rd_ready;
	fd_set wr_ready;
	struct timeval tv;
	struct fd_map* entry;
	int pending, ret;
	int i;
	int fd;
	int revents;
	
	for (;;){
		/* select() overwrites its sets and may change the timeout, so work
		 * on fresh copies for every attempt */
		rd_ready=h->master_rset;
		wr_ready=h->master_wset;
		tv.tv_sec=t;
		tv.tv_usec=0;
		ret=pending=select(h->max_fd_select+1, &rd_ready, &wr_ready, 0, &tv);
		if (pending>=0)
			break;
		if (errno==EINTR)
			continue; /* just a signal */
		LOG(L_ERR, "ERROR: io_wait_loop_select: select: %s [%d]\n",
				strerror(errno), errno);
		pending=0; /* nothing to dispatch, fall through to the return */
		break;
	}
	/* use poll fd array */
	for (i=0; (i<h->fd_no) && pending; i++){
		fd=h->fd_array[i].fd;
		revents=0;
		if (likely(FD_ISSET(fd, &rd_ready)))
			revents|=POLLIN;
		if (unlikely(FD_ISSET(fd, &wr_ready)))
			revents|=POLLOUT;
		if (!revents)
			continue;
		h->crt_fd_array_idx=i;
		entry=get_fd_map(h, fd);
		while(entry->type && (entry->events & revents)){
			if (handle_io(entry, revents, i)<=0)
				break;
			if (!repeat)
				break;
		}
		i=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
								  array shifting */
		pending--;
	}
	return ret;
}
#endif
01049 
01050 
01051 
#ifdef HAVE_EPOLL
/* io_wait_loop_x style function: wait for io using epoll_wait().
 * params: h      - io_wait handle
 *         t      - timeout in s
 *         repeat - if !=0 handle_io is called again for the same fd for as
 *                  long as it returns >0 and the fd is still watched for one
 *                  of the reported events
 * returns: the value returned by epoll_wait(): number of events (can be 0),
 *          -1 on error
 */
inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
{
	int nready, i;
	struct epoll_event* ev;
	struct fd_map* entry;
	int revents;
	
	/* restart the wait if it was interrupted by a signal */
	do{
		nready=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
	}while(unlikely(nready==-1) && (errno==EINTR));
	if (unlikely(nready==-1)){
		LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
				"epoll_wait(%d, %p, %d, %d): %s [%d]\n",
				h->epfd, h->ep_array, h->fd_no, t*1000,
				strerror(errno), errno);
		return nready;
	}
	for (i=0; i<nready; i++){
		ev=&h->ep_array[i];
		/* translate the epoll event bits into their poll() equivalents */
		revents=0;
		if (ev->events & (EPOLLIN|EPOLLPRI))
			revents|=POLLIN;
		if (ev->events & EPOLLOUT)
			revents|=POLLOUT;
		if (ev->events & EPOLLERR)
			revents|=POLLERR;
		if (ev->events & EPOLLHUP)
			revents|=POLLHUP;
#ifdef POLLRDHUP
		if (ev->events & EPOLLRDHUP)
			revents|=POLLRDHUP;
#endif
		if (unlikely(!revents)){
			LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
						" on %d/%d, data=%p\n", ev->events,
						i+1, nready, ev->data.ptr);
			continue;
		}
		entry=(struct fd_map*)ev->data.ptr;
		/* errors and hangups are always delivered, the other events only
		 * if the fd is still watched for them */
		while(entry->type && ((entry->events|POLLERR|POLLHUP) & revents)){
			if ((handle_io(entry, revents, -1)<=0) || !repeat)
				break;
		}
	}
	return nready;
}
#endif
01103 
01104 
01105 
#ifdef HAVE_KQUEUE
/* io_wait_loop_x style function: apply the queued kqueue changes and wait
 * for io using kevent().
 * params: h      - io_wait handle
 *         t      - timeout in s
 *         repeat - if !=0 handle_io is called again for the same fd for as
 *                  long as it returns >0 and the fd is still watched for one
 *                  of the reported events
 * returns: the value returned by the last kevent() call (number of events,
 *          can be 0), -1 if kevent() fails and the failure cannot be worked
 *          around by submitting a smaller batch of changes
 */
inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
{
	int n, r;
	struct timespec tspec;
	struct fd_map* fm;
	int orig_changes;
	int apply_changes;
	int revents;
	int kev_errno;
	
	tspec.tv_sec=t;
	tspec.tv_nsec=0;
	orig_changes=h->kq_nchanges;
	apply_changes=orig_changes;
	do {
again:
		n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
					h->kq_array_size, &tspec);
		if (unlikely(n==-1)){
			if (unlikely(errno==EINTR)) goto again; /* signal, ignore it */
			else {
				kev_errno=errno; /* the logging below may change errno */
				/* for a detailed explanation of what follows see below
				   the EV_ERROR case */
				if (unlikely(!(kev_errno==EBADF || kev_errno==ENOENT)))
					BUG("io_wait_loop_kqueue: kevent: unexpected error"
						" %s [%d]\n", strerror(kev_errno), kev_errno);
				if (apply_changes > h->kq_array_size){
					/* some of the FDs in kq_changes are bad (already closed)
					   and there is not enough space in kq_array to return
					   all of them back => retry with a batch that fits */
					apply_changes = h->kq_array_size;
					goto again;
				}
				/* the batch already fits in kq_array, so shrinking it
				   cannot help; enlarging it would submit entries past the
				   queued changes and retrying unchanged would loop forever.
				   The queued changes are left in place. */
				LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent failed:"
						" %s [%d]\n", strerror(kev_errno), kev_errno);
				return -1;
			}
		}
		/* remove applied changes */
		h->kq_nchanges -= apply_changes;
		if (unlikely(apply_changes < orig_changes)) {
			/* only a part was applied: move the rest to the front and
			   schedule the next batch */
			orig_changes -= apply_changes;
			memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
									sizeof(h->kq_changes[0])*h->kq_nchanges);
			apply_changes = (orig_changes < h->kq_array_size) ? orig_changes :
								h->kq_array_size;
		} else {
			orig_changes = 0;
			apply_changes = 0;
		}
		for (r=0; r<n; r++){
#ifdef EXTRA_DEBUG
			/* ident is not an int, cast it to match the %d conversion */
			DBG("DBG: kqueue: event %d/%d: fd=%d, udata=%lx, flags=0x%x\n",
					r, n, (int)h->kq_array[r].ident,
					(long)h->kq_array[r].udata,
					h->kq_array[r].flags);
#endif
			if (unlikely((h->kq_array[r].flags & EV_ERROR) ||
							 h->kq_array[r].udata == 0)){
				/* error in changes: we ignore it if it has to do with a
				   bad fd or update==0. It can be caused by trying to remove an
				   already closed fd: race between adding something to the
				   changes array, close() and applying the changes (EBADF).
				   E.g. for ser tcp: tcp_main sends a fd to child for reading
				    => deletes it from the watched fds => the changes array
					will contain an EV_DELETE for it. Before the changes
					are applied (they are at the end of the main io_wait loop,
					after all the fd events were processed), a CON_ERR sent
					to tcp_main by a sender (send fail) is processed and causes
					the fd to be closed. When the changes are applied =>
					error for the EV_DELETE attempt of a closed fd.
					Something similar can happen when a fd is scheduled
					for removal, is close()'ed before being removed and
					re-opened(a new sock. get the same fd). When the
					watched fd changes will be applied the fd will be valid
					(so no EBADF), but it's not already watch => ENOENT.
					We report a BUG for the other errors (there's nothing
					constructive we can do if we get an error we don't know
					how to handle), but apart from that we ignore it in the
					idea that it is better apply the rest of the changes,
					rather then dropping all of them.
				*/
				/*
					example EV_ERROR for trying to delete a read watched fd,
					that was already closed:
					{
						ident = 63,  [fd]
						filter = -1, [EVFILT_READ]
						flags = 16384, [EV_ERROR]
						fflags = 0,
						data = 9, [errno = EBADF]
						udata = 0x0
					}
				*/
				if (h->kq_array[r].data != EBADF &&
						h->kq_array[r].data != ENOENT)
					BUG("io_wait_loop_kqueue: kevent unexpected error on "
							"fd %ld udata %lx: %s [%ld]\n",
							(long)h->kq_array[r].ident,
							(long)h->kq_array[r].udata,
							strerror(h->kq_array[r].data),
							(long)h->kq_array[r].data);
			}else{
				fm=(struct fd_map*)h->kq_array[r].udata;
				/* the poll-style event mask is built from the filter type;
				   EV_EOF adds POLLHUP and EV_EOF with fflags!=0 adds
				   POLLERR ((!(x)-1) is all ones when x is true, else 0) */
				if (likely(h->kq_array[r].filter==EVFILT_READ)){
					revents=POLLIN |
						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
						(((int)!((h->kq_array[r].flags & EV_EOF) &&
									h->kq_array[r].fflags != 0) - 1)&POLLERR);
					while(fm->type && (fm->events & revents) &&
							(handle_io(fm, revents, -1)>0) && repeat);
				}else if (h->kq_array[r].filter==EVFILT_WRITE){
					revents=POLLOUT |
						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
						(((int)!((h->kq_array[r].flags & EV_EOF) &&
									h->kq_array[r].fflags != 0) - 1)&POLLERR);
					while(fm->type && (fm->events & revents) &&
							(handle_io(fm, revents, -1)>0) && repeat);
				}else{
					BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
							"%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
							" data=%lx, udata=%lx\n",
					r, n, (int)h->kq_array[r].ident, (int)h->kq_array[r].filter,
					h->kq_array[r].flags, h->kq_array[r].fflags,
					(unsigned long)h->kq_array[r].data,
					(unsigned long)h->kq_array[r].udata);
				}
			}
		}
	} while(unlikely(orig_changes));
	return n;
}
#endif
01233 
01234 
01235 
01236 #ifdef HAVE_SIGIO_RT
01237 /* sigio rt version has no repeat (it doesn't make sense)*/
01238 inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
01239 {
01240         int n;
01241         int ret;
01242         struct timespec ts;
01243         siginfo_t siginfo;
01244         int sigio_band;
01245         int sigio_fd;
01246         struct fd_map* fm;
01247         int revents;
01248 #ifdef SIGINFO64_WORKARROUND
01249         int* pi;
01250 #endif
01251         
01252         
01253         ret=1; /* 1 event per call normally */
01254         ts.tv_sec=t;
01255         ts.tv_nsec=0;
01256         if (unlikely(!sigismember(&h->sset, h->signo) ||
01257                                         !sigismember(&h->sset, SIGIO))) {
01258                 LOG(L_CRIT, "BUG: io_wait_loop_sigio_rt: the signal mask"
01259                                 " is not properly set!\n");
01260                 goto error;
01261         }
01262 again:
01263         n=sigtimedwait(&h->sset, &siginfo, &ts);
01264         if (unlikely(n==-1)){
01265                 if (errno==EINTR) goto again; /* some other signal, ignore it */
01266                 else if (errno==EAGAIN){ /* timeout */
01267                         ret=0;
01268                         goto end;
01269                 }else{
01270                         LOG(L_ERR, "ERROR: io_wait_loop_sigio_rt: sigtimed_wait"
01271                                         " %s [%d]\n", strerror(errno), errno);
01272                         goto error;
01273                 }
01274         }
01275         if (likely(n!=SIGIO)){
01276 #ifdef SIGINFO64_WORKARROUND
01277                 /* on linux siginfo.si_band is defined as long in userspace
01278                  * and as int in kernel (< 2.6.5) => on 64 bits things will break!
01279                  * (si_band will include si_fd, and si_fd will contain
01280                  *  garbage).
01281                  *  see /usr/src/linux/include/asm-generic/siginfo.h and
01282                  *      /usr/include/bits/siginfo.h
01283                  *  On newer kernels this is fixed (si_band is long in the kernel too).
01284                  * -- andrei */
01285                 if  ((_os_ver<0x020605) && (sizeof(siginfo.si_band)>sizeof(int))){
01286                         pi=(int*)(void*)&siginfo.si_band; /* avoid type punning warnings */
01287                         sigio_band=*pi;
01288                         sigio_fd=*(pi+1);
01289                 }else
01290 #endif
01291                 {
01292                         sigio_band=siginfo.si_band;
01293                         sigio_fd=siginfo.si_fd;
01294                 }
01295                 if (unlikely(siginfo.si_code==SI_SIGIO)){
01296                         /* old style, we don't know the event (linux 2.2.?) */
01297                         LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: old style sigio"
01298                                         " interface\n");
01299                         fm=get_fd_map(h, sigio_fd);
01300                         /* we can have queued signals generated by fds not watched
01301                          * any more, or by fds in transition, to a child => ignore them*/
01302                         if (fm->type)
01303                                 handle_io(fm, POLLIN|POLLOUT, -1);
01304                 }else{
01305                         /* si_code contains the SIGPOLL reason: POLL_IN, POLL_OUT,
01306                          *  POLL_MSG, POLL_ERR, POLL_PRI or POLL_HUP
01307                          * and si_band the translated poll event bitmap:
01308                          *  POLLIN|POLLRDNORM  (=POLL_IN),
01309                          *  POLLOUT|POLLWRNORM|POLLWRBAND (=POLL_OUT),
01310                          *  POLLIN|POLLRDNORM|POLLMSG (=POLL_MSG),
01311                          *  POLLERR (=POLL_ERR),
01312                          *  POLLPRI|POLLRDBAND (=POLL_PRI),
01313                          *  POLLHUP|POLLERR (=POLL_HUP)
01314                          *  [linux 2.6.22 fs/fcntl.c:447]
01315                          */
01316 #ifdef EXTRA_DEBUG
01317                         DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
01318                                         " si_code=%d, si_band=0x%x,"
01319                                         " si_fd=%d\n",
01320                                         siginfo.si_signo, n, siginfo.si_code,
01321                                         (unsigned)sigio_band,
01322                                         sigio_fd);
01323 #endif
01324                         /* on some errors (e.g. when receving TCP RST), sigio_band will
01325                          * be set to 0x08 (POLLERR) or 0x18 (POLLERR|POLLHUP - on stream
01326                          *  unix socket close) , so better catch all events --andrei */
01327                         if (likely(sigio_band)){
01328                                 fm=get_fd_map(h, sigio_fd);
01329                                 revents=sigio_band;
01330                                 /* fix revents==POLLPRI case */
01331                                 revents |= (!(revents & POLLPRI)-1) & POLLIN;
01332                                 /* we can have queued signals generated by fds not watched
01333                                  * any more, or by fds in transition, to a child
01334                                  * => ignore them */
01335                                 if (fm->type && ((fm->events|POLLERR|POLLHUP) & revents))
01336                                         handle_io(fm, revents, -1);
01337                                 else
01338                                         DBG("WARNING: io_wait_loop_sigio_rt: ignoring event"
01339                                                         " %x on fd %d, watching for %x, si_code=%x "
01340                                                         "(fm->type=%d, fm->fd=%d, fm->data=%p)\n",
01341                                                         sigio_band, sigio_fd, fm->events, siginfo.si_code,
01342                                                         fm->type, fm->fd, fm->data);
01343                         }else{
01344                                 LOG(L_ERR, "ERROR: io_wait_loop_sigio_rt: unexpected event"
01345                                                         " on fd %d: %x\n", sigio_fd, sigio_band);
01346                         }
01347                 }
01348         }else{
01349                 /* signal queue overflow
01350                  * TODO: increase signal queue size: 2.4x /proc/.., 2.6x -rlimits */
01351                 LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: signal queue overflowed"
01352                                         "- falling back to poll\n");
01353                 /* clear real-time signal queue
01354                  * both SIG_IGN and SIG_DFL are needed , it doesn't work
01355                  * only with SIG_DFL  */
01356                 if (signal(h->signo, SIG_IGN)==SIG_ERR){
01357                         LOG(L_CRIT, "BUG: do_poll: couldn't reset signal to IGN\n");
01358                 }
01359                 
01360                 if (signal(h->signo, SIG_DFL)==SIG_ERR){
01361                         LOG(L_CRIT, "BUG: do_poll: couldn't reset signal to DFL\n");
01362                 }
01363                 /* falling back to normal poll */
01364                 ret=io_wait_loop_poll(h, -1, 1);
01365         }
01366 end:
01367         return ret;
01368 error:
01369         return -1;
01370 }
01371 #endif
01372 
01373 
01374 
01375 #ifdef HAVE_DEVPOLL
01376 inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
01377 {
01378         int n, r;
01379         int ret;
01380         struct dvpoll dpoll;
01381         struct fd_map* fm;
01382 
01383                 dpoll.dp_timeout=t*1000;
01384                 dpoll.dp_nfds=h->fd_no;
01385                 dpoll.dp_fds=h->fd_array;
01386 again:
01387                 ret=n=ioctl(h->dpoll_fd, DP_POLL, &dpoll);
01388                 if (unlikely(n==-1)){
01389                         if (errno==EINTR) goto again; /* signal, ignore it */
01390                         else{
01391                                 LOG(L_ERR, "ERROR:io_wait_loop_devpoll: ioctl: %s [%d]\n",
01392                                                 strerror(errno), errno);
01393                                 goto error;
01394                         }
01395                 }
01396                 for (r=0; r< n; r++){
01397                         if (h->fd_array[r].revents & (POLLNVAL|POLLERR)){
01398                                 LOG(L_ERR, "ERROR: io_wait_loop_devpoll: pollinval returned"
01399                                                         " for fd %d, revents=%x\n",
01400                                                         h->fd_array[r].fd, h->fd_array[r].revents);
01401                         }
01402                         /* POLLIN|POLLHUP just go through */
01403                         fm=get_fd_map(h, h->fd_array[r].fd);
01404                         while(fm->type && (fm->events & h->fd_array[r].revents) &&
01405                                         (handle_io(fm, h->fd_array[r].revents, r) > 0) && repeat);
01406                 }
01407 error:
01408         return ret;
01409 }
01410 #endif
01411 
01412 
01413 
01414 /* init */
01415 
01416 
01417 /* initializes the static vars/arrays
01418  * params:      h - pointer to the io_wait_h that will be initialized
01419  *         max_fd - maximum allowed fd number
01420  *    poll_method - poll method (0 for automatic best fit)
01421  */
01422 int init_io_wait(io_wait_h* h, int max_fd, enum poll_types poll_method);
01423 
01424 /* destroys everything init_io_wait allocated */
01425 void destroy_io_wait(io_wait_h* h);
01426 
01427 
01428 #endif

Generated on Tue May 22 2012 13:10:07 for SIP Router by  doxygen 1.7.1