/[LeafOK_CVS]/pvpgn-1.7.4/src/common/fdwatch_kqueue.c
ViewVC logotype

Annotation of /pvpgn-1.7.4/src/common/fdwatch_kqueue.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (hide annotations)
Tue Jun 6 03:41:38 2006 UTC (19 years, 9 months ago) by sysadm
CVS Tags: pvpgn_1-7-4-0_MIL
Branch point for: GNU, MAIN
Content type: text/x-csrc
Initial revision

1 sysadm 1.1 /*
2     * Abstraction API/layer for the various ways PvPGN can inspect sockets state
3     * 2003 (C) dizzy@roedu.net
4     *
5     * Code is based on the ideas found in thttpd project.
6     *
7     * *BSD kqueue(2) based backend
8     *
9     * This program is free software; you can redistribute it and/or
10     * modify it under the terms of the GNU General Public License
11     * as published by the Free Software Foundation; either version 2
12     * of the License, or (at your option) any later version.
13     *
14     * This program is distributed in the hope that it will be useful,
15     * but WITHOUT ANY WARRANTY; without even the implied warranty of
16     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17     * GNU General Public License for more details.
18     *
19     * You should have received a copy of the GNU General Public License
20     * along with this program; if not, write to the Free Software
21     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22     */
23    
24     #include "common/setup_before.h"
25     #ifdef STDC_HEADERS
26     # include <stdlib.h>
27     #else
28     # ifdef HAVE_MALLOC_H
29     # include <malloc.h>
30     # endif
31     #endif
32     #ifdef HAVE_STRING_H
33     # include <string.h>
34     #else
35     # ifdef HAVE_STRINGS_H
36     # include <strings.h>
37     # endif
38     #endif
39     #ifdef HAVE_SYS_TYPES_H
40     # include <sys/types.h>
41     #endif
42     #ifdef HAVE_SYS_EVENT_H
43     # include <sys/event.h>
44     #endif
45     #ifdef HAVE_SYS_TIME_H
46     # include <sys/time.h>
47     #endif
48     #include "fdwatch.h"
49     #include "common/eventlog.h"
50     #include "common/xalloc.h"
51     #include "common/setup_after.h"
52    
53     #ifdef HAVE_KQUEUE
54     static int sr;
55     static int kq;
56     static struct kevent *kqchanges = NULL; /* changes to make to kqueue */
57     static struct kevent *kqevents = NULL; /* events to investigate */
58     /* r/w indices from idx to the kqchanges index where the change is stored */
59     static int *_rridx, *_wridx;
60     static unsigned nochanges;
61    
62     static int fdw_kqueue_init(int nfds);
63     static int fdw_kqueue_close(void);
64     static int fdw_kqueue_add_fd(int idx, t_fdwatch_type rw);
65     static int fdw_kqueue_del_fd(int idx);
66     static int fdw_kqueue_watch(long timeout_msecs);
67     static void fdw_kqueue_handle(void);
68    
69     t_fdw_backend fdw_kqueue = {
70     fdw_kqueue_init,
71     fdw_kqueue_close,
72     fdw_kqueue_add_fd,
73     fdw_kqueue_del_fd,
74     fdw_kqueue_watch,
75     fdw_kqueue_handle
76     };
77    
78     static int fdw_kqueue_init(int nfds)
79     {
80     int i;
81    
82     if ((kq = kqueue()) == -1)
83     return -1;
84     kqevents = (struct kevent *) xmalloc(sizeof(struct kevent) * nfds);
85     kqchanges = (struct kevent *) xmalloc(sizeof(struct kevent) * nfds * 2);
86     _rridx = (int *) xmalloc(sizeof(int) * nfds);
87     _wridx = (int *) xmalloc(sizeof(int) * nfds);
88    
89     memset(kqchanges, 0, sizeof(struct kevent) * nfds);
90     for (i = 0; i < nfds; i++)
91     {
92     _rridx[i] = -1;
93     _wridx[i] = -1;
94     }
95     sr = 0;
96     nochanges = 0;
97    
98     eventlog(eventlog_level_info, __FUNCTION__, "fdwatch kqueue() based layer initialized (max %d sockets)", nfds);
99     return 0;
100     }
101    
102     static int fdw_kqueue_close(void)
103     {
104     if (_rridx) { xfree((void *) _rridx); _rridx = NULL; }
105     if (_wridx) { xfree((void *) _wridx); _wridx = NULL; }
106     if (kqchanges) { xfree((void *) kqchanges); kqchanges = NULL; }
107     if (kqevents) { xfree((void *) kqevents); kqevents = NULL; }
108     sr = 0;
109     nochanges = 0;
110    
111     return 0;
112     }
113    
114     static int fdw_kqueue_add_fd(int idx, t_fdwatch_type rw)
115     {
116     static int ridx;
117     t_fdwatch_fd *cfd;
118    
119     /* eventlog(eventlog_level_trace, __FUNCTION__, "called fd: %d rw: %d", fd, rw); */
120    
121     cfd = fdw_fds + idx;
122     /* adding read event filter */
123     if (!(fdw_rw(cfd) & fdwatch_type_read) && rw & fdwatch_type_read)
124     {
125     if (_rridx[idx] >= 0 && _rridx[idx] < nochanges && kqchanges[_rridx[idx]].ident == fdw_fd(cfd))
126     {
127     ridx = _rridx[idx];
128     /* eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (read) fd on %d", ridx); */
129     } else {
130     ridx = nochanges++;
131     _rridx[idx] = ridx;
132     /* eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (read) fd on %d", ridx); */
133     }
134     EV_SET(kqchanges + ridx, fdw_fd(cfd), EVFILT_READ, EV_ADD, 0, 0, (void*)idx);
135     }
136     else if (fdw_rw(cfd) & fdwatch_type_read && !( rw & fdwatch_type_read))
137     {
138     if (_rridx[idx] >= 0 && _rridx[idx] < nochanges && kqchanges[_rridx[idx]].ident == fdw_fd(cfd))
139     {
140     ridx = _rridx[idx];
141     /* eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (read) fd on %d", ridx); */
142     } else {
143     ridx = nochanges++;
144     _rridx[idx] = ridx;
145     /* eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (read) fd on %d", ridx); */
146     }
147     EV_SET(kqchanges + ridx, fdw_fd(cfd), EVFILT_READ, EV_DELETE, 0, 0, (void*)idx);
148     }
149    
150     /* adding write event filter */
151     if (!(fdw_rw(cfd) & fdwatch_type_write) && rw & fdwatch_type_write)
152     {
153     if (_wridx[idx] >= 0 && _wridx[idx] < nochanges && kqchanges[_wridx[idx]].ident == fdw_fd(cfd))
154     {
155     ridx = _wridx[idx];
156     /* eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (write) fd on %d", ridx); */
157     } else {
158     ridx = nochanges++;
159     _wridx[idx] = ridx;
160     /* eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (write) fd on %d", ridx); */
161     }
162     EV_SET(kqchanges + ridx, fdw_fd(cfd), EVFILT_WRITE, EV_ADD, 0, 0, (void*)idx);
163     }
164     else if (fdw_rw(cfd) & fdwatch_type_write && !(rw & fdwatch_type_write))
165     {
166     if (_wridx[idx] >= 0 && _wridx[idx] < nochanges && kqchanges[_wridx[idx]].ident == fdw_fd(cfd))
167     {
168     ridx = _wridx[idx];
169     /* eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (write) fd on %d", ridx); */
170     } else {
171     ridx = nochanges++;
172     _wridx[idx] = ridx;
173     /* eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (write) fd on %d", ridx); */
174     }
175     EV_SET(kqchanges + ridx, fdw_fd(cfd), EVFILT_WRITE, EV_DELETE, 0, 0, (void*)idx);
176     }
177    
178     return 0;
179     }
180    
181     static int fdw_kqueue_del_fd(int idx)
182     {
183     t_fdwatch_fd *cfd;
184    
185     /* eventlog(eventlog_level_trace, __FUNCTION__, "called fd: %d", fd); */
186     if (sr > 0)
187     eventlog(eventlog_level_error, __FUNCTION__, "BUG: called while still handling sockets");
188    
189     cfd = fdw_fds + idx;
190     /* the last event changes about this fd has not yet been sent to kernel */
191     if (fdw_rw(cfd) & fdwatch_type_read &&
192     nochanges && _rridx[idx] >= 0 && _rridx[idx] < nochanges &&
193     kqchanges[_rridx[idx]].ident == fdw_fd(cfd))
194     {
195     nochanges--;
196     if (_rridx[idx] < nochanges)
197     {
198     int oidx;
199    
200     oidx = (int)(kqchanges[nochanges].udata);
201     if (kqchanges[nochanges].filter == EVFILT_READ &&
202     _rridx[oidx] == nochanges)
203     {
204     /* eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
205     _rridx[oidx] = _rridx[idx];
206     memcpy(kqchanges + _rridx[idx], kqchanges + nochanges, sizeof(struct kevent));
207     }
208    
209     if (kqchanges[nochanges].filter == EVFILT_WRITE &&
210     _wridx[oidx] == nochanges)
211     {
212     /* eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
213     _wridx[oidx] = _rridx[idx];
214     memcpy(kqchanges + _rridx[idx], kqchanges + nochanges, sizeof(struct kevent));
215     }
216     }
217     _rridx[idx] = -1;
218     }
219    
220     if (fdw_rw(cfd) & fdwatch_type_write &&
221     nochanges && _wridx[idx] >= 0 && _wridx[idx] < nochanges &&
222     kqchanges[_wridx[idx]].ident == fdw_fd(cfd))
223     {
224     nochanges--;
225     if (_wridx[idx] < nochanges)
226     {
227     int oidx;
228    
229     oidx = (int)(kqchanges[nochanges].udata);
230     if (kqchanges[nochanges].filter == EVFILT_READ &&
231     _rridx[oidx] == nochanges)
232     {
233     /* eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
234     _rridx[oidx] = _wridx[idx];
235     memcpy(kqchanges + _wridx[idx], kqchanges + nochanges, sizeof(struct kevent));
236     }
237    
238     if (kqchanges[nochanges].filter == EVFILT_WRITE &&
239     _wridx[oidx] == nochanges)
240     {
241     /* eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
242     _wridx[oidx] = _wridx[idx];
243     memcpy(kqchanges + _wridx[idx], kqchanges + nochanges, sizeof(struct kevent));
244     }
245     }
246     _wridx[idx] = -1;
247     }
248    
249     /* here we presume the calling code does close() on the socket and if so
250     * it is automatically removed from any kernel kqueues */
251    
252     return 0;
253     }
254    
255     static int fdw_kqueue_watch(long timeout_msec)
256     {
257     static struct timespec ts;
258    
259     ts.tv_sec = timeout_msec / 1000L;
260     ts.tv_nsec = (timeout_msec % 1000L) * 1000000L;
261     sr = kevent(kq, nochanges > 0 ? kqchanges : NULL, nochanges, kqevents, fdw_maxcons, &ts);
262     nochanges = 0;
263     return sr;
264     }
265    
266     static void fdw_kqueue_handle(void)
267     {
268     register unsigned i;
269     t_fdwatch_fd *cfd;
270    
271     /* eventlog(eventlog_level_trace, __FUNCTION__, "called"); */
272     for (i = 0; i < sr; i++)
273     {
274     /* eventlog(eventlog_level_trace, __FUNCTION__, "checking %d ident: %d read: %d write: %d", i, kqevents[i].ident, kqevents[i].filter & EVFILT_READ, kqevents[i].filter & EVFILT_WRITE); */
275     cfd = fdw_fds + (int)kqevents[i].udata;
276     if (fdw_rw(cfd) & fdwatch_type_read && kqevents[i].filter == EVFILT_READ)
277     if (fdw_hnd(cfd) (fdw_data(cfd), fdwatch_type_read) == -2)
278     continue;
279    
280     if (fdw_rw(cfd) & fdwatch_type_write && kqevents[i].filter == EVFILT_WRITE)
281     fdw_hnd(cfd) (fdw_data(cfd), fdwatch_type_write);
282     }
283     sr = 0;
284     }
285    
286     #endif /* HAVE_KQUEUE */

webmaster@leafok.com
ViewVC Help
Powered by ViewVC 1.3.0-beta1