1 module hio.drivers.kqueue;
2 
3 version(FreeBSD) {
4     version = KQUEUE;
5 }
6 version(OSX) {
7     version = KQUEUE;
8 }
9 
10 version(KQUEUE):
11 
12 import std.algorithm;
13 import std.datetime;
14 import std.conv;
15 import std.string;
16 import std.container;
17 import std.stdio;
18 import std.exception;
19 import std.experimental.logger;
20 import std.typecons;
21 import std.experimental.allocator;
22 import std.experimental.allocator.mallocator;
23 
24 import core.memory;
25 
26 import std.algorithm.comparison: max;
27 import core.sys.posix.fcntl: open, O_RDONLY;
28 import core.sys.posix.unistd: close;
29 
30 version(OSX) {
31     import core.sys.darwin.sys.event;
32 }
33 version(FreeBSD) {
34     import core.sys.freebsd.sys.event;
35 }
36 
37 import core.sys.posix.signal;
38 import core.stdc.stdint : intptr_t, uintptr_t;
39 import core.stdc.string: strerror;
40 import core.stdc.errno: errno;
41 
42 import timingwheels;
43 
44 import hio.events;
45 import hio.common;
46 
47 //enum : short {
48 //    EVFILT_READ =      (-1),
49 //    EVFILT_WRITE =     (-2),
50 //    EVFILT_AIO =       (-3),    /* attached to aio requests */
51 //    EVFILT_VNODE =     (-4),    /* attached to vnodes */
52 //    EVFILT_PROC =      (-5),    /* attached to struct proc */
53 //    EVFILT_SIGNAL =    (-6),    /* attached to struct proc */
54 //    EVFILT_TIMER =     (-7),    /* timers */
55 //    EVFILT_MACHPORT =  (-8),    /* Mach portsets */
56 //    EVFILT_FS =        (-9),    /* Filesystem events */
57 //    EVFILT_USER =      (-10),   /* User events */
58 //            /* (-11) unused */
59 //    EVFILT_VM =        (-12)   /* Virtual memory events */
60 //}
61 
62 //enum : ushort {
63 ///* actions */
64 //    EV_ADD  =                0x0001,          /* add event to kq (implies enable) */
65 //    EV_DELETE =              0x0002,          /* delete event from kq */
66 //    EV_ENABLE =              0x0004,          /* enable event */
67 //    EV_DISABLE =             0x0008          /* disable event (not reported) */
68 //}
69 
70 //struct kevent_t {
71 //    uintptr_t       ident;          /* identifier for this event */
72 //    short           filter;         /* filter for event */
73 //    ushort          flags;          /* general flags */
74 //    uint            fflags;         /* filter-specific flags */
75 //    intptr_t        data;           /* filter-specific data */
76 //    void*           udata;
77 //}
78 
79 //extern(C) int kqueue() @safe @nogc nothrow;
80 //extern(C) int kevent(int kqueue_fd, const kevent_t *events, int ne, const kevent_t *events, int ne,timespec* timeout) @safe @nogc nothrow;
81 
/// Thin @trusted forwarder to kevent(2), letting @safe code (such as the
/// loop's `run()`) issue the system call. Every argument is passed through
/// untouched and kevent's raw integer result is handed back to the caller.
auto s_kevent(A...)(A args) @trusted @nogc nothrow
{
    int rc = kevent(args);
    return rc;
}
85 
/// Reinterpret a kevent `udata` value as a `Timer`. `_add_kernel_timer` stores
/// `cast(void*)t` in `udata`, and this is the inverse. No validation is done:
/// the caller must know that `udata` really carries a Timer.
Timer udataToTimer(T)(T udata) @trusted {
    auto timer = cast(Timer) udata;
    return timer;
}
89 
/**
 * Native event-loop backend built on kqueue(2) (FreeBSD / OSX).
 *
 * Change requests for the kernel (EV_ADD / EV_DELETE / EV_DISABLE) are
 * batched in `in_events`. They are submitted either by `flush()` or together
 * with the wait performed in `run()`. Timers started with `start_timer()` are
 * tracked in user space (`timingwheels` plus the `overdue` queue) and fired
 * from `run()`. File descriptors and user events share one handler table,
 * `fileHandlers`, indexed by fd / event id.
 */
struct NativeEventLoopImpl {
    immutable bool   native = true;
    immutable string _name = "kqueue";
    @disable this(this) {}
    private {
        bool stopped = false;
        bool inshutdown = false;
        enum MAXEVENTS = 512;
        enum TOTAL_FILE_HANDLERS = 16*1024;
        // Bounds the number of overdue timers fired per execute_overdue_timers()
        // call (presumably so timers re-armed with zero delay cannot starve kevent()).
        enum MAX_OVERDUE_PER_CALL = 100;
        int  kqueue_fd = -1;  // interface to kernel
        int  in_index;        // number of pending change requests in in_events
        int  ready;           // entries of out_events filled by the last wait; kept >= 0

        timespec    ts;

        kevent_t[MAXEVENTS]     in_events;   // pending changelist
        kevent_t[MAXEVENTS]     out_events;  // results of the last kevent() wait

        Duration                tick = 5.msecs;
        TimingWheels!Timer      timingwheels;
        Timer[]                 overdue;   // timers added with expiration in past placed here

        Signal[][int]           signals;   // signal number -> handlers started for it

        // Handler table indexed by fd (or user event id). It is allocated with
        // Mallocator in initialize() and registered with the GC as a root range.
        FileEventHandler[]      fileHandlers;
    }

    /// Open the kqueue descriptor (if not already open), allocate the handler
    /// table (if not already allocated) and initialise the timing wheels.
    void initialize() @trusted nothrow
    {
        if ( kqueue_fd == -1) {
            kqueue_fd = kqueue();
        }
        debug(hiokqueue) safe_tracef("kqueue_fd=%d", kqueue_fd);
        // A second initialize() must not re-allocate: that would leak the old
        // table and leave a stale GC root behind.
        if (fileHandlers is null)
        {
            fileHandlers = Mallocator.instance.makeArray!FileEventHandler(TOTAL_FILE_HANDLERS);
            // The table lives in malloc'ed memory but holds GC references.
            GC.addRange(fileHandlers.ptr, fileHandlers.length*FileEventHandler.sizeof);
        }
        timingwheels.init();
    }

    /// Close the kqueue descriptor, drop pending changes and release the
    /// handler table. Safe to call more than once.
    void deinit() @trusted {
        debug tracef("deinit");
        if ( kqueue_fd != -1 )
        {
            close(kqueue_fd);
            kqueue_fd = -1;
        }
        in_index = 0;
        if (fileHandlers !is null)
        {
            GC.removeRange(fileHandlers.ptr);
            Mallocator.instance.dispose(fileHandlers);
            // Do not leave a dangling slice: shutdown() tests `fileHandlers is null`.
            fileHandlers = null;
        }
    }

    /// Returns: the kqueue file descriptor (-1 when not initialised).
    int get_kernel_id() pure @safe nothrow @nogc {
        return kqueue_fd;
    }

    /// Ask `run()` to return at its next check of the `stopped` flag.
    void stop() @safe pure {
        debug trace("mark eventloop as stopped");
        stopped = true;
    }

    /// Compute how long the next kevent() may block. This is the smaller of
    /// the time left until `deadline` and the time until the next timing-wheel
    /// event, clamped at zero.
    timespec _calculate_timespec(SysTime deadline) @safe {
        timespec ts;

        auto now_real = Clock.currTime;
        Duration delta = deadline - now_real;
        debug(hiokqueue) safe_tracef("deadline - now_real: %s", delta);
        auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, now_real.stdTime);
        debug(hiokqueue) safe_tracef("nextTWtimer: %s", nextTWtimer);
        delta = min(delta, nextTWtimer);

        delta = max(delta, 0.seconds);

        debug(hiokqueue) safe_tracef("delta = %s", delta);
        auto ds = delta.split!("seconds", "nsecs");
        ts.tv_sec = cast(typeof(timespec.tv_sec))ds.seconds;
        ts.tv_nsec = cast(typeof(timespec.tv_nsec))ds.nsecs;
        return ts;
    }

    /// Fire up to MAX_OVERDUE_PER_CALL timers from the `overdue` queue. Timers
    /// get AppEvent.TMO, or AppEvent.SHUTDOWN once shutdown() has run. Handler
    /// exceptions are reported on stderr and do not stop the loop.
    private void execute_overdue_timers() @safe
    {
        foreach (_; 0 .. MAX_OVERDUE_PER_CALL)
        {
            if (overdue.length == 0)
            {
                break;
            }
            Timer t = overdue[0];
            overdue = overdue[1..$];
            if ( t is null )
            {
                // timer was removed by stop_timer()
                continue;
            }
            debug(hiokqueue) tracef("execute overdue %s", t);
            // Disarm before calling the handler, as the timing-wheel path in
            // run() does, so the handler may start the timer again.
            t._armed = false;
            HandlerDelegate h = t._handler;
            try {
                h(inshutdown ? AppEvent.SHUTDOWN : AppEvent.TMO);
            } catch (Exception e) {
                import core.stdc.stdio: stderr, fprintf;
                () @trusted {fprintf(stderr, "Uncaught exception: %s\n", toStringz(e.toString));}();
            }
        }
    }

    /// Deliver AppEvent.SHUTDOWN to every timer in the timing wheels and to
    /// every registered file handler, then enter shutdown mode. Handler
    /// exceptions are logged, not propagated.
    void shutdown() @safe
    in(!stopped && !inshutdown)
    {
        // scan through all timers
        auto a = timingwheels.allTimers();
        foreach(t; a.timers)
        {
            try
            {
                debug(hiokqueue) tracef("send shutdown to timer %s", t);
                t._handler(AppEvent.SHUTDOWN);
            }
            catch(Exception e)
            {
                errorf("Error on event SHUTDOWN in %s", t);
            }
        }
        // scan through active file descriptors
        // and send SHUTDOWN event
        if (fileHandlers is null)
        {
            inshutdown = true;
            return;
        }
        foreach (i; 0 .. fileHandlers.length)
        {
            auto h = fileHandlers[i];
            if (h is null)
            {
                continue;
            }
            immutable fd = cast(int) i;
            try
            {
                debug(hiokqueue) tracef("send shutdown to filehandler %s", h);
                h.eventHandler(fd, AppEvent.SHUTDOWN);
            }
            catch(Exception e)
            {
                errorf("exception: %s:%d: %s", __FILE__, __LINE__, e.msg);
            }
        }
        inshutdown = true;
    }

    /// Run the loop for at most `d` (forever when `d == Duration.max`) or
    /// until stop() is called. Each iteration submits the pending changelist,
    /// waits in kevent(), dispatches kernel events, then fires due timers.
    /// Exceptions from timing-wheel timer handlers propagate to the caller.
    void run(Duration d) @safe {

        immutable bool runInfinitely = (d == Duration.max);
        SysTime     deadline;

        if ( !runInfinitely ) {
            deadline = Clock.currTime + d;
        }
        else
        {
            deadline = SysTime.max;
        }

        debug tracef("evl run for %s", d);

        scope(exit) {
            stopped = false;
        }

        while(!stopped)
        {
            // handle overdue timers
            execute_overdue_timers();

            if (stopped) {
                break;
            }
            ts = _calculate_timespec(deadline);

            debug(hiokqueue) safe_tracef("waiting for %s", ts);
            debug(hiokqueue) safe_tracef("waiting events %s", in_events[0..in_index]);

            ready = s_kevent(kqueue_fd,
                                cast(kevent_t*)&in_events[0], in_index,
                                cast(kevent_t*)&out_events[0], MAXEVENTS,
                                &ts);
            // Capture errno right away; the calls below may overwrite it.
            immutable kevent_errno = errno;
            SysTime now_real = Clock.currTime;
            // The changelist has been handed to the kernel; start a new batch.
            in_index = 0;
            debug tracef("kevent returned %d events", ready);

            if ( ready < 0 ) {
                error("kevent returned error %s".format(s_strerror(kevent_errno)));
                // Nothing valid in out_events. Keep `ready` usable as a slice
                // bound (fd_cleared_from_out_events slices out_events[0..ready]).
                ready = 0;
            }
            //
            // handle kernel events
            //
            foreach(i; 0..ready) {
                if ( stopped ) {
                    break;
                }
                auto e = out_events[i];
                debug tracef("got kevent[%d] %s, data: %d, udata: %0x", i, e, e.data, e.udata);

                switch (e.filter) {
                    case EVFILT_READ:
                        debug tracef("Read on fd %d", e.ident);
                        dispatch_fd_event(e, AppEvent.IN);
                        continue;
                    case EVFILT_WRITE:
                        debug tracef("Write on fd %d", e.ident);
                        dispatch_fd_event(e, AppEvent.OUT);
                        continue;
                    case EVFILT_SIGNAL:
                        dispatch_signal(cast(int)e.ident);
                        continue;
                    case EVFILT_USER:
                        handle_user_event(e);
                        continue;
                    default:
                        // Includes entries zeroed by fd_cleared_from_out_events().
                        break;
                }
            }
            auto toCatchUp = timingwheels.ticksToCatchUp(tick, now_real.stdTime);
            if(toCatchUp>0)
            {
                auto wr = timingwheels.advance(toCatchUp);
                foreach(t; wr.timers)
                {
                    HandlerDelegate h = t._handler;
                    assert(t._armed);
                    t._armed = false;
                    // Exceptions from timer handlers propagate out of run().
                    h(AppEvent.TMO);
                }
            }
            execute_overdue_timers();
            if (!runInfinitely && now_real >= deadline)
            {
                debug(hiokqueue) safe_tracef("reached deadline, return");
                return;
            }
        }
    }

    /// Convert a READ/WRITE kevent into an AppEvent, adding ERR/HUP from
    /// EV_ERROR/EV_EOF, and deliver it to the handler registered for the fd.
    private void dispatch_fd_event(kevent_t e, AppEvent ae) @safe
    {
        if ( e.flags & EV_ERROR) {
            ae |= AppEvent.ERR;
        }
        if ( e.flags & EV_EOF) {
            ae |= AppEvent.HUP;
        }
        immutable fd = cast(int)e.ident;
        auto h = fileHandlers[fd];
        if (h is null)
        {
            // fd was detach()ed while still registered with the kernel
            debug tracef("no handler for fd %d, event dropped", fd);
            return;
        }
        h.eventHandler(fd, ae);
    }

    /// Invoke every Signal handler currently started for `signum`.
    /// Handler exceptions are logged.
    private void dispatch_signal(int signum) @safe
    {
        debug tracef("received signal %d", signum);
        auto row = signum in signals;
        if (row is null || (*row).length == 0)
        {
            // e.g. the last handler was stopped earlier in this same batch
            debug tracef("no handlers for signal %d", signum);
            return;
        }
        // Iterate over a copy of the slice: handlers may call stop_signal().
        auto handlers = *row;
        foreach(s; handlers) {
            debug tracef("processing signal handler %s", s);
            try {
                SigHandlerDelegate h = s._handler;
                h(signum);
            } catch (Exception ex) {
                errorf("Uncaught exception: %s", ex.msg);
            }
        }
    }

    /// Arm timer `t`. Timers due within one tick go to the `overdue` queue;
    /// the rest are scheduled in the timing wheels.
    /// Throws: LoopShutdownException once shutdown() has run.
    void start_timer(Timer t) @trusted {
        debug(hiokqueue) tracef("insert timer: %s", t);
        if ( inshutdown)
        {
            throw new LoopShutdownException("starting timer");
        }
        assert(!t._armed);
        t._armed = true;
        auto now = Clock.currTime;
        auto d = t._expires - now;
        d = max(d, 0.seconds);
        if ( d < tick ) {
            overdue ~= t;
            return;
        }
        ulong twNow = timingwheels.currStdTime(tick);
        Duration twdelay = (now.stdTime - twNow).hnsecs;
        debug(hiokqueue) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs);
        timingwheels.schedule(t, (d + twdelay)/tick);
    }

    /// Remove timer `t` from the overdue queue and from the timing wheels.
    void stop_timer(Timer t) @safe {
        debug(hiokqueue) safe_tracef("remove timer %s", t);
        foreach (ref pending; overdue)
        {
            if (pending is t)
            {
                // Leave a hole; execute_overdue_timers() skips null entries.
                pending = null;
                debug(hiokqueue) tracef("remove timer from overdue %s", t);
            }
        }
        if (timingwheels.totalTimers() > 0)
        {
            // static destructors can try to stop timers after loop deinit
            timingwheels.cancel(t);
        }
    }

    /// Submit all pending change requests to the kernel now.
    /// The batch is dropped even when kevent() fails. Otherwise a single
    /// rejected change would be resubmitted by, and break, every later flush.
    /// Throws: Exception (via enforce) if kevent() reports an error.
    void flush() @trusted {
        if ( in_index == 0 ) {
            return;
        }
        immutable n = in_index;
        in_index = 0;
        int rc = kevent(kqueue_fd, &in_events[0], n, null, 0, null);
        enforce(rc>=0, "flush: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..n]));
    }

    /// Append one change request to the pending changelist, flushing first
    /// when the buffer is full.
    private void push_change(kevent_t e) @safe
    {
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Zero the first not-yet-dispatched entry in out_events that matches
    /// `e`'s ident and filter, so run() skips it.
    /// Returns: true if such an entry was found.
    bool fd_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter ) {
                o.ident = 0;
                o.filter = 0;
                return true;
            }
        }
        return false;
    }

    /// Forget the handler for `fd` (no kernel request is issued).
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    /// Start watching `fd` for `ev` (a single IN, OUT or CONN event) and route
    /// the resulting events to `h`. The request is queued and reaches the
    /// kernel with the next flush()/run(). After shutdown() the handler
    /// immediately receives AppEvent.SHUTDOWN instead.
    void start_poll(int fd, AppEvent ev, FileEventHandler h) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
        if ( inshutdown )
        {
            //throw new LoopShutdownException("start polling");
            h.eventHandler(fd, AppEvent.SHUTDOWN);
            return;
        }
        kevent_t e;
        e.ident = fd;
        e.filter = cast(typeof(e.filter))filter;
        e.flags = EV_ADD;
        push_change(e);
        fileHandlers[fd] = h;
    }

    /// Stop watching `fd` for `ev`. Any matching event still pending in the
    /// current run() batch is cleared, and the delete is flushed immediately.
    void stop_poll(int fd, AppEvent ev) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = cast(typeof(e.filter))filter;
        e.flags = EV_DELETE|EV_DISABLE;
        fd_cleared_from_out_events(e);
        push_change(e);
        flush();
    }

    /// Handle a fired EVFILT_USER event: disable it (queued) and notify the
    /// handler registered under the event id with AppEvent.USER.
    pragma(inline, true)
    void handle_user_event(kevent_t e) @safe {
        import core.thread;
        debug tracef("Got user event thread.id:%s event.id:%d", Thread.getThis().id(), e.ident);
        disable_user_event(e);
        auto h = fileHandlers[e.ident];
        if (h is null)
        {
            debug tracef("no handler for user event %d", e.ident);
            return;
        }
        // NOTE(review): the kqueue fd, not the event id, is passed as the fd argument.
        h.eventHandler(kqueue_fd, AppEvent.USER);
    }

    /// Register `handler` for user event `event_id` and queue its EV_ADD.
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("start waiting for user_event %s", event_id);
        fileHandlers[event_id] = handler;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_ADD;
        push_change(e);
    }

    /// Unregister the handler for user event `event_id` and queue its EV_DELETE.
    /// `handler` is not used.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("stop waiting for user_event %s", event_id);
        fileHandlers[event_id] = null;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_DELETE;
        push_change(e);
    }

    /// Queue an EV_DISABLE for the user event described by `e`.
    void disable_user_event(kevent_t e) @safe {
        e.flags = EV_DISABLE;
        push_change(e);
    }

    /// Queue a one-shot EVFILT_TIMER (ident 0) firing after `d`, with `t`
    /// stored in udata. Delays of 36500 days or more are clamped to that value.
    void _add_kernel_timer(in Timer t, in Duration d) @trusted {
        debug tracef("add kernel timer %s, delta %s", t, d);
        assert(d >= 0.seconds);
        intptr_t delay_ms;
        if ( d < 36500.days)
        {
            delay_ms = d.split!"msecs".msecs;
        }
        else
        {
            // https://github.com/opensource-apple/xnu/blob/master/bsd/kern/kern_event.c#L1188
            // The OSX kernel rejects overly large timer intervals with errno ERANGE.
            delay_ms = 36500.days.split!"msecs".msecs;
        }
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_ADD | EV_ONESHOT;
        e.data = delay_ms;
        e.udata = cast(void*)t;
        push_change(e);
    }

    alias _mod_kernel_timer = _add_kernel_timer;

    /// Queue deletion of the kernel timer with ident 0.
    void _del_kernel_timer() @safe {
        debug trace("del kernel timer");
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_DELETE;
        push_change(e);
    }

    /*
     * signal functions
     */

    /// Add `s` to the handlers for its signal number. The first handler for a
    /// signal also registers it with the kernel.
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through kevent
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /// Remove `s` (matched by `_id`). When no handlers remain, the kernel
    /// registration is removed and SIG_DFL is restored.
    /// Throws: NotFoundException if the signal number was never started.
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }

    /// Set the signal's disposition to SIG_IGN and queue an EVFILT_SIGNAL EV_ADD for it.
    void _add_kernel_signal(in Signal s) {
        debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
        signal(s._signum, SIG_IGN);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_ADD;
        push_change(e);
    }

    /// Restore SIG_DFL for the signal and queue an EVFILT_SIGNAL EV_DELETE for it.
    void _del_kernel_signal(in Signal s) {
        debug tracef("del kernel signal %d, id: %d", s._signum, s._id);

        signal(s._signum, SIG_DFL);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_DELETE;
        push_change(e);
    }
}
629 
/// Map a single AppEvent (IN, OUT or CONN; EXT_ modifier bits are ignored)
/// to the kqueue filter used to watch for it.
/// Throws: Exception for any event without a filter mapping.
auto appEventToSysEvent(AppEvent ae) {
    import core.bitop;
    // Only the base event bits select a filter; drop the EXT_ modifiers.
    ae &= AppEvent.ALL;
    assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
    assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
    if (ae == AppEvent.IN || ae == AppEvent.CONN)
    {
        // CONN is watched with the same read filter as IN.
        return EVFILT_READ;
    }
    if (ae == AppEvent.OUT)
    {
        return EVFILT_WRITE;
    }
    throw new Exception("You can't wait for event %X".format(ae));
}
/// Map a kqueue filter back to the AppEvent it represents.
/// Only EVFILT_READ (-> IN) and EVFILT_WRITE (-> OUT) are supported.
/// Throws: Exception for any other filter. A `final switch` on a plain
/// `short` has undefined behaviour in release builds for unmatched values,
/// so unknown filters are rejected explicitly.
AppEvent sysEventToAppEvent(short se) {
    switch ( se ) {
        case EVFILT_READ:
            return AppEvent.IN;
        case EVFILT_WRITE:
            return AppEvent.OUT;
        default:
            throw new Exception("Unexpected event %d".format(se));
    }
}
unittest {
    import std.exception;
    import core.exception;

    // AppEvent -> kqueue filter: IN and CONN share the read filter.
    foreach (ev; [AppEvent.IN, AppEvent.CONN])
    {
        assert(appEventToSysEvent(ev) == EVFILT_READ);
    }
    assert(appEventToSysEvent(AppEvent.OUT) == EVFILT_WRITE);
    //assertThrown!AssertError(appEventToSysEvent(AppEvent.IN | AppEvent.OUT));

    // kqueue filter -> AppEvent for the two fd filters.
    assert(sysEventToAppEvent(EVFILT_READ) == AppEvent.IN);
    assert(sysEventToAppEvent(EVFILT_WRITE) == AppEvent.OUT);
}