1 module hio.drivers.epoll;
2 
3 version(linux):
4 
5 import std.datetime;
6 import std.string;
7 import std.container;
8 import std.exception;
9 import std.experimental.logger;
10 import std.typecons;
11 
12 import std.experimental.allocator;
13 import std.experimental.allocator.mallocator;
14 
15 import core.memory: GC;
16 
17 import std.algorithm.comparison: max;
18 import core.stdc.string: strerror;
19 import core.stdc.errno: errno, EAGAIN, EINTR;
20 
21 import core.sys.linux.epoll;
22 import core.sys.linux.timerfd;
23 import core.sys.linux.sys.signalfd;
24 
25 import core.sys.posix.unistd: close, read;
26 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC , timespec;
27 
28 import hio.events;
29 import hio.common;
30 
31 struct NativeEventLoopImpl {
    // NOTE(review): presumably tells generic code that this is an OS-native driver - confirm against users of `native`.
    immutable bool   native = true;
    // Name of this driver.
    immutable string _name = "epoll";
    private {
        bool                    stopped = false;    // set by stop(), checked by run() on every loop iteration
        enum                    MAXEVENTS = 1024;   // size of `events`; also passed to epoll_create() and epoll_wait()
        int                     epoll_fd = -1;      // epoll descriptor, -1 until initialize() creates it
        int                     timer_fd = -1;      // timerfd descriptor, -1 until initialize() creates it
        int                     signal_fd = -1;     // signalfd descriptor, -1 until the first signal is registered
        sigset_t                mask;               // set of signals handed to signalfd()

        align(1)                epoll_event[MAXEVENTS] events;  // buffer filled by epoll_wait() in run()

        RedBlackTree!Timer      timers;     // pending timers in sorted order
        Timer[]                 overdue;    // timers added with expiration in past
        Signal[][int]           signals;    // registered Signal objects, keyed by signal number
        //FileHandlerFunction[int] fileHandlers;
        FileEventHandler[]      fileHandlers;   // handlers indexed by file descriptor, allocated in initialize()

    }
    // Postblit is disabled, so instances cannot be copied.
    @disable this(this) {}
52 
53     void initialize() @trusted nothrow {
54         if ( epoll_fd == -1 ) {
55             epoll_fd = (() @trusted  => epoll_create(MAXEVENTS))();
56         }
57         if ( timer_fd == -1 ) {
58             timer_fd = (() @trusted => timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK))();
59         }
60         timers = new RedBlackTree!Timer();
61         fileHandlers = Mallocator.instance.makeArray!FileEventHandler(16*1024);
62         GC.addRange(&fileHandlers[0], fileHandlers.length * FileEventHandler.sizeof);
63     }
64     void deinit() @trusted {
65         close(epoll_fd);
66         epoll_fd = -1;
67         close(timer_fd);
68         timer_fd = -1;
69         timers = null;
70         GC.removeRange(&fileHandlers[0]);
71         Mallocator.instance.dispose(fileHandlers);
72     }
73 
74     void stop() @safe {
75         stopped = true;
76     }
77 
78     int _calculate_timeout(SysTime deadline) {
79         Duration delta = deadline - Clock.currTime;
80         delta = max(delta, 0.seconds);
81         return cast(int)delta.total!"msecs";
82     }
83     /**
84     *
85     **/
86     void run(Duration d) {
87 
88         immutable bool runIndefinitely = (d == Duration.max);
89 
90         /**
91          * eventloop will exit when we reach deadline
92          * it is allowed to have d == 0.seconds,
93          * which mean we wil run events once
94         **/
95         SysTime deadline = Clock.currTime + d;
96         debug tracef("evl run %s",runIndefinitely? "indefinitely": "for %s".format(d));
97 
98         scope ( exit )
99         {
100             stopped = false;
101         }
102 
103         while( !stopped ) {
104             debug tracef("event loop iteration");
105 
106             //
107             // handle user events(notifications)
108             //
109             // auto counter = notificationsQueue.Size * 10;
110             // while(!notificationsQueue.empty){
111             //     auto nd = notificationsQueue.get();
112             //     Notification n = nd._n;
113             //     Broadcast b = nd._broadcast;
114             //     n.handler(b);
115             //     counter--;
116             //     enforce(counter > 0, "Can't clear notificatioinsQueue");
117             // }
118             //auto counter = notificationsQueue.Size * 10;
119             //while(!notificationsQueue.empty){
120             //    auto n = notificationsQueue.get();
121             //    n.handler();
122             //    counter--;
123             //    enforce(counter > 0, "Can't clear notificatioinsQueue");
124             //}
125 
126             while (overdue.length > 0) {
127                 // execute timers with requested negative delay
128                 Timer t = overdue[0];
129                 overdue = overdue[1..$];
130                 debug tracef("execute overdue %s", t);
131                 HandlerDelegate h = t._handler;
132                 try {
133                     h(AppEvent.TMO);
134                 } catch (Exception e) {
135                     errorf("Uncaught exception: %s", e);
136                 }
137             }
138             if (stopped) {
139                 break;
140             }
141 
142             int timeout_ms = runIndefinitely ?
143                 -1 :
144                 _calculate_timeout(deadline);
145 
146             int ready = epoll_wait(epoll_fd, &events[0], MAXEVENTS, timeout_ms);
147             debug tracef("got %d events", ready);
148             if ( ready == 0 ) {
149                 debug trace("epoll timedout and no events to process");
150                 return;
151             }
152             if ( ready == -1 && errno == EINTR) {
153                 continue;
154             }
155             if ( ready < 0 ) {
156                 errorf("epoll_wait returned error %s", fromStringz(strerror(errno)));
157             }
158             enforce(ready >= 0);
159             debug tracef("events: %s", events[0..ready]);
160             foreach(i; 0..ready) {
161                 auto e = events[i];
162                 debug tracef("got event %s", e);
163                 int fd = e.data.fd;
164 
165                 if ( fd == timer_fd ) {
166                     // with EPOLLET flag I dont have to read from timerfd, otherwise I have to:
167                     // ubyte[8] v;
168                     // auto tfdr = read(timer_fd, &v[0], 8);
169                     debug tracef("timer event");
170                     auto now = Clock.currTime;
171                     /*
172                      * Invariants for timers
173                      * ---------------------
174                      * timer list must not be empty at event.
175                      * we have to receive event only on the earliest timer in list
176                     **/
177                     assert(!timers.empty, "timers empty on timer event");
178                     assert(timers.front._expires <= now);
179 
180                     do {
181                         debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
182                         Timer t = timers.front;
183                         HandlerDelegate h = t._handler;
184                         timers.removeFront;
185                         if (timers.empty) {
186                             _del_kernel_timer();
187                         }
188                         try {
189                             h(AppEvent.TMO);
190                         } catch (Exception e) {
191                             errorf("Uncaught exception: %s", e);
192                         }
193                         now = Clock.currTime;
194                     } while (!timers.empty && timers.front._expires <= now );
195 
196                     if ( ! timers.empty ) {
197                         Duration kernel_delta = timers.front._expires - now;
198                         assert(kernel_delta > 0.seconds);
199                         _mod_kernel_timer(timers.front, kernel_delta);
200                     } else {
201                         // delete kernel timer so we can add it next time
202                         //_del_kernel_timer();
203                     }
204                     continue;
205                 }
206                 if ( fd == signal_fd ) {
207                     enum siginfo_items = 8;
208                     signalfd_siginfo[siginfo_items] info;
209                     debug trace("got signal");
210                     assert(signal_fd != -1);
211                     while (true) {
212                         auto rc = read(signal_fd, &info, info.sizeof);
213                         if ( rc < 0 && errno == EAGAIN ) {
214                             break;
215                         }
216                         enforce(rc > 0);
217                         auto got_signals = rc / signalfd_siginfo.sizeof;
218                         debug tracef("read info %d, %s", got_signals, info[0..got_signals]);
219                         foreach(si; 0..got_signals) {
220                             auto signum = info[si].ssi_signo;
221                             debug tracef("signum: %d", signum);
222                             foreach(s; signals[signum]) {
223                                 debug tracef("processing signal handler %s", s);
224                                 try {
225                                     SigHandlerDelegate h = s._handler;
226                                     h(signum);
227                                 } catch (Exception e) {
228                                     errorf("Uncaught exception: %s", e);
229                                 }
230                             }
231                         }
232                     }
233                     continue;
234                 }
235                 AppEvent ae;
236                 if ( e.events & EPOLLIN ) {
237                     ae |= AppEvent.IN;
238                 }
239                 if (e.events & EPOLLOUT) {
240                     ae |= AppEvent.OUT;
241                 }
242                 if (e.events & EPOLLERR) {
243                     ae |= AppEvent.ERR;
244                 }
245                 if (e.events & EPOLLHUP) {
246                     ae |= AppEvent.HUP;
247                 }
248                 debug tracef("process event %02x on fd: %s, handler: %s", e.events, e.data.fd, fileHandlers[fd]);
249                 if ( fileHandlers[fd] !is null ) {
250                     try {
251                         fileHandlers[fd].eventHandler(e.data.fd, ae);
252                     }
253                     catch (Exception e) {
254                         errorf("On file handler: %d, %s", fd, e);
255                     }
256                 }
257                 //HandlerDelegate h = cast(HandlerDelegate)e.data.ptr;
258                 //AppEvent appEvent = AppEvent(sysEventToAppEvent(e.events), -1);
259                 //h(appEvent);
260             }
261         }
262     }
263     void start_timer(Timer t) @safe {
264         debug tracef("insert timer %s", t);
265         if ( timers.empty || t < timers.front ) {
266             auto d = t._expires - Clock.currTime;
267             d = max(d, 0.seconds);
268             if ( d == 0.seconds ) {
269                 overdue ~= t;
270                 return;
271             }
272             debug {
273                 tracef("timers: %s", timers);
274             }
275             if ( timers.empty ) {
276                 _add_kernel_timer(t, d);
277             } else {
278                 _mod_kernel_timer(t, d);
279             }
280         }
281         timers.insert(t);
282     }
283 
284     void stop_timer(Timer t) @safe {
285         debug tracef("remove timer %s", t);
286 
287         if ( t !is timers.front ) {
288             debug tracef("Non front timer: %s", timers);
289             auto r = timers.equalRange(t);
290             timers.remove(r);
291             return;
292         }
293 
294         timers.removeFront();
295         debug trace("we have to del this timer from kernel or set to next");
296         if ( !timers.empty ) {
297             // we can change kernel timer to next,
298             // If next timer expired - set delta = 0 to run on next loop invocation
299             debug trace("set up next timer");
300             auto next = timers.front;
301             auto d = next._expires - Clock.currTime;
302             d = max(d, 0.seconds);
303             _mod_kernel_timer(timers.front, d);
304             return;
305         }
306         debug trace("remove last timer");
307         _del_kernel_timer();
308     }
309 
310     void _add_kernel_timer(Timer t, in Duration d) @trusted {
311         debug trace("add kernel timer");
312         assert(d > 0.seconds);
313         itimerspec itimer;
314         auto ds = d.split!("seconds", "nsecs");
315         itimer.it_value.tv_sec = cast(typeof(itimer.it_value.tv_sec)) ds.seconds;
316         itimer.it_value.tv_nsec = cast(typeof(itimer.it_value.tv_nsec)) ds.nsecs;
317         int rc = timerfd_settime(timer_fd, 0, &itimer, null);
318         enforce(rc >= 0, "timerfd_settime(%s): %s".format(itimer, fromStringz(strerror(errno))));
319         epoll_event e;
320         e.events = EPOLLIN|EPOLLET;
321         e.data.fd = timer_fd;
322         rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &e);
323         enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, fromStringz(strerror(errno))));
324     }
325     void _mod_kernel_timer(Timer t, in Duration d) @trusted {
326         debug tracef("mod kernel timer to %s", t);
327         assert(d >= 0.seconds, "Illegal timer %s".format(d));
328         itimerspec itimer;
329         auto ds = d.split!("seconds", "nsecs");
330         itimer.it_value.tv_sec = cast(typeof(itimer.it_value.tv_sec)) ds.seconds;
331         itimer.it_value.tv_nsec = cast(typeof(itimer.it_value.tv_nsec)) ds.nsecs;
332         int rc = timerfd_settime(timer_fd, 0, &itimer, null);
333         enforce(rc >= 0, "timerfd_settime(%s): %s".format(itimer, fromStringz(strerror(errno))));
334         epoll_event e;
335         e.events = EPOLLIN|EPOLLET;
336         e.data.fd = timer_fd;
337         rc = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, timer_fd, &e);
338         enforce(rc >= 0);
339     }
340     void _del_kernel_timer() @trusted {
341         debug trace("del kernel timer");
342         epoll_event e;
343         e.events = EPOLLIN;
344         e.data.fd = timer_fd;
345         int rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, timer_fd, &e);
346         enforce(rc >= 0, "epoll_ctl del(%s): %s".format(e, fromStringz(strerror(errno))));
347     }
348     //
349     // notifications
350     //
351     // pragma(inline)
352     // void processNotification(Notification ue, Broadcast broadcast) @safe {
353     //     ue.handler(broadcast);
354     // }
355     // void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
356     //     debug trace("posting notification");
357     //     if ( !notificationsQueue.full )
358     //     {
359     //         debug trace("put notification");
360     //         notificationsQueue.put(NotificationDelivery(notification, broadcast));
361     //         debug trace("put notification done");
362     //         return;
363     //     }
364     //     // now try to find space for next notification
365     //     auto retries = 10 * notificationsQueue.Size;
366     //     while(notificationsQueue.full && retries > 0)
367     //     {
368     //         retries--;
369     //         auto nd = notificationsQueue.get();
370     //         Notification _n = nd._n;
371     //         Broadcast _b = nd._broadcast;
372     //         processNotification(_n, _b);
373     //     }
374     //     enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
375     //     notificationsQueue.put(NotificationDelivery(notification, broadcast));
376     //     debug trace("posting notification - done");
377     // }
378 
379 
380     //void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
381     //    debug trace("posting notification");
382     //    if ( !notificationsQueue.full )
383     //    {
384     //        notificationsQueue.put(NotificationDelivery(notification, broadcast));
385     //        return;
386     //    }
387     //    // now try to find space for next notification
388     //    auto retries = 10 * notificationsQueue.Size;
389     //    while(notificationsQueue.full && retries > 0)
390     //    {
391     //        retries--;
392     //        auto nd = notificationsQueue.get();
393     //        Notification _n = nd._n;
394     //        Broadcast _b = nd._broadcast;
395     //        processNotification(_n, _b);
396     //    }
397     //    enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
398     //    notificationsQueue.put(NotificationDelivery(notification, broadcast));
399     //}
400 
401     //
402     // signals
403     //
404     void start_signal(Signal s) {
405         debug tracef("start signal %s", s);
406         debug tracef("signals: %s", signals);
407         auto r = s._signum in signals;
408         if ( r is null || r.length == 0 ) {
409             // enable signal only through kevent
410             _add_kernel_signal(s);
411         }
412         signals[s._signum] ~= s;
413     }
414     void stop_signal(Signal s) {
415         debug trace("stop signal");
416         auto r = s._signum in signals;
417         if ( r is null ) {
418             throw new NotFoundException("You tried to stop signal that was not started");
419         }
420         Signal[] new_row;
421         foreach(a; *r) {
422             if (a._id == s._id) {
423                 continue;
424             }
425             new_row ~= a;
426         }
427         if ( new_row.length == 0 ) {
428             *r = null;
429             _del_kernel_signal(s);
430             // reenable old signal behaviour
431         } else {
432             *r = new_row;
433         }
434         debug tracef("new signals %d row %s", s._signum, new_row);
435     }
436     void _add_kernel_signal(Signal s) {
437         debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
438         sigset_t m;
439         sigemptyset(&m);
440         sigaddset(&m, s._signum);
441         pthread_sigmask(SIG_BLOCK, &m, null);
442 
443         sigaddset(&mask, s._signum);
444         if ( signal_fd == -1 ) {
445             signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
446             debug tracef("signalfd %d", signal_fd);
447             epoll_event e;
448             e.events = EPOLLIN|EPOLLET;
449             e.data.fd = signal_fd;
450             auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &e);
451             enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, fromStringz(strerror(errno))));
452         } else {
453             signalfd(signal_fd, &mask, 0);
454         }
455 
456     }
457     void _del_kernel_signal(Signal s) {
458         debug tracef("del kernel signal %d, id: %d", s._signum, s._id);
459         sigset_t m;
460         sigemptyset(&m);
461         sigaddset(&m, s._signum);
462         pthread_sigmask(SIG_UNBLOCK, &m, null);
463         sigdelset(&mask, s._signum);
464         assert(signal_fd != -1);
465         signalfd(signal_fd, &mask, 0);
466     }
467     void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
468         epoll_event e;
469         e.events = EPOLLIN;
470         e.data.fd = event_id;
471         auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_id, &e))();
472         enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, s_strerror(errno)));
473         fileHandlers[event_id] = handler;
474     }
475     void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
476         epoll_event e;
477         e.events = EPOLLIN;
478         e.data.fd = event_id;
479         auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_DEL, event_id, &e))();
480         fileHandlers[event_id] = null;
481     }
482 
483     int get_kernel_id() pure @safe nothrow @nogc {
484         return epoll_fd;
485     }
486 
487     //
488     // files/sockets
489     //
490     void detach(int fd) @safe {
491         fileHandlers[fd] = null;
492     }
493     void start_poll(int fd, AppEvent ev, FileEventHandler f) @trusted {
494         epoll_event e;
495         e.events = appEventToSysEvent(ev);
496         e.data.fd = fd;
497         auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &e);
498         enforce(rc >= 0, "epoll_ctl add(%d, %s): %s".format(fd, e, fromStringz(strerror(errno))));
499         fileHandlers[fd] = f;
500     }
501 
502     void stop_poll(int fd, AppEvent ev) @trusted {
503         epoll_event e;
504         e.events = appEventToSysEvent(ev);
505         e.data.fd = fd;
506         auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &e);
507     }
508     auto appEventToSysEvent(AppEvent ae) pure @safe {
509         import core.bitop;
510         assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
511         assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
512         switch ( ae ) {
513             case AppEvent.IN:
514                 return EPOLLIN;
515             case AppEvent.OUT:
516                 return EPOLLOUT;
517             //case AppEvent.CONN:
518             //    return EVFILT_READ;
519             default:
520                 throw new Exception("You can't wait for event %X".format(ae));
521         }
522     }
523 }
524