1 module hio.drivers.select;
2 
3 import std.datetime;
4 import std.container;
5 import std.experimental.logger;
6 
7 import std.experimental.allocator;
8 import std.experimental.allocator.mallocator;
9 import std.typecons;
10 
11 import std.string;
12 import std.algorithm.comparison: min, max;
13 import std.exception: enforce;
14 import core.thread;
15 
16 version(Windows) {
17     import core.sys.windows.winsock2;
18 }
19 version(Posix) {
20     import core.sys.posix.sys.select;
21 }
22 
23 import core.stdc.string: strerror;
24 import core.stdc.errno;
25 import core.stdc.signal;
26 
27 import hio.events;
28 
29 //
30 // TODO add support for multiple select event loops
31 //
/// Capacity of the buffer that records signal numbers between two drains.
private enum                            sig_array_length = 256;
/// Signal numbers recorded by `sig_catcher`, in arrival order.
private static int[sig_array_length]    last_signal;
/// Number of valid entries in `last_signal`.
private static int                      last_signal_index;

/**
 * C-level signal handler installed through `signal()`.
 *
 * Records `signum` in `last_signal` so the event loop can dispatch it later.
 * When the buffer is already full the signal is dropped instead of writing
 * past the end of the array: a bounds failure inside a signal handler
 * cannot be handled in any meaningful way.
 *
 * Params:
 *   signum = number of the delivered signal
 */
extern(C) void sig_catcher(int signum) nothrow @nogc {
    if ( last_signal_index < 0 || last_signal_index >= sig_array_length ) {
        // buffer full (or index corrupted) - drop this signal
        return;
    }
    last_signal[last_signal_index++] = signum;
}
39 
/// Per-descriptor state kept by the select-based loop.
private struct FileDescriptor {
    /// Events currently requested for this descriptor.
    package AppEvent _polling = AppEvent.NONE;

    /// Text form of the requested event set, produced by `appeventToString`.
    string toString() const @safe {
        return _polling.appeventToString;
    }
}
50 
51 struct FallbackEventLoopImpl {
52     immutable string _name = "select";
53     immutable numberOfDescriptors = 1024;
54 
55     private {
56         fd_set                  read_fds;
57         fd_set                  write_fds;
58         fd_set                  err_fds;
59 
60         bool                    stopped = false;
61         RedBlackTree!Timer      timers;
62         Timer[]                 overdue;    // timers added with expiration in past
63 
64         Signal[][int]           signals;
65 
66         FileDescriptor[numberOfDescriptors]    fileDescriptors;
67         FileEventHandler[]      fileHandlers;
68         //CircBuff!Notification   notificationsQueue;
69     }
70 
71     @disable this(this) {};
72 
73     void initialize() @safe nothrow {
74         timers = new RedBlackTree!Timer();
75         fileHandlers = new FileEventHandler[](1024);
76     }
77     void deinit() @safe {
78         timers = null;
79     }
80     void stop() @safe {
81         debug trace("mark eventloop as stopped");
82         stopped = true;
83     }
84 
85     /**
86      * Find shortest interval between now->deadline, now->earliest timer
87      * If deadline expired or timer in past - set zero wait time
88      */
89     timeval* _calculate_timeval(SysTime deadline, timeval* tv) {
90         SysTime now = Clock.currTime;
91         Duration d = deadline - now;
92         if ( ! timers.empty ) {
93             d = min(d, timers.front._expires - now);
94         }
95         d = max(d, 0.seconds);
96         auto converted = d.split!("seconds", "usecs");
97         tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
98         tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
99         return tv;
100     }
101     timeval* _calculate_timeval(timeval* tv) {
102         SysTime  now = Clock.currTime;
103         Duration d;
104         d = timers.front._expires - now;
105         d = max(d, 0.seconds);
106         auto converted = d.split!("seconds", "usecs");
107         tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
108         tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
109         return tv;
110     }
111     void run(Duration d) {
112 
113         immutable bool runIndefinitely = (d == Duration.max);
114         SysTime now = Clock.currTime;
115         SysTime deadline;
116         timeval tv;
117         timeval* wait;
118 
119         if ( ! runIndefinitely ) {
120             deadline = now + d;
121         }
122 
123         debug tracef("evl run %s",runIndefinitely? "indefinitely": "for %s".format(d));
124 
125         scope(exit) {
126             stopped = false;
127         }
128 
129         while (!stopped) {
130 
131             int fdmax = -1;
132 
133             //
134             // handle user events(notifications)
135             //
136             //auto counter = notificationsQueue.Size * 10;
137             //while(!notificationsQueue.empty){
138             //    auto n = notificationsQueue.get();
139             //    n.handler();
140             //    counter--;
141             //    enforce(counter > 0, "Can't clear notificatioinsQueue");
142            // }
143 
144             while (overdue.length > 0) {
145                 // execute timers which user requested with negative delay
146                 Timer t = overdue[0];
147                 overdue = overdue[1..$];
148                 debug tracef("execute overdue %s", t);
149                 HandlerDelegate h = t._handler;
150                 try {
151                     h(AppEvent.TMO);
152                 } catch (Exception e) {
153                     errorf("Uncaught exception: %s", e);
154                 }
155             }
156             if (stopped) {
157                 break;
158             } 
159 
160             while ( !timers.empty && timers.front._expires <= now) {
161                 debug tracef("processing overdue  %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
162                 Timer t = timers.front;
163                 HandlerDelegate h = t._handler;
164                 timers.removeFront;
165                 try {
166                     h(AppEvent.TMO);
167                 } catch (Exception e) {
168                     errorf("Uncaught exception: %s", e);
169                 }
170                 now = Clock.currTime;
171             }
172 
173             FD_ZERO(&read_fds);
174             FD_ZERO(&write_fds);
175             FD_ZERO(&err_fds);
176 
177             foreach(int fd; 0..numberOfDescriptors) {
178                 AppEvent e = fileDescriptors[fd]._polling;
179                 if ( e == AppEvent.NONE ) {
180                     continue;
181                 }
182                 debug tracef("poll %d for %s", fd, fileDescriptors[fd]);
183                 if ( e & AppEvent.IN ) {
184                     FD_SET(fd, &read_fds);
185                 }
186                 if ( e & AppEvent.OUT ) {
187                     FD_SET(fd, &write_fds);
188                 }
189                 fdmax = max(fdmax, fd);
190             }
191 
192             wait = (runIndefinitely && timers.empty)  ?
193                           null
194                         : _calculate_timeval(deadline, &tv);
195             if ( runIndefinitely && timers.empty ) {
196                 wait = null;
197             } else
198             if ( runIndefinitely && !timers.empty ) {
199                 wait = _calculate_timeval(&tv);
200             } else
201                 wait = _calculate_timeval(deadline, &tv);
202 
203             //debug tracef("waiting for events %s", wait is null?"forever":"%s".format(*wait));
204             auto ready = select(fdmax+1, &read_fds, &write_fds, &err_fds, wait);
205             //debug tracef("returned %d events", ready);
206             if ( ready < 0 && errno == EINTR ) {
207                 int s_ind;
208                 while(s_ind < last_signal_index) {
209                     int signum = last_signal[s_ind];
210                     assert(signals[signum].length > 0);
211                     foreach(s; signals[signum]) {
212                         debug tracef("processing signal handler %s", s);
213                         try {
214                             SigHandlerDelegate h = s._handler;
215                             h(signum);
216                         } catch (Exception e) {
217                             errorf("Uncaught exception: %s", e);
218                         }
219                     }
220                     s_ind++;
221                 }
222                 last_signal_index = 0;
223                 continue;
224             }
225             if ( ready < 0 ) {
226                 errorf("on call: (%s, %s, %s, %s)", fdmax+1, read_fds, write_fds, tv);
227                 errorf("select returned error %s", fromStringz(strerror(errno)));
228             }
229             enforce(ready >= 0);
230             if ( ready == 0 ) {
231                 // Timedout
232                 //
233                 // For select there can be two reasons for ready == 0:
234                 // 1. we reached deadline
235                 // 2. we have timer event
236                 //
237                 if ( timers.empty ) {
238                     // there were no timers, so this can be only timeout
239                     debug trace("select timedout and no timers active");
240                     assert(Clock.currTime >= deadline);
241                     return;
242                 }
243                 now = Clock.currTime;
244                 if ( !runIndefinitely && now >= deadline ) {
245                     debug trace("deadline reached");
246                     return;
247                 }
248 
249                 /*
250                  * Invariants for timers
251                  * ---------------------
252                  * timer list must not be empty at event.
253                  * we have to receive event only on the earliest timer in list
254                 */
255                 assert(!timers.empty, "timers empty on timer event");
256                 /* */
257 
258                 if ( timers.front._expires <= now) do {
259                     debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
260                     Timer t = timers.front;
261                     HandlerDelegate h = t._handler;
262                     try {
263                         h(AppEvent.TMO);
264                     } catch (Exception e) {
265                         errorf("Uncaught exception: %s", e);
266                     }
267                     // timer event handler can try to stop exactly this timer,
268                     // so when we returned from handler we can have different front
269                     // and we do not have to remove it.
270                     if ( !timers.empty && timers.front == t ) {
271                         timers.removeFront;
272                     }
273                     now = Clock.currTime;
274                 } while (!timers.empty && timers.front._expires <= now );
275             }
276             if ( ready > 0 ) {
277                 foreach(int fd; 0..numberOfDescriptors) {
278                     AppEvent e = fileDescriptors[fd]._polling;
279                     if ( e == AppEvent.NONE ) {
280                         continue;
281                     }
282                     debug tracef("check %d for %s", fd, fileDescriptors[fd]);
283                     if ( e & AppEvent.IN && FD_ISSET(fd, &read_fds) ) {
284                         debug tracef("got IN event on file %d", fd);
285                         fileHandlers[fd].eventHandler(fd, AppEvent.IN);
286                     }
287                     if ( e & AppEvent.OUT && FD_ISSET(fd, &write_fds) ) {
288                         debug tracef("got OUT event on file %d", fd);
289                         fileHandlers[fd].eventHandler(fd, AppEvent.OUT);
290                     }
291                 }
292             }
293         }
294     }
295 
296     void start_timer(Timer t) @trusted {
297         debug tracef("insert timer: %s", t);
298         auto d = t._expires - Clock.currTime;
299         d = max(d, 0.seconds);
300         if ( d == 0.seconds ) {
301             overdue ~= t;
302             return;
303         }
304         timers.insert(t);
305     }
306 
307     void stop_timer(Timer t) @trusted {
308         assert(!timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t));
309         debug tracef("remove timer %s", t);
310         auto r = timers.equalRange(t);
311         timers.remove(r);
312     }
313 
314     void start_poll(int fd, AppEvent ev, FileEventHandler h) pure @safe {
315         enforce(fd >= 0, "fileno can't be negative");
316         enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
317         debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
318         fileDescriptors[fd]._polling |= ev;
319         fileHandlers[fd] = h;
320     }
321     void stop_poll(int fd, AppEvent ev) @safe {
322         enforce(fd >= 0, "fileno can't be negative");
323         enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
324         debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
325         fileDescriptors[fd]._polling &= ev ^ AppEvent.ALL;
326     }
327 
328     int get_kernel_id() @safe @nogc {
329         return -1;
330     }
331 
332     void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
333     
334     }
335     void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
336     
337     }
338 
339     void detach(int fd) @safe {
340         fileHandlers[fd] = null;
341     }
342 
343 //    pragma(inline)
344 //    void processNotification(Notification ue) @safe {
345 //        ue.handler();
346 //    }
347 
348 //    void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
349 //        debug trace("posting notification");
350 //        if ( !notificationsQueue.full )
351 //        {
352 //            notificationsQueue.put(notification);
353 //            return;
354 //        }
355 //        // now try to find space for next notification
356 //        auto retries = 10 * notificationsQueue.Size;
357 //        while(notificationsQueue.full && retries > 0)
358 //        {
359 //            retries--;
360 //            auto _n = notificationsQueue.get();
361 //            processNotification(_n);
362 //        }
363 //        enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
364 //        notificationsQueue.put(notification);
365 //    }
366     void flush() {
367     }
368     void start_signal(Signal s) {
369         debug tracef("start signal %s", s);
370         debug tracef("signals: %s", signals);
371         auto r = s._signum in signals;
372         if ( r is null || r.length == 0 ) {
373             // enable signal only through kevent
374             _add_kernel_signal(s);
375         }
376         signals[s._signum] ~= s;
377     }
378 
379     void stop_signal(Signal s) {
380         debug trace("stop signal");
381         auto r = s._signum in signals;
382         if ( r is null ) {
383             throw new NotFoundException("You tried to stop signal that was not started");
384         }
385         Signal[] new_row;
386         foreach(a; *r) {
387             if (a._id == s._id) {
388                 continue;
389             }
390             new_row ~= a;
391         }
392         if ( new_row.length == 0 ) {
393             *r = null;
394             _del_kernel_signal(s);
395             // reenable old signal behaviour
396         } else {
397             *r = new_row;
398         }
399         debug tracef("new signals %d row %s", s._signum, new_row);
400     }
401     void _add_kernel_signal(Signal s) {
402         signal(s._signum, &sig_catcher);
403         debug tracef("adding handler for signum %d: %x", s._signum, &this);
404     }
405     void _del_kernel_signal(Signal s) {
406         signal(s._signum, SIG_DFL);
407         debug tracef("deleted handler for signum %d: %x", s._signum, &this);
408     }
409 }