1 module hio.drivers.epoll;
2 
3 version(linux):
4 
5 import std.datetime;
6 import std.string;
7 import std.container;
8 import std.exception;
9 import std.experimental.logger;
10 import std.typecons;
11 import std.traits;
12 import std.algorithm: min, max;
13 
14 import std.experimental.allocator;
15 import std.experimental.allocator.mallocator;
16 
17 import core.memory: GC;
18 
19 import std.algorithm.comparison: max;
20 import core.stdc.string: strerror;
21 import core.stdc.errno: errno, EAGAIN, EINTR;
22 import core.stdc.stdio: printf;
23 
24 import core.sys.linux.epoll;
25 import core.sys.linux.timerfd;
26 import core.sys.linux.sys.signalfd;
27 
28 import core.sys.posix.unistd: close, read;
29 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC , timespec;
30 
31 import core.memory;
32 
33 import timingwheels;
34 
35 import hio.events;
36 import hio.common;
37 
private
{
    /// Capacity of the fixed-size array that snapshots timers expired in one advance.
    enum InExpTimersSize = 16;

    /// Timing wheels instantiated for this project's Timer type.
    alias TW = TimingWheels!Timer;

    /// Type returned by `TW.advance`.
    alias TWAdvanceResult = ReturnType!(TW.advance!(TW));
}
41 
42 struct NativeEventLoopImpl {
43     immutable bool   native = true;
44     immutable string _name = "epoll";
45     private {
46         bool                    stopped = false;
47         bool                    inshutdown = false;
48         enum                    MAXEVENTS = 1024;
49         enum                    TOTAL_FILE_HANDLERS = 16*1024;
50         int                     epoll_fd = -1;
51         int                     signal_fd = -1;
52         sigset_t                mask;
53 
54         align(1)                epoll_event[MAXEVENTS] events;
55 
56         Duration                tick = 5.msecs;
57         TW                      timingwheels;
58 
59         TWAdvanceResult         advancedTimersHash;
60         long                    advancedTimersHashLength;
61 
62         Timer[InExpTimersSize]  advancedTimersArray;
63         long                    advancedTimersArrayLength;
64 
65         Timer[]                 overdue;    // timers added with expiration in past
66         Signal[][int]           signals;
67         FileEventHandler[]      fileHandlers;
68 
69     }
70     @disable this(this) {}
71     void initialize() @trusted nothrow {
72         if ( epoll_fd == -1 ) {
73             epoll_fd = (() @trusted  => epoll_create(MAXEVENTS))();
74         }
75         fileHandlers = Mallocator.instance.makeArray!FileEventHandler(TOTAL_FILE_HANDLERS);
76         GC.addRange(&fileHandlers[0], fileHandlers.length * FileEventHandler.sizeof);
77         timingwheels.init();
78     }
79     void deinit() @trusted {
80         if (epoll_fd>=0)
81         {
82             close(epoll_fd);
83             epoll_fd = -1;
84         }
85         if (signal_fd>=0)
86         {
87             close(signal_fd);
88             signal_fd = -1;
89         }
90         //precise_timers = null;
91         if (fileHandlers !is null)
92         {
93             GC.removeRange(&fileHandlers[0]);
94             Mallocator.instance.dispose(fileHandlers);
95             fileHandlers = null;
96         }
97         timingwheels = TimingWheels!(Timer)();
98         timingwheels.init();
99         advancedTimersHash = TWAdvanceResult.init;
100     }
101 
102     void stop() @safe {
103         stopped = true;
104     }
105 
106     int _calculate_timeout(SysTime deadline) {
107         auto now_real = Clock.currTime;
108         Duration delta = deadline - now_real;
109         debug(hioepoll) safe_tracef("deadline - now_real: %s", delta);
110         auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, now_real.stdTime);
111         debug(hioepoll) safe_tracef("nextTWtimer: %s", nextTWtimer);
112         delta = min(delta, nextTWtimer);
113 
114         delta = max(delta, 0.seconds);
115         return cast(int)delta.total!"msecs";
116     }
117     private void execute_overdue_timers()
118     {
119         debug(hioepoll) tracef("calling overdue timers");
120         while (overdue.length > 0)
121         {
122             // execute timers which user requested with negative delay
123             Timer t = overdue[0];
124             overdue = overdue[1..$];
125             if ( t is null)
126             {
127                 // timer was removed
128                 continue;
129             }
130             debug(hioepoll) tracef("execute overdue %s", t);
131             assert(t._armed);
132             t._armed = false;
133             HandlerDelegate h = t._handler;
134             try {
135                 if (inshutdown)
136                 {
137                     h(AppEvent.SHUTDOWN);
138                 }
139                 else
140                 {
141                     h(AppEvent.TMO);
142                 }
143             } catch (Exception e) {
144                 errorf("Uncaught exception: %s", e);
145             }
146         }
147     }
148 
149     void shutdown() @safe
150     in(!stopped && !inshutdown)
151     {
152         // scan through all timers
153         auto a = timingwheels.allTimers();
154         foreach(t; a.timers)
155         {
156             try
157             {
158                 debug(hioepoll) tracef("send shutdown to timer %s", t);
159                 t._handler(AppEvent.SHUTDOWN);
160             }
161             catch(Exception e)
162             {
163                 errorf("Error on event SHUTDOWN in %s", t);
164             }
165         }
166         // scan through active file descriptors
167         // and send SHUTDOWN event
168         if (fileHandlers is null)
169         {
170             return;
171         }
172         for(int i=0;i<TOTAL_FILE_HANDLERS;i++)
173         {
174             if (fileHandlers[i] is null)
175             {
176                 continue;
177             }
178             try
179             {
180                 debug(hioepoll) tracef("send shutdown to filehandler %s", fileHandlers[i]);
181                 fileHandlers[i].eventHandler(i, AppEvent.SHUTDOWN);
182             }
183             catch(Exception e)
184             {
185                 () @trusted {printf("exception: %s:%d: %s", __FILE__.ptr, __LINE__, toStringz(e.toString));}();
186             }
187         }
188         inshutdown = true;
189     }
190     /**
191     *
192     **/
193     void run(Duration d) {
194 
195         immutable bool runInfinitely = (d == Duration.max);
196 
197         /**
198          * eventloop will exit when we reach deadline
199          * it is allowed to have d == 0.seconds,
200          * which mean we wil run events once
201         **/
202         SysTime deadline = Clock.currTime + d;
203         debug(hioepoll) tracef("evl run %s",runInfinitely? "infinitely": "for %s".format(d));
204         scope ( exit )
205         {
206             stopped = false;
207         }
208 
209         while( !stopped ) {
210             debug(hioepoll) tracef("event loop iteration");
211 
212             if (stopped) {
213                 break;
214             }
215 
216             int timeout_ms = _calculate_timeout(deadline);
217 
218             debug(hioepoll) safe_tracef("wait in poll for %s.ms", timeout_ms);
219 
220             int ready = epoll_wait(epoll_fd, &events[0], MAXEVENTS, timeout_ms);
221 
222             debug(hioepoll) tracef("got %d events", ready);
223 
224             SysTime now_real = Clock.currTime;
225 
226             if ( ready == -1 && errno == EINTR) {
227                 continue;
228             }
229             if ( ready < 0 ) {
230                 errorf("epoll_wait returned error %s", fromStringz(strerror(errno)));
231                 // throw new Exception("epoll errno");
232             }
233 
234             debug(hioepoll) tracef("events: %s", events[0..ready]);
235             foreach(i; 0..ready) {
236                 auto e = events[i];
237                 //debug printf("epoll wait: fd:%d: 0x%0x\n", e.data.fd, e.events);
238                 debug(hioepoll) tracef("got event %s", e);
239                 int fd = e.data.fd;
240 
241                 if ( fd == signal_fd ) {
242                     enum siginfo_items = 8;
243                     signalfd_siginfo[siginfo_items] info;
244                     debug(hioepoll) trace("got signal");
245                     assert(signal_fd != -1);
246                     while (true) {
247                         auto rc = read(signal_fd, &info, info.sizeof);
248                         if ( rc < 0 && errno == EAGAIN ) {
249                             break;
250                         }
251                         enforce(rc > 0);
252                         auto got_signals = rc / signalfd_siginfo.sizeof;
253                         debug(hioepoll) tracef("read info %d, %s", got_signals, info[0..got_signals]);
254                         foreach(si; 0..got_signals) {
255                             auto signum = info[si].ssi_signo;
256                             debug(hioepoll) tracef("signum: %d", signum);
257                             foreach(s; signals[signum]) {
258                                 debug(hioepoll) tracef("processing signal handler %s", s);
259                                 try {
260                                     SigHandlerDelegate h = s._handler;
261                                     h(signum);
262                                 } catch (Exception e) {
263                                     errorf("Uncaught exception: %s", e);
264                                 }
265                             }
266                         }
267                     }
268                     continue;
269                 }
270                 AppEvent ae;
271                 if ( e.events & EPOLLIN ) {
272                     ae |= AppEvent.IN;
273                 }
274                 if (e.events & EPOLLOUT) {
275                     ae |= AppEvent.OUT;
276                 }
277                 if (e.events & EPOLLERR) {
278                     ae |= AppEvent.ERR;
279                 }
280                 if (e.events & EPOLLHUP) {
281                     ae |= AppEvent.HUP;
282                 }
283                 debug(hioepoll) tracef("process event %02x on fd: %s, handler: %s", e.events, e.data.fd, fileHandlers[fd]);
284                 if ( fileHandlers[fd] !is null ) {
285                     try {
286                         fileHandlers[fd].eventHandler(e.data.fd, ae);
287                     }
288                     catch (Exception e) {
289                         errorf("On file handler: %d, %s", fd, e);
290                         throw e;
291                     }
292                 }
293             }
294             auto toCatchUp = timingwheels.ticksToCatchUp(tick, now_real.stdTime);
295             if(toCatchUp>0)
296             {
297                 /*
298                 ** Some timers expired.
299                 ** --------------------
300                 ** Most of the time the number of this timers is  low, so  we optimize for
301                 ** this case: copy this small number of timers into small array, then ite-
302                 ** rate over this array.
303                 **
304                 ** Another case - number of expired timers > InExpTimersSize, so  we can't
305                 ** copy timers to this array. Then we just iterate over the result.
306                 **
307                 ** Why  do we need random access to any expired timer (so we have  to save 
308                 ** it in array or in map)  - because any timer handler may wish  to cancel
309                 ** another timer (and this another timer can also be in 'advance' result).
310                 ** Example - expired timer wakes up some task which cancel socket io timer.
311                 */
312                 advancedTimersHash = timingwheels.advance(toCatchUp);
313                 if (advancedTimersHash.length < InExpTimersSize)
314                 {
315                     debug(hioepoll) tracef("calling timers");
316                     //
317                     // this case happens most of the time - low number of timers per tick
318                     // save expired timers into small array.
319                     //
320                     int j = 0;
321                     foreach(t; advancedTimersHash.timers)
322                     {
323                         advancedTimersArray[j++] = t;
324                     }
325                     advancedTimersArrayLength = j;
326                     for(j=0;j < advancedTimersArrayLength; j++)
327                     {
328                         Timer t = advancedTimersArray[j];
329                         if ( t is null )
330                         {
331                             continue;
332                         }
333                         HandlerDelegate h = t._handler;
334                         assert(t._armed);
335                         t._armed = false;
336                         try {
337                             h(AppEvent.TMO);
338                         } catch (Exception e) {
339                             errorf("Uncaught exception: %s", e);
340                         }
341                     }
342                 }
343                 else
344                 {
345                     debug(hioepoll) tracef("calling timers");
346                     advancedTimersHashLength = advancedTimersHash.length;
347                     foreach (t; advancedTimersHash.timers)
348                     {
349                         HandlerDelegate h = t._handler;
350                         assert(t._armed);
351                         t._armed = false;
352                         try {
353                             h(AppEvent.TMO);
354                         } catch (Exception e) {
355                             errorf("Uncaught exception: %s", e);
356                         }
357                     }
358                 }
359                 advancedTimersArrayLength = 0;
360                 advancedTimersHashLength = 0;
361             }
362             execute_overdue_timers();
363             if (!runInfinitely && now_real >= deadline)
364             {
365                 debug(hioepoll) safe_tracef("reached deadline, return");
366                 return;
367             }
368         }
369     }
370     void start_timer(Timer t) @safe {
371         debug(hioepoll) tracef("insert timer: %s", t);
372         if ( inshutdown)
373         {
374             throw new LoopShutdownException("starting timer");
375         }
376         assert(!t._armed);
377         t._armed = true;
378         auto now = Clock.currTime;
379         auto d = t._expires - now;
380         d = max(d, 0.seconds);
381         if ( d < tick ) {
382             overdue ~= t;
383             debug(hioepoll) tracef("inserted overdue timer: %s", t);
384             return;
385         }
386         ulong twNow = timingwheels.currStdTime(tick);
387         Duration twdelay = (now.stdTime - twNow).hnsecs;
388         debug(hioepoll) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs);
389         timingwheels.schedule(t, (d + twdelay)/tick);
390     }
391 
392     void stop_timer(Timer t) @safe {
393         debug(hioepoll) tracef("remove timer %s", t);
394         if ( advancedTimersArrayLength > 0)
395         {
396             for(int j=0; j<advancedTimersArrayLength;j++)
397             {
398                 if ( t is advancedTimersArray[j])
399                 {
400                     advancedTimersArray[j] = null;
401                     return;
402                 }
403             }
404         }
405         else if (advancedTimersHashLength>0 && advancedTimersHash.contains(t.id))
406         {
407             advancedTimersHash.remove(t.id);
408             return;
409         }
410         else if (overdue.length > 0)
411         {
412             for(int i=0;i<overdue.length;i++)
413             {
414                 if (overdue[i] is t)
415                 {
416                     overdue[i] = null;
417                     debug(hioepoll) tracef("remove timer from overdue %s", t);
418                 }
419             }
420         }
421 
422         // static destructors can try to stop timers after loop deinit, so we check totalTimers
423         if (timingwheels.totalTimers() > 0)
424         {
425             timingwheels.cancel(t);
426         }
427     }
428 
429     //
430     // signals
431     //
432     void start_signal(Signal s) {
433         debug(hioepoll) tracef("start signal %s", s);
434         debug(hioepoll) tracef("signals: %s", signals);
435         auto r = s._signum in signals;
436         if ( r is null || r.length == 0 ) {
437             // enable signal only through kevent
438             _add_kernel_signal(s);
439         }
440         signals[s._signum] ~= s;
441     }
442     void stop_signal(Signal s) {
443         debug(hioepoll) trace("stop signal");
444         auto r = s._signum in signals;
445         if ( r is null ) {
446             throw new NotFoundException("You tried to stop signal that was not started");
447         }
448         Signal[] new_row;
449         foreach(a; *r) {
450             if (a._id == s._id) {
451                 continue;
452             }
453             new_row ~= a;
454         }
455         if ( new_row.length == 0 ) {
456             *r = null;
457             _del_kernel_signal(s);
458             // reenable old signal behaviour
459         } else {
460             *r = new_row;
461         }
462         debug(hioepoll) tracef("new signals %d row %s", s._signum, new_row);
463     }
464     void _add_kernel_signal(Signal s) {
465         debug(hioepoll) tracef("add kernel signal %d, id: %d", s._signum, s._id);
466         sigset_t m;
467         sigemptyset(&m);
468         sigaddset(&m, s._signum);
469         pthread_sigmask(SIG_BLOCK, &m, null);
470 
471         sigaddset(&mask, s._signum);
472         if ( signal_fd == -1 ) {
473             signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
474             debug(hioepoll) tracef("signalfd %d", signal_fd);
475             epoll_event e;
476             e.events = EPOLLIN|EPOLLET;
477             e.data.fd = signal_fd;
478             auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &e);
479             enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, fromStringz(strerror(errno))));
480         } else {
481             signalfd(signal_fd, &mask, 0);
482         }
483 
484     }
485     void _del_kernel_signal(Signal s) {
486         debug(hioepoll) tracef("del kernel signal %d, id: %d", s._signum, s._id);
487         sigset_t m;
488         sigemptyset(&m);
489         sigaddset(&m, s._signum);
490         pthread_sigmask(SIG_UNBLOCK, &m, null);
491         sigdelset(&mask, s._signum);
492         assert(signal_fd != -1);
493         signalfd(signal_fd, &mask, 0);
494     }
495     void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
496         epoll_event e;
497         e.events = EPOLLIN;
498         e.data.fd = event_id;
499         auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_id, &e))();
500         enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, s_strerror(errno)));
501         fileHandlers[event_id] = handler;
502     }
503     void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
504         epoll_event e;
505         e.events = EPOLLIN;
506         e.data.fd = event_id;
507         auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_DEL, event_id, &e))();
508         fileHandlers[event_id] = null;
509     }
510 
511     int get_kernel_id() pure @safe nothrow @nogc {
512         return epoll_fd;
513     }
514 
515     //
516     // files/sockets
517     //
518     void detach(int fd) @safe {
519         debug(hioepoll) tracef("detaching fd(%d) from fileHandlers[%d]", fd, fileHandlers.length);
520         fileHandlers[fd] = null;
521     }
522     void start_poll(int fd, AppEvent ev, FileEventHandler f) @trusted {
523         assert(epoll_fd != -1);
524         debug(hioepoll) tracef("start poll for %s on fd: %s (inshutdown: %s)", ev, fd, inshutdown);
525         if ( inshutdown )
526         {
527             //throw new LoopShutdownException("start polling");
528             f.eventHandler(fd, AppEvent.SHUTDOWN);
529             return;
530         }
531         epoll_event e;
532         e.events = appEventToSysEvent(ev);
533         if ( ev & AppEvent.EXT_EPOLLEXCLUSIVE )
534         {
535             e.events |= EPOLLEXCLUSIVE;
536         }
537         e.data.fd = fd;
538         //debug printf("epoll ctl: fd:%d: 0x%0x\n", e.data.fd, e.events);
539         auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &e);
540         enforce(rc >= 0, "epoll_ctl add(%d, %s): %s".format(fd, e, fromStringz(strerror(errno))));
541         fileHandlers[fd] = f;
542     }
543 
544     void stop_poll(int fd, AppEvent ev) @trusted {
545         assert(epoll_fd != -1);
546         epoll_event e;
547         e.events = appEventToSysEvent(ev);
548         e.data.fd = fd;
549         auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &e);
550     }
551     auto appEventToSysEvent(AppEvent ae) pure @safe {
552         import core.bitop;
553         // clear EXT_ flags
554         ae &= AppEvent.ALL;
555         assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
556         assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
557         switch ( ae ) {
558             case AppEvent.IN:
559                 return EPOLLIN;
560             case AppEvent.OUT:
561                 return EPOLLOUT;
562             //case AppEvent.CONN:
563             //    return EVFILT_READ;
564             default:
565                 throw new Exception("You can't wait for event %X".format(ae));
566         }
567     }
568 }
569