1 module hio.drivers.select;
2 
3 import std.datetime;
4 import std.container;
5 import std.experimental.logger;
6 
7 import std.experimental.allocator;
8 import std.experimental.allocator.mallocator;
9 import std.typecons;
10 
11 import std.string;
12 import std.algorithm.comparison: min, max;
13 import std.exception: enforce;
14 import core.thread;
15 
16 version(Windows) {
17     import core.sys.windows.winsock2;
18 }
19 version(Posix) {
20     import core.sys.posix.sys.select;
21 }
22 
23 import core.stdc.string: strerror;
24 import core.stdc.errno;
25 import core.stdc.signal;
26 
27 import timingwheels;
28 
29 import hio.events;
30 import hio.common;
31 
32 //
33 // TODO add support for multiple select event loops
34 //
private enum                            sig_array_length = 256;
private static int[sig_array_length]    last_signal;
private static int                      last_signal_index;

/**
 * C-level signal handler: records the caught signal number for later dispatch.
 *
 * The number is appended to `last_signal` and `last_signal_index` is advanced.
 * When the buffer already holds `sig_array_length` entries the signal is
 * dropped instead of indexing past the end of the array.
 *
 * Params:
 *   signum = number of the signal being delivered
 */
extern(C) void sig_catcher(int signum) nothrow @nogc {
    if ( last_signal_index >= sig_array_length ) {
        // buffer is full: losing one notification is preferable to an
        // out-of-bounds write (or a RangeError) inside a signal handler
        return;
    }
    last_signal[last_signal_index++] = signum;
}
42 
/// Per-descriptor polling state kept by the select-based loop.
private struct FileDescriptor {
    /// Set of events currently requested for this descriptor.
    package AppEvent _polling = AppEvent.NONE;

    /// Textual form of the requested event set, as produced by `appeventToString`.
    string toString() const @safe {
        return appeventToString(_polling);
    }
}
53 
54 struct FallbackEventLoopImpl {
55     immutable string _name = "select";
56     immutable numberOfDescriptors = 1024;
57 
58     private {
59         fd_set                  read_fds;
60         fd_set                  write_fds;
61         fd_set                  err_fds;
62 
63         bool                    stopped = false;
64 
65         Signal[][int]           signals;
66 
67         FileDescriptor[numberOfDescriptors]    fileDescriptors;
68         FileEventHandler[]      fileHandlers;
69 
70         Timer[]                 overdue;    // timers added with expiration in past or with cicks==0
71 
72         RedBlackTree!Timer      precise_timers;     // precise timers
73         TimingWheels!Timer      timingwheels;       // timing wheels
74         Duration                tick = 5.msecs;
75         //CircBuff!Notification   notificationsQueue;
76     }
77 
78     @disable this(this) {};
79 
80     void initialize() @safe nothrow {
81         precise_timers = new RedBlackTree!Timer();
82         fileHandlers = new FileEventHandler[](1024);
83         timingwheels.init();
84     }
85     void deinit() @trusted {
86         precise_timers = null;
87         timingwheels = TimingWheels!(Timer)();
88         timingwheels.init();
89     }
    /// Sets the `stopped` flag; `run` tests this flag in its loop condition
    /// and after executing timers, and resets it when it returns.
    void stop() @safe {
        debug(hioselect) safe_tracef("mark eventloop as stopped");
        stopped = true;
    }
94 
95     private Duration timeUntilNextTimer()
96     out(r;r>=0.seconds)
97     {
98         Duration result = Duration.max;
99         ulong nowRT = Clock.currStdTime;
100 
101         if ( ! precise_timers.empty )
102         {
103             result = min(result, precise_timers.front._expires - SysTime(nowRT));
104         }
105         auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, nowRT);
106         nextTWtimer = max(nextTWtimer, 0.seconds);
107         debug(hioselect) safe_tracef("prec: %s, timingwheel: %s", result, nextTWtimer);
108         result = min(result, nextTWtimer);
109         return result;
110     }
111     private timeval* _calculate_timeval(Duration d, timeval* tv)
112     {
113         if (d < 0.seconds)
114         {
115             d = 0.seconds;
116         }
117         immutable converted = d.split!("seconds", "usecs");
118         tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
119         tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
120         return tv;
121     }
122     private void execute_overdue_timers()
123     {
124         while (overdue.length > 0)
125         {
126             // execute timers which user requested with negative delay
127             Timer t = overdue[0];
128             overdue = overdue[1..$];
129             debug(hioselect) safe_tracef("execute overdue %s", t);
130             HandlerDelegate h = t._handler;
131             try {
132                 h(AppEvent.TMO);
133             } catch (Exception e) {
134                 errorf("Uncaught exception: %s", e);
135             }
136         }
137     }
138     // /**
139     //  * Find shortest interval between now->deadline, now->earliest timer
140     //  * If deadline expired or timer in past - set zero wait time
141     //  */
142     // timeval* _calculate_timeval(SysTime deadline, timeval* tv) {
143     //     SysTime now = Clock.currTime;
144     //     Duration d = deadline - now;
145     //     if ( ! precise_timers.empty ) {
146     //         d = min(d, precise_timers.front._expires - now);
147     //     }
148     //     d = min(d, timingwheels.timeUntilNextEvent(tick));
149     //     d = max(d, 0.seconds);
150     //     auto converted = d.split!("seconds", "usecs");
151     //     tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
152     //     tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
153     //     return tv;
154     // }
155     // timeval* _calculate_timeval(timeval* tv) {
156     //     SysTime  now = Clock.currTime;
157     //     Duration d;
158     //     d = timers.front._expires - now;
159     //     d = max(d, 0.seconds);
160     //     auto converted = d.split!("seconds", "usecs");
161     //     tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
162     //     tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
163     //     return tv;
164     // }
165     void run(Duration d) {
166 
167         immutable bool runIndefinitely = (d == Duration.max);
168         SysTime now = Clock.currTime;
169         SysTime deadline;
170         timeval tv;
171         timeval* wait;
172 
173         if ( ! runIndefinitely ) {
174             deadline = now + d;
175         }
176 
177         debug(hioselect) safe_tracef("evl run %s",d);
178 
179         scope(exit) {
180             stopped = false;
181         }
182 
183         while (!stopped) {
184 
185             int fdmax = -1;
186 
187             execute_overdue_timers();
188 
189             if (stopped) {
190                 break;
191             }
192 
193             while ( !precise_timers.empty && precise_timers.front._expires <= now) {
194                 debug(hioselect) safe_tracef("processing overdue from precise %s, lag: %s",
195                         precise_timers.front, Clock.currTime - precise_timers.front._expires);
196                 Timer t = precise_timers.front;
197                 HandlerDelegate h = t._handler;
198                 precise_timers.removeFront;
199                 try {
200                     h(AppEvent.TMO);
201                 } catch (Exception e) {
202                     errorf("Uncaught exception: %s", e);
203                 }
204                 now = Clock.currTime;
205             }
206             if (stopped) {
207                 break;
208             }
209 
210             FD_ZERO(&read_fds);
211             FD_ZERO(&write_fds);
212             FD_ZERO(&err_fds);
213 
214             foreach(int fd; 0..numberOfDescriptors) {
215                 AppEvent e = fileDescriptors[fd]._polling;
216                 if ( e == AppEvent.NONE ) {
217                     continue;
218                 }
219                 debug(hioselect) safe_tracef("poll %d for %s", fd, fileDescriptors[fd]);
220                 if ( e & AppEvent.IN ) {
221                     FD_SET(fd, &read_fds);
222                 }
223                 if ( e & AppEvent.OUT ) {
224                     FD_SET(fd, &write_fds);
225                 }
226                 fdmax = max(fdmax, fd);
227             }
228 
229             //
230             // Next limits for wait time:
231             // - deadline (user ask loop run duration d)
232             // - or requested to run indefinitely
233             // - next precise_timer
234             // - next timingwheel timer
235             //
236             immutable untilTimer = timeUntilNextTimer();
237             immutable untilDeadline = runIndefinitely ? Duration.max : deadline - Clock.currTime;
238             immutable nextStop = min(untilTimer, untilDeadline);
239             debug(hioselect) safe_tracef("untilTimer: %s, untilDeadline: %s", untilTimer, untilDeadline);
240             if ( nextStop == Duration.max )
241             {
242                 wait = null;
243             }
244             else
245             {
246                 wait = _calculate_timeval(nextStop, &tv);
247             }
248 
249             debug(hioselect) safe_tracef("waiting for events %s", wait is null?"forever":"%s".format(*wait));
250             auto ready = select(fdmax+1, &read_fds, &write_fds, &err_fds, wait);
251             debug(hioselect) safe_tracef("returned %d events", ready);
252             if ( ready < 0 && errno == EINTR ) {
253                 int s_ind;
254                 while(s_ind < last_signal_index) {
255                     int signum = last_signal[s_ind];
256                     assert(signals[signum].length > 0);
257                     foreach(s; signals[signum]) {
258                         debug(hioselect) safe_tracef("processing signal handler %s", s);
259                         try {
260                             SigHandlerDelegate h = s._handler;
261                             h(signum);
262                         } catch (Exception e) {
263                             errorf("Uncaught exception: %s", e);
264                         }
265                     }
266                     s_ind++;
267                 }
268                 last_signal_index = 0;
269                 continue;
270             }
271             if ( ready < 0 )
272             {
273                 errorf("on call: (%s, %s, %s, %s)", fdmax+1, read_fds, write_fds, tv);
274                 errorf("select returned error %s", fromStringz(strerror(errno)));
275             }
276             enforce(ready >= 0);
277             if ( ready == 0 ) {
278                 ulong nowRT = Clock.currStdTime;
279                 // Timedout
280                 // check timingweels
281                 auto toCatchUp = timingwheels.ticksToCatchUp(tick, nowRT);
282                 if(toCatchUp>0)
283                 {
284                     auto wr = timingwheels.advance(toCatchUp);
285                     foreach(t; wr.timers)
286                     {
287                         HandlerDelegate h = t._handler;
288                         try {
289                             h(AppEvent.TMO);
290                         } catch (Exception e) {
291                             errorf("Uncaught exception: %s", e);
292                         }
293                     }
294                 }
295 
296                 if ( precise_timers.length > 0 && precise_timers.front._expires <= now) do {
297                     debug(hioselect) safe_tracef("processing %s, lag: %s",
298                         precise_timers.front, Clock.currTime - precise_timers.front._expires);
299                     Timer t = precise_timers.front;
300                     HandlerDelegate h = t._handler;
301                     assert(t._armed);
302                     t._armed = false;
303                     try {
304                         h(AppEvent.TMO);
305                     } catch (Exception e) {
306                         errorf("Uncaught exception: %s", e);
307                     }
308                     // timer event handler can try to stop exactly this timer,
309                     // so when we returned from handler we can have different front
310                     // and we do not have to remove it.
311                     if ( !precise_timers.empty && precise_timers.front == t ) {
312                         precise_timers.removeFront;
313                     }
314                     now = Clock.currTime;
315                 } while (!precise_timers.empty && precise_timers.front._expires <= now );
316 
317                 // handlers can install some late timers, so...
318                 execute_overdue_timers();
319 
320                 now = Clock.currTime;
321                 if ( !runIndefinitely && now >= deadline ) {
322                     debug(hioselect) safe_tracef("deadline reached");
323                     return;
324                 }
325             }
326             if ( ready > 0 ) {
327                 foreach(int fd; 0..numberOfDescriptors) {
328                     AppEvent e = fileDescriptors[fd]._polling;
329                     if ( e == AppEvent.NONE ) {
330                         continue;
331                     }
332                     debug(hioselect) safe_tracef("check %d for %s", fd, fileDescriptors[fd]);
333                     if ( e & AppEvent.IN && FD_ISSET(fd, &read_fds) ) {
334                         debug(hioselect) safe_tracef("got IN event on file %d", fd);
335                         fileHandlers[fd].eventHandler(fd, AppEvent.IN);
336                     }
337                     if ( e & AppEvent.OUT && FD_ISSET(fd, &write_fds) ) {
338                         debug(hioselect) safe_tracef("got OUT event on file %d", fd);
339                         fileHandlers[fd].eventHandler(fd, AppEvent.OUT);
340                     }
341                 }
342             }
343         }
344     }
345 
346     void start_timer(Timer t) @trusted {
347         debug(hioselect) safe_tracef("insert timer: %s", t);
348         auto now = Clock.currTime;
349         auto d = t._expires - now;
350         d = max(d, 0.seconds);
351         if ( d < tick ) {
352             overdue ~= t;
353             return;
354         }
355         assert(!t._armed);
356         t._armed = true;
357         ulong twNow = timingwheels.currStdTime(tick);
358         Duration twdelay = (now.stdTime - twNow).hnsecs;
359         debug(hioselect) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs);
360         timingwheels.schedule(t, (d + twdelay)/tick);
361     }
362 
363     void stop_timer(Timer t) @trusted {
364         debug(hioselect) safe_tracef("remove timer %s", t);
365         {
366             // static destructors can try to stop timers after loop deinit
367             timingwheels.cancel(t);
368         }
369     }
370 
371     void start_precise_timer(Timer t) @trusted {
372         debug(hioselect) safe_tracef("insert timer: %s", t);
373         auto d = t._expires - Clock.currTime;
374         d = max(d, 0.seconds);
375         if ( d == 0.seconds ) {
376             overdue ~= t;
377             return;
378         }
379         precise_timers.insert(t);
380     }
381 
382     void stop_precise_timer(Timer t) @trusted {
383         assert(!precise_timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t));
384         debug(hioselect) safe_tracef("remove timer %s", t);
385         auto r = precise_timers.equalRange(t);
386         precise_timers.remove(r);
387     }
388 
389     void start_poll(int fd, AppEvent ev, FileEventHandler h) pure @safe {
390         enforce(fd >= 0, "fileno can't be negative");
391         enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
392         debug(hioselect) safe_tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
393         fileDescriptors[fd]._polling |= ev;
394         fileHandlers[fd] = h;
395     }
396     void stop_poll(int fd, AppEvent ev) @safe {
397         enforce(fd >= 0, "fileno can't be negative");
398         enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
399         debug(hioselect) safe_tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
400         fileDescriptors[fd]._polling &= ev ^ AppEvent.ALL;
401     }
402 
    /// Always returns -1.
    /// NOTE(review): presumably means "this backend owns no kernel object";
    /// confirm against the other event loop drivers.
    int get_kernel_id() @safe @nogc {
        return -1;
    }
406 
    /// Intentionally empty: this implementation does nothing with
    /// `event_id` or `handler`.
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
    
    }
    /// Intentionally empty counterpart of `wait_for_user_event`: both
    /// arguments are ignored.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
    
    }
413 
    /// Clears the handler slot of `fd`. The descriptor's polling mask in
    /// `fileDescriptors` is not modified here.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }
417 
418 //    pragma(inline)
419 //    void processNotification(Notification ue) @safe {
420 //        ue.handler();
421 //    }
422 
423 //    void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
424 //        debug trace("posting notification");
425 //        if ( !notificationsQueue.full )
426 //        {
427 //            notificationsQueue.put(notification);
428 //            return;
429 //        }
430 //        // now try to find space for next notification
431 //        auto retries = 10 * notificationsQueue.Size;
432 //        while(notificationsQueue.full && retries > 0)
433 //        {
434 //            retries--;
435 //            auto _n = notificationsQueue.get();
436 //            processNotification(_n);
437 //        }
438 //        enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
439 //        notificationsQueue.put(notification);
440 //    }
    /// No-op in this implementation.
    void flush() {
    }
443     void start_signal(Signal s) {
444         debug(hioselect) safe_tracef("start signal %s", s);
445         debug(hioselect) safe_tracef("signals: %s", signals);
446         auto r = s._signum in signals;
447         if ( r is null || r.length == 0 ) {
448             // enable signal only through kevent
449             _add_kernel_signal(s);
450         }
451         signals[s._signum] ~= s;
452     }
453 
454     void stop_signal(Signal s) {
455         debug trace("stop signal");
456         auto r = s._signum in signals;
457         if ( r is null ) {
458             throw new NotFoundException("You tried to stop signal that was not started");
459         }
460         Signal[] new_row;
461         foreach(a; *r) {
462             if (a._id == s._id) {
463                 continue;
464             }
465             new_row ~= a;
466         }
467         if ( new_row.length == 0 ) {
468             *r = null;
469             _del_kernel_signal(s);
470             // reenable old signal behaviour
471         } else {
472             *r = new_row;
473         }
474         debug(hioselect) safe_tracef("new signals %d row %s", s._signum, new_row);
475     }
    /// Installs `sig_catcher` as the handler for `s._signum` via C `signal()`.
    /// The return value of `signal()` is not checked.
    void _add_kernel_signal(Signal s) {
        signal(s._signum, &sig_catcher);
        debug(hioselect) safe_tracef("adding handler for signum %d: %x", s._signum, &this);
    }
    /// Restores the default disposition (`SIG_DFL`) for `s._signum` via C
    /// `signal()`. The return value of `signal()` is not checked.
    void _del_kernel_signal(Signal s) {
        signal(s._signum, SIG_DFL);
        debug(hioselect) safe_tracef("deleted handler for signum %d: %x", s._signum, &this);
    }
484 }