1 module hio.drivers.kqueue;
2 
3 version(OSX):
4 
5 import std.datetime;
6 import std.conv;
7 import std.string;
8 import std.container;
9 import std.stdio;
10 import std.exception;
11 import std.experimental.logger;
12 import std.typecons;
13 import std.experimental.allocator;
14 import std.experimental.allocator.mallocator;
15 
16 import core.memory;
17 
18 import std.algorithm.comparison: max;
19 import core.sys.posix.fcntl: open, O_RDONLY;
20 import core.sys.posix.unistd: close;
21 
22 import core.sys.darwin.sys.event;
23 
24 import core.sys.posix.signal;
25 import core.stdc.stdint : intptr_t, uintptr_t;
26 import core.stdc.string: strerror;
27 import core.stdc.errno: errno;
28 
29 import hio.events;
30 import hio.common;
31 
32 //enum : short {
33 //    EVFILT_READ =      (-1),
34 //    EVFILT_WRITE =     (-2),
35 //    EVFILT_AIO =       (-3),    /* attached to aio requests */
36 //    EVFILT_VNODE =     (-4),    /* attached to vnodes */
37 //    EVFILT_PROC =      (-5),    /* attached to struct proc */
38 //    EVFILT_SIGNAL =    (-6),    /* attached to struct proc */
39 //    EVFILT_TIMER =     (-7),    /* timers */
40 //    EVFILT_MACHPORT =  (-8),    /* Mach portsets */
41 //    EVFILT_FS =        (-9),    /* Filesystem events */
42 //    EVFILT_USER =      (-10),   /* User events */
43 //            /* (-11) unused */
44 //    EVFILT_VM =        (-12)   /* Virtual memory events */
45 //}
46 
47 //enum : ushort {
48 ///* actions */
49 //    EV_ADD  =                0x0001,          /* add event to kq (implies enable) */
50 //    EV_DELETE =              0x0002,          /* delete event from kq */
51 //    EV_ENABLE =              0x0004,          /* enable event */
52 //    EV_DISABLE =             0x0008          /* disable event (not reported) */
53 //}
54 
55 //struct kevent_t {
56 //    uintptr_t       ident;          /* identifier for this event */
57 //    short           filter;         /* filter for event */
58 //    ushort          flags;          /* general flags */
59 //    uint            fflags;         /* filter-specific flags */
60 //    intptr_t        data;           /* filter-specific data */
61 //    void*           udata;
62 //}
63 
64 //extern(C) int kqueue() @safe @nogc nothrow;
65 //extern(C) int kevent(int kqueue_fd, const kevent_t *events, int ne, const kevent_t *events, int ne,timespec* timeout) @safe @nogc nothrow;
66 
/// Thin `@trusted` shim around kevent(2) so that `@safe` code can call it.
/// Every argument is forwarded unchanged and kevent's return value is passed back.
auto s_kevent(A...)(A args) @trusted @nogc nothrow
{
    auto rc = kevent(args);
    return rc;
}
70 
/// Reinterpret a kevent `udata` value as a `Timer` reference.
/// Timer kevents are registered with `cast(void*)timer` as their udata.
Timer udataToTimer(T)(T udata) @trusted
{
    auto timer = cast(Timer) udata;
    return timer;
}
74 
/**
 * kqueue(2)-based native event loop.
 *
 * Registration changes are not sent to the kernel right away. Each
 * start/stop request is appended to `in_events`, and the batch is submitted
 * either by the next kevent() call in `run` or by `flush()` when the batch is
 * full.
 *
 * A single kernel timer (ident 0) is used. It is (re)armed for the earliest
 * timer in `timers`.
 */
struct NativeEventLoopImpl {
    immutable bool   native = true;
    immutable string _name = "kqueue";
    @disable this(this);
    private {
        bool stopped = false;
        enum MAXEVENTS = 512;

        int  kqueue_fd = -1;  // interface to kernel
        int  in_index;        // number of pending changes in in_events
        int  ready;           // number of valid entries in out_events

        timespec    ts;

        kevent_t[MAXEVENTS]     in_events;   // pending changelist
        kevent_t[MAXEVENTS]     out_events;  // events returned by the last kevent() in run()

        RedBlackTree!Timer      timers;    // timer container
        Timer[]                 overdue;   // timers added with expiration in past placed here

        Signal[][int]           signals;   // signal number -> handlers

        FileEventHandler[]      fileHandlers; // indexed by fd / user event id

//        CircBuff!NotificationDelivery
//                                notificationsQueue;

        //HandlerDelegate[]       userEventHandlers;
    }

    /// Create the kqueue descriptor (if not created yet), the timer container
    /// and the malloc-backed handler table (registered with the GC so the
    /// handler references it holds stay visible to the collector).
    void initialize() @trusted nothrow {
        if ( kqueue_fd == -1) {
            kqueue_fd = kqueue();
            if ( kqueue_fd == -1 ) {
                try { errorf("kqueue() failed: %s", fromStringz(strerror(errno))); } catch(Exception e) {}
            }
        }
        debug try{tracef("kqueue_fd=%d", kqueue_fd);}catch(Exception e){}
        timers = new RedBlackTree!Timer();
        if ( fileHandlers is null ) {
            fileHandlers = Mallocator.instance.makeArray!FileEventHandler(16*1024);
            GC.addRange(fileHandlers.ptr, fileHandlers.length*FileEventHandler.sizeof);
        } else {
            // Re-initialization: start from an empty table instead of leaking
            // the old one and registering a second GC range.
            fileHandlers[] = null;
        }
    }

    /// Close the kqueue descriptor and release the handler table.
    /// Safe to call before initialize() or more than once.
    void deinit() @trusted {
        debug tracef("deinit");
        if ( kqueue_fd != -1 )
        {
            close(kqueue_fd);
            kqueue_fd = -1;
        }
        in_index = 0;
        timers = null;
        if ( fileHandlers !is null ) {
            GC.removeRange(fileHandlers.ptr);
            Mallocator.instance.dispose(fileHandlers);
            fileHandlers = null;   // never leave a dangling slice behind
        }
        //Mallocator.instance.dispose(userEventHandlers);
    }

    /// Returns the kqueue descriptor (-1 when not initialized).
    int get_kernel_id() pure @safe nothrow @nogc {
        return kqueue_fd;
    }

    /// Ask run() to return as soon as the current handler finishes.
    void stop() @safe pure {
        debug trace("mark eventloop as stopped");
        stopped = true;
    }

    /// Convert the time left until `deadline` into a timespec (never negative).
    timespec _calculate_timespec(SysTime deadline) @safe {
        timespec result;
        Duration delta = deadline - Clock.currTime;
        delta = max(delta, 0.seconds);
        debug tracef("delta = %s", delta);
        auto ds = delta.split!("seconds", "nsecs");
        result.tv_sec = cast(typeof(timespec.tv_sec))ds.seconds;
        result.tv_nsec = cast(typeof(timespec.tv_nsec))ds.nsecs;
        return result;
    }

    /**
     * Run the loop for `d` (forever when `d == Duration.max`) or until stop().
     * Returns when a kevent() wait times out with no events.
     * Throws: Exception when kevent() fails with anything other than EINTR.
     */
    void run(Duration d) @safe {
        import core.stdc.errno : EINTR;

        immutable bool runInfinitely = (d == Duration.max);
        SysTime     deadline;
        timespec*   wait;

        if ( !runInfinitely ) {
            deadline = Clock.currTime + d;
        }

        debug tracef("evl run for %s", d);

        scope(exit) {
            stopped = false;
        }

        while(!stopped) {
            //
            // handle user events(notifications)
            //
            //auto counter = notificationsQueue.Size * 10;
            //while(!notificationsQueue.empty){
            //    auto nd = notificationsQueue.get();
            //    Notification n = nd._n;
            //    Broadcast b = nd._broadcast;
            //    n.handler(b);
            //    counter--;
            //    enforce(counter > 0, "Can't clear notificatioinsQueue");
            //}
            _run_overdue_timers();
            if (stopped) {
                break;
            }

            if ( runInfinitely ) {
                wait = null;
            } else {
                ts = _calculate_timespec(deadline);
                wait = &ts;
            }

            debug tracef("waiting for %s", wait is null?"forever":"%s".format(*wait));
            debug tracef("waiting events %s", in_events[0..in_index]);
            ready = s_kevent(kqueue_fd,
                                cast(kevent_t*)&in_events[0], in_index,
                                cast(kevent_t*)&out_events[0], MAXEVENTS,
                                wait);
            // Capture errno before any logging can overwrite it.
            immutable kevent_errno = errno;
            in_index = 0;
            debug tracef("kevent returned %d events", ready);

            if ( ready < 0 ) {
                // Keep every out_events[0..ready] slice (stop_timer, stop_poll, ...) valid.
                ready = 0;
                if ( kevent_errno == EINTR ) {
                    // Interrupted by a signal before any event was returned: wait again.
                    debug trace("kevent interrupted, restarting wait");
                    continue;
                }
                immutable msg = "kevent returned error %s".format(s_strerror(kevent_errno));
                error(msg);
                throw new Exception(msg);
            }
            if ( ready == 0 ) {
                debug trace("kevent timedout and no events to process");
                return;
            }
            //
            // handle kernel events
            //
            foreach(i; 0..ready) {
                if ( stopped ) {
                    break;
                }
                auto e = out_events[i];
                debug tracef("got kevent[%d] %s, data: %d, udata: %0x", i, e, e.data, e.udata);

                switch (e.filter) {
                    case EVFILT_READ:
                        debug tracef("Read on fd %d", e.ident);
                        _dispatch_fd_event(e, AppEvent.IN);
                        continue;
                    case EVFILT_WRITE:
                        debug tracef("Write on fd %d", e.ident);
                        _dispatch_fd_event(e, AppEvent.OUT);
                        continue;
                    case EVFILT_TIMER:
                        _process_timer_event(e);
                        continue;
                    case EVFILT_SIGNAL:
                        _process_signal_event(e);
                        continue;
                    case EVFILT_USER:
                        handle_user_event(e);
                        continue;
                    default:
                        // includes entries zeroed by *_cleared_from_out_events
                        break;
                }
            }
        }
    }

    /// Execute timers that were started with an already expired deadline.
    private void _run_overdue_timers() @safe {
        while (overdue.length > 0) {
            // execute timers which user requested with negative delay
            Timer t = overdue[0];
            overdue = overdue[1..$];
            debug tracef("execute overdue %s", t);
            HandlerDelegate h = t._handler;
            try {
                h(AppEvent.TMO);
            } catch (Exception ex) {
                errorf("Uncaught exception: %s", ex.msg);
            }
        }
    }

    /// Deliver a READ/WRITE kevent to the handler registered for its fd.
    /// `ae` is the base event (IN or OUT); ERR/HUP are added from the flags.
    private void _dispatch_fd_event(kevent_t e, AppEvent ae) @safe {
        if ( e.flags & EV_ERROR) {
            ae |= AppEvent.ERR;
        }
        if ( e.flags & EV_EOF) {
            ae |= AppEvent.HUP;
        }
        int fd = cast(int)e.ident;
        auto h = fileHandlers[fd];
        if ( h is null ) {
            // the fd was detached after the kernel had already returned this event
            debug tracef("no handler for fd %d, event dropped", fd);
            return;
        }
        h.eventHandler(fd, ae);
    }

    /// Handle the (single) kernel timer event: run every expired timer
    /// starting from timers.front, then re-arm the kernel timer for the next one.
    private void _process_timer_event(kevent_t e) @safe {
        /*
         * Invariants for timers
         * ---------------------
         * timer list must not be empty at event.
         * we have to receive event only on the earliest timer in list
        */
        assert(!timers.empty, "timers empty on timer event: %s".format(out_events[0..ready]));
        enforce((e.flags & EV_ERROR) == 0, "Timer errno: %d".format(e.data));

        if ( udataToTimer(e.udata) !is timers.front) {
            errorf("timer event: %s != timers.front: %s", udataToTimer(e.udata), timers.front);
            //errorf("timers=%s", to!string(timers));
            errorf("events=%s", out_events[0..ready]);
            assert(0);
        }

        auto now = Clock.currTime;

        // The front timer runs at least once: the kernel told us it expired.
        do {
            debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
            Timer t = timers.front;
            HandlerDelegate h = t._handler;
            try {
                h(AppEvent.TMO);
            } catch (Exception ex) {
                errorf("Uncaught exception: %s", ex.msg);
            }
            // timer event handler can try to stop exactly this timer,
            // so when we returned from handler we can have different front
            // and we do not have to remove it.
            if ( !timers.empty && timers.front is t ) {
                timers.removeFront;
            }
            now = Clock.currTime;
        } while (!timers.empty && timers.front._expires <= now );

        if ( ! timers.empty ) {
            Duration kernel_delta = timers.front._expires - now;
            assert(kernel_delta > 0.seconds);
            _mod_kernel_timer(timers.front, kernel_delta);
        }
        // else: the kernel timer is EV_ONESHOT, nothing to delete
    }

    /// Call every handler registered for the signal carried by `e`.
    private void _process_signal_event(kevent_t e) @safe {
        assert(signals.length != 0);
        auto signum = cast(int)e.ident;
        debug tracef("received signal %d", signum);
        assert(signals[signum].length > 0);
        foreach(s; signals[signum]) {
            debug tracef("processing signal handler %s", s);
            try {
                SigHandlerDelegate h = s._handler;
                h(signum);
            } catch (Exception ex) {
                errorf("Uncaught exception: %s", ex.msg);
            }
        }
    }

    /// Start timer `t`. Already expired timers go to `overdue` and run on the
    /// next loop iteration. Otherwise the kernel timer is (re)armed if `t`
    /// becomes the earliest timer.
    void start_timer(Timer t) @trusted {
        debug tracef("insert timer %s - %X", t, cast(void*)t);
        if ( timers.empty || t < timers.front ) {
            auto d = t._expires - Clock.currTime;
            d = max(d, 0.seconds);
            if ( d == 0.seconds ) {
                overdue ~= t;
                return;
            }
            if ( timers.empty ) {
                _add_kernel_timer(t, d);
            } else {
                _mod_kernel_timer(t, d);
            }
        }
        timers.insert(t);
    }

    /// Zero the out_events entry matching e (ident, filter, udata) so run()
    /// skips it. Returns true if such an entry was found.
    bool timer_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter && o.udata == e.udata ) {
                o.ident = 0;
                o.filter = 0;
                o.udata = null;
                return true;
            }
        }
        return false;
    }

    /// Stop timer `t`. Handles timers parked in `overdue`, non-front timers, and
    /// the front timer (the kernel timer is then re-armed or deleted).
    void stop_timer(Timer t) @trusted {
        // A timer started with an expired deadline lives only in `overdue`
        // (start_timer never inserted it into `timers` nor armed the kernel).
        foreach (i, o; overdue) {
            if ( o is t ) {
                debug tracef("remove overdue %s", t);
                overdue = overdue[0..i] ~ overdue[i+1..$];
                return;
            }
        }

        assert(!timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t));

        debug tracef("timers: %s", timers);
        if ( t != timers.front ) {
            debug tracef("remove non-front %s", t);
            auto r = timers.equalRange(t);
            timers.remove(r);
            return;
        }

        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.udata = cast(void*)t;
        auto cleared = timer_cleared_from_out_events(e);

        timers.removeFront();
        if ( timers.empty ) {
            if ( cleared ) {
                // the one-shot kernel timer already fired
                debug tracef("return because it is cleared");
                return;
            }
            debug tracef("we have to del this timer from kernel");
            _del_kernel_timer();
            return;
        }
        debug tracef("we have to set timer to next: %s, %s", out_events[0..ready], timers);
        // we can change kernel timer to next,
        // If next timer expired - set delta = 0 to run on next loop invocation
        auto next = timers.front;
        auto d = next._expires - Clock.currTime;
        d = max(d, 0.seconds);
        _mod_kernel_timer(timers.front, d);
    }

//    pragma(inline, true)
//    void processNotification(Notification ue, Broadcast broadcast) @safe {
//        ue.handler();
//    }

//    void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
//        debug trace("posting notification");
//        if ( !notificationsQueue.full )
//        {
//            debug trace("put notification");
//            notificationsQueue.put(NotificationDelivery(notification, broadcast));
//            debug trace("put notification done");
//            return;
//        }
//        // now try to find space for next notification
//        auto retries = 10 * notificationsQueue.Size;
//        while(notificationsQueue.full && retries > 0)
//        {
//            retries--;
//            auto nd = notificationsQueue.get();
//            Notification _n = nd._n;
//            Broadcast _b = nd._broadcast;
//            processNotification(_n, _b);
//        }
//        enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
//        notificationsQueue.put(NotificationDelivery(notification, broadcast));
//        debug trace("posting notification - done");
//    }
//

    /// Submit all pending changes to the kernel without waiting for events.
    /// Throws: Exception if kevent() fails.
    void flush() @trusted {
        if ( in_index == 0 ) {
            return;
        }
        // flush
        int rc = kevent(kqueue_fd, &in_events[0], in_index, null, 0, null);
        enforce(rc>=0, "flush: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..in_index]));
        in_index = 0;
    }

    /// Zero the out_events entry matching e (ident, filter) so run() skips it.
    /// Returns true if such an entry was found.
    bool fd_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter ) {
                o.ident = 0;
                o.filter = 0;
                return true;
            }
        }
        return false;
    }

    /// Forget the handler registered for `fd`.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    /// Queue an EV_ADD for the filter matching `ev` on `fd` and register `h` for it.
    void start_poll(int fd, AppEvent ev, FileEventHandler h) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = filter;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        fileHandlers[fd] = h;
    }

    /// Remove the filter matching `ev` from `fd`, drop any already returned
    /// event for it, and flush the change immediately.
    void stop_poll(int fd, AppEvent ev) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = filter;
        e.flags = EV_DELETE|EV_DISABLE;
        fd_cleared_from_out_events(e);
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        flush();
    }

    /// Disable the fired user event and notify its handler with AppEvent.USER.
    pragma(inline, true)
    void handle_user_event(kevent_t e) @safe {
        import core.thread;
        debug tracef("Got user event thread.id:%s event.id:%d", Thread.getThis().id(), e.ident);
        disable_user_event(e);
        auto h = fileHandlers[e.ident];
        if ( h is null ) {
            // stop_wait_for_user_event ran after the kernel returned this event
            debug tracef("no handler for user event %d, event dropped", e.ident);
            return;
        }
        // NOTE(review): the handler receives kqueue_fd (not e.ident) as its fd argument — confirm intended.
        h.eventHandler(kqueue_fd, AppEvent.USER);
    }

    /// Register `handler` for user event `event_id` and queue its EVFILT_USER EV_ADD.
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("start waiting for user_event %s", event_id);
        fileHandlers[event_id] = handler;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Unregister user event `event_id` and queue its EVFILT_USER EV_DELETE.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("stop waiting for user_event %s", event_id);
        fileHandlers[event_id] = null;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue an EV_DISABLE for the user event described by `e`.
    void disable_user_event(kevent_t e) @safe {
        e.flags = EV_DISABLE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue (re)arming of the single one-shot kernel timer (ident 0) to fire
    /// after `d`, carrying `t` as udata. EV_ADD on an existing timer modifies it.
    void _add_kernel_timer(in Timer t, in Duration d) @trusted {
        debug tracef("add kernel timer %s, delta %s", t, d);
        assert(d >= 0.seconds);
        // https://github.com/opensource-apple/xnu/blob/master/bsd/kern/kern_event.c#L1188
        // The OSX kernel refuses too large timer intervals with errno ERANGE,
        // so clamp the delay to 36500 days.
        immutable Duration capped = d < 36500.days ? d : 36500.days;
        // The delay is given in whole milliseconds. Round up rather than
        // truncate: the timer-event handling in run() always fires
        // timers.front, so a truncated delay could run a timer before its
        // deadline.
        long delay_ms = capped.total!"msecs";
        if ( dur!"msecs"(delay_ms) < capped ) {
            delay_ms++;
        }
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_ADD | EV_ONESHOT;
        e.data = cast(intptr_t)delay_ms;
        e.udata = cast(void*)t;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    alias _mod_kernel_timer = _add_kernel_timer;

    /// Queue deletion of the kernel timer (ident 0).
    void _del_kernel_timer() @safe {
        debug trace("del kernel timer");
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /*
     * signal functions
     */

    /// Register handler `s`; the first handler for a signal number also
    /// registers the signal with the kernel.
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || (*r).length == 0 ) {
            // enable signal only through kevent
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /// Unregister handler `s` (matched by _id). When no handler is left the
    /// signal is removed from the kernel and its default disposition restored.
    /// Throws: NotFoundException if the signal was never started.
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            // Drop an already returned event for this signal so run() does not
            // dispatch it to an empty handler list.
            kevent_t pending;
            pending.ident = s._signum;
            pending.filter = EVFILT_SIGNAL;
            fd_cleared_from_out_events(pending);
            // reenable old signal behaviour
            _del_kernel_signal(s);
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }

    /// Ignore the signal's default action and queue an EVFILT_SIGNAL EV_ADD.
    void _add_kernel_signal(in Signal s) {
        debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
        signal(s._signum, SIG_IGN);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Restore SIG_DFL and queue an EVFILT_SIGNAL EV_DELETE.
    void _del_kernel_signal(in Signal s) {
        debug tracef("del kernel signal %d, id: %d", s._signum, s._id);

        signal(s._signum, SIG_DFL);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }
}
628 
/**
 * Map a single AppEvent to the kqueue filter used to watch for it.
 * IN and CONN both map to EVFILT_READ; OUT maps to EVFILT_WRITE.
 * Throws: Exception for any other event value.
 */
auto appEventToSysEvent(AppEvent ae) {
    import core.bitop;
    assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
    assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
    if ( ae == AppEvent.IN || ae == AppEvent.CONN ) {
        return EVFILT_READ;
    }
    if ( ae == AppEvent.OUT ) {
        return EVFILT_WRITE;
    }
    throw new Exception("You can't wait for event %X".format(ae));
}
/**
 * Map a kqueue read/write filter back to the matching AppEvent.
 * Throws: Exception for any other filter value.
 */
AppEvent sysEventToAppEvent(short se) {
    // A regular switch with an explicit default: a `final switch` over a plain
    // short gives no defined result for unmatched values in -release builds.
    switch ( se ) {
        case EVFILT_READ:
            return AppEvent.IN;
        case EVFILT_WRITE:
            return AppEvent.OUT;
        default:
            throw new Exception("Unexpected event %d".format(se));
    }
}
unittest {
    import core.exception;
    import std.exception;

    // AppEvent -> kqueue filter, one event at a time
    assert(EVFILT_READ == appEventToSysEvent(AppEvent.IN));
    assert(EVFILT_WRITE == appEventToSysEvent(AppEvent.OUT));
    // CONN is watched through the read filter too
    assert(EVFILT_READ == appEventToSysEvent(AppEvent.CONN));
    //assertThrown!AssertError(appEventToSysEvent(AppEvent.IN | AppEvent.OUT));

    // kqueue filter -> AppEvent
    assert(AppEvent.IN == sysEventToAppEvent(EVFILT_READ));
    assert(AppEvent.OUT == sysEventToAppEvent(EVFILT_WRITE));
}