1 module hio.loop;
2 
3 import std.traits;
4 import std.datetime;
5 import std.container;
6 import std.exception;
7 import std.experimental.logger;
8 import std.typecons;
9 import hio.drivers;
10 import hio.events;
11 import core.time;
12 
13 import core.sys.posix.sys.socket;
14 
/// Selects which event loop implementation an hlEvLoop is bound to.
enum Mode : int
{
    NATIVE = 0, /// bind to NativeEventLoopImpl
    FALLBACK    /// bind to FallbackEventLoopImpl
}

/// Mode used by getDefaultLoop() when the caller does not pass one.
shared Mode globalLoopMode = Mode.NATIVE;

// Lazily filled table of default loops, one slot per Mode value
// (filled by getDefaultLoop(), emptied by uninitializeLoops()).
private static hlEvLoop[Mode.max + 1] _defaultLoop;
20 
/// Returns the default loop for `mode`, creating and caching it on first use.
///
/// Params:
///     mode = which implementation the loop should use; defaults to globalLoopMode
/// Returns: the cached hlEvLoop instance stored in the `_defaultLoop` slot for `mode`
hlEvLoop getDefaultLoop(Mode mode = globalLoopMode) @safe nothrow {
    auto cached = _defaultLoop[mode];
    if ( cached !is null ) {
        return cached;
    }
    // first request for this mode: build the loop and remember it
    cached = new hlEvLoop(mode);
    _defaultLoop[mode] = cached;
    return cached;
}
27 
// Shared module constructor: clears the default-loop table at program start.
shared static this()
{
    uninitializeLoops();
}

// Shared module destructor: deinitializes and drops any default loops that
// are still cached when the program shuts down.
// NOTE(review): `_defaultLoop` is not declared shared/__gshared, so this
// presumably only reaches the table of the thread that runs the destructor
// - confirm that other threads call uninitializeLoops() themselves.
shared static ~this()
{
    uninitializeLoops();
}
35 
/// Deinitializes every cached default loop and clears its slot.
///
/// Slots are visited in Mode order (NATIVE first, then FALLBACK); empty slots
/// are skipped, so calling this repeatedly is harmless.
package void uninitializeLoops() {
    foreach (ref slot; _defaultLoop)
    {
        if (slot is null)
            continue;
        slot.deinit();
        slot = null;
    }
}
48 
// Unit-test builds only: hand the option string "gcopt=parallel:0" to the
// D runtime through the `rt_options` symbol.
// XXX remove when https://issues.dlang.org/show_bug.cgi?id=20256 fixed
version (unittest)
    extern (C) __gshared string[] rt_options = ["gcopt=parallel:0"];
54 
///
/// Adds `signum` to the calling thread's blocked-signal mask
/// (pthread_sigmask with SIG_BLOCK).
///
/// Use it to disable default signal handling if you plan to handle the
/// signal in child threads.
///
/// Only compiled in for linux, OSX and FreeBSD; on any other platform the
/// function body is empty. The result of pthread_sigmask is not checked.
///
void ignoreSignal(int signum) {
    // Exactly the platforms that get the blocking code.
    version (linux)
        enum bool canBlock = true;
    else version (OSX)
        enum bool canBlock = true;
    else version (FreeBSD)
        enum bool canBlock = true;
    else
        enum bool canBlock = false;

    static if (canBlock) {
        import core.sys.posix.signal;

        // build a one-signal set and block it
        sigset_t mask;
        sigemptyset(&mask);
        sigaddset(&mask, signum);
        pthread_sigmask(SIG_BLOCK, &mask, null);
    }
}
85 
/// Event loop facade.
///
/// Contains one NativeEventLoopImpl and one FallbackEventLoopImpl. The
/// constructor initializes the one selected by its Mode argument and binds
/// every public delegate below to the matching method of that implementation,
/// so callers use the same API whichever implementation is active.
final class hlEvLoop {
    package:
        NativeEventLoopImpl           _nimpl;
        FallbackEventLoopImpl         _fimpl;
        string                        _name;   // copied from the selected implementation's _name

    public:
        // Each delegate forwards to the selected implementation
        // (the bound method name is given in parentheses).
        void delegate(scope Duration = Duration.max)       run;        // (run)
        @safe void delegate()                              stop;       // (stop)
        @safe void delegate()                              shutdown;   // (shutdown)
        @safe void delegate(Timer)                         startTimer; // (start_timer)
        @safe void delegate(Timer)                         stopTimer;  // (stop_timer)
        void delegate(Signal)                              startSignal; // (start_signal)
        void delegate(Signal)                              stopSignal;  // (stop_signal)
        @safe void delegate(int, AppEvent, FileEventHandler)   startPoll; // start listening to some events
        @safe void delegate(int, AppEvent)                 stopPoll;  // stop listening to some events
        @safe void delegate(int)                           detach;    // detach file from loop
        //@safe void delegate(Notification, Broadcast)       postNotification;
        @safe void delegate()                              deinit;     // (deinit)
        @safe void delegate(int, FileEventHandler)         waitForUserEvent;     // (wait_for_user_event)
        @safe void delegate(int, FileEventHandler)         stopWaitForUserEvent; // (stop_wait_for_user_event)
        @safe @nogc int delegate()                         getKernelId;          // (get_kernel_id)


    public:
        /// Name of the implementation this loop was bound to.
        string name() const pure nothrow @safe @property {
            return _name;
        }

        /// Initializes the implementation selected by `m` and binds all
        /// delegates to it.
        ///
        /// Params:
        ///     m = Mode.NATIVE (default) or Mode.FALLBACK; any other value
        ///         hits assert(0)
        this(Mode m = Mode.NATIVE) @safe nothrow {
            switch(m) {
            case Mode.NATIVE:
                _name = _nimpl._name;
                _nimpl.initialize();
                run = &_nimpl.run;
                stop = &_nimpl.stop;
                shutdown = &_nimpl.shutdown;
                startTimer = &_nimpl.start_timer;
                stopTimer = &_nimpl.stop_timer;
                startSignal = &_nimpl.start_signal;
                stopSignal = &_nimpl.stop_signal;
                startPoll = &_nimpl.start_poll;
                stopPoll = &_nimpl.stop_poll;
                detach = &_nimpl.detach;
                //postNotification = &_nimpl.postNotification;
                deinit = &_nimpl.deinit;
                waitForUserEvent = &_nimpl.wait_for_user_event;
                stopWaitForUserEvent = &_nimpl.stop_wait_for_user_event;
                getKernelId = &_nimpl.get_kernel_id;
                break;
            case Mode.FALLBACK:
                _name = _fimpl._name;
                _fimpl.initialize();
                run = &_fimpl.run;
                stop = &_fimpl.stop;
                // Must target the fallback implementation like every other
                // delegate here: _nimpl is never initialized in this mode.
                shutdown = &_fimpl.shutdown;
                startTimer = &_fimpl.start_timer;
                stopTimer = &_fimpl.stop_timer;
                startSignal = &_fimpl.start_signal;
                stopSignal = &_fimpl.stop_signal;
                startPoll = &_fimpl.start_poll;
                stopPoll = &_fimpl.stop_poll;
                detach = &_fimpl.detach;
                //postNotification = &_fimpl.postNotification;
                deinit = &_fimpl.deinit;
                waitForUserEvent = &_fimpl.wait_for_user_event;
                stopWaitForUserEvent = &_fimpl.stop_wait_for_user_event;
                getKernelId = &_fimpl.get_kernel_id;
                break;
            default:
                assert(0, "Unknown loop mode");
            }
        }
}
159 
// Smoke test: obtain both default loops, print their names, release them.
unittest
{
    import std.stdio : writefln;

    globalLogLevel = LogLevel.info;
    auto defaultModeLoop = getDefaultLoop();            // uses globalLoopMode
    auto fallbackModeLoop = getDefaultLoop(Mode.FALLBACK);
    writefln("Native   event loop: %s", defaultModeLoop.name);
    writefln("Fallback event loop: %s", fallbackModeLoop.name);
    uninitializeLoops();
}
169 
// stop() issued before run() must make run(1.seconds) return in under a second.
unittest
{
    info(" === test 'stop before start' ===");
    auto nativeLoop = new hlEvLoop();
    auto fallbackLoop = new hlEvLoop(Mode.FALLBACK);
    foreach (evl; [nativeLoop, fallbackLoop])
    {
        infof(" --- '%s' loop ---", evl.name);
        auto startedAt = Clock.currTime;
        evl.stop();
        evl.run(1.seconds);
        auto elapsed = Clock.currTime - startedAt;
        assert(elapsed < 1.seconds);
    }
    nativeLoop.deinit();
    fallbackLoop.deinit();
}
187 
// Banner only: logs the heading for the timer tests.
unittest
{
    info(" === Testing timers ===");
}
// Timer ordering: only the comparison between Timer objects is checked,
// nothing is scheduled on a loop.
@safe unittest {
    int i1;
    HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;};
    {
        // same expiry time: the timer created second must compare greater
        auto now = Clock.currTime;
        Timer a = new Timer(now, h1);
        Timer b = new Timer(now, h1);
        assert(b>a);
    }
    {
        // later expiry time compares greater
        auto now = Clock.currTime;
        Timer a = new Timer(now, h1);
        Timer b = new Timer(now + 1.seconds, h1);
        assert(b>a);
    }
}
208 
// Timer behaviour, run against both loop implementations. The scenarios are
// timing based and share the counters below, so their order matters.
unittest
{
    import core.thread;
    import std.format;

    // i1 / i2 count the invocations of h1 / h2
    int i1, i2;
    HandlerDelegate h1 = delegate void(AppEvent e) {
        tracef("h1 called");
        i1++;
    };
    HandlerDelegate h2 = delegate void(AppEvent e) {
        tracef("h2 called");
        i2++;
    };
    auto nativeLoop = new hlEvLoop();
    auto fallbackLoop = new hlEvLoop(Mode.FALLBACK);
    foreach (evl; [nativeLoop, fallbackLoop])
    {
        globalLogLevel = LogLevel.info;
        infof(" --- '%s' loop ---", evl.name);
        i1 = i2 = 0;

        // --- startTimer followed by stopTimer before the loop runs ---
        info("test start/stop timer before loop run");
        auto now = Clock.currTime;
        Timer ta = new Timer(now + 100.msecs, h1);
        Timer tb = new Timer(now + 1000.msecs, h2);
        evl.startTimer(ta);
        evl.startTimer(tb);
        evl.stopTimer(tb);
        evl.run(200.msecs);
        assert(i1==1);
        assert(i2==0);

        // --- a timer handler stops the loop; the later timer must not fire ---
        infof("stop event loop inside from timer");
        now = Clock.currTime;

        i1 = 0;
        ta = new Timer(10.msecs, (AppEvent e) @safe {
            evl.stop();
        });
        tb = new Timer(110.msecs, h1);
        evl.startTimer(ta);
        evl.startTimer(tb);
        evl.run(Duration.max);
        assert(i1 == 0);
        evl.stopTimer(tb);

        // --- a timer that expires between two runs fires on the next run ---
        info("test pending timer events");
        evl.run(50.msecs);
        i1 = 0;
        ta = new Timer(50.msecs, h1);
        evl.startTimer(ta);
        evl.run(10.msecs);
        assert(i1==0, "i1 expected 0, got %d".format(i1));
        Thread.sleep(45.msecs);
        evl.run(0.seconds);
        assert(i1==1, "i1 expected 1, got %d".format(i1));

        // --- overdue timers ---
        info("testing overdue timers");
        int[]   order;
        auto    slow = delegate void(AppEvent e) @trusted {
            Thread.sleep(20.msecs);
            order ~= 1;
        };
        auto    fast = delegate void(AppEvent e) @safe {
            order ~= 2;
        };
        ta = new Timer(50.msecs, slow);
        tb = new Timer(60.msecs, fast);
        evl.startTimer(ta);
        evl.startTimer(tb);
        evl.run(100.msecs);
        assert(order == [1,2]);

        // a timer with a negative delay is already overdue when started
        ta = new Timer(-5.seconds, fast);
        evl.startTimer(ta);
        evl.run(0.seconds);

        assert(order == [1,2,2]);

        order = new int[](0);
        // overdue timer started from inside an overdue timer's handler
        auto chain = delegate void(AppEvent e) {
            tb = new Timer(-10.seconds, fast);
            evl.startTimer(tb);
        };
        ta = new Timer(-5.seconds, chain);
        evl.startTimer(ta);
        evl.run(10.msecs);
        assert(order == [2]);

        order = new int[](0);
        // overdue timer started from inside a normal timer's handler
        chain = delegate void(AppEvent e) {
            tb = new Timer(-10.seconds, fast);
            evl.startTimer(tb);
        };
        ta = new Timer(50.msecs, chain);
        evl.startTimer(ta);
        evl.run(60.msecs);
        assert(order == [2]);

        // --- execution order (result is only recorded, see the disabled assert) ---
        info("timer execution order");
        now = Clock.currTime;
        order.length = 0;
        HandlerDelegate sh1 = delegate void(AppEvent e) { order ~= 1; };
        HandlerDelegate sh2 = delegate void(AppEvent e) { order ~= 2; };
        HandlerDelegate sh3 = delegate void(AppEvent e) { order ~= 3; };
        // a Timer without a handler must be rejected
        assertThrown(new Timer(SysTime.init, null));
        ta = new Timer(now + 500.msecs, sh1);
        tb = new Timer(now + 500.msecs, sh2);
        Timer tc = new Timer(now + 300.msecs, sh3);
        evl.startTimer(ta);
        evl.startTimer(tb);
        evl.startTimer(tc);
        evl.run(510.msecs);
        //assert(order == [3, 1, 2]); // this should work only with precise

        // info("exception handling in timer");
        // HandlerDelegate throws = delegate void(AppEvent e){throw new Exception("test exception");};
        // ta = new Timer(50.msecs, throws);
        // evl.startTimer(ta);
        // auto logLevel = globalLogLevel;
        // globalLogLevel = LogLevel.fatal;
        // evl.run(100.msecs);
        // globalLogLevel = logLevel;
        // infof(" --- end ---");
    }
    nativeLoop.deinit();
    fallbackLoop.deinit();
}
329 
// Signal delivery, run against both loop implementations: a forked child
// sends SIGHUP and SIGINT to the parent while the parent runs the loop.
unittest {
    info("=== Testing signals ===");
    auto savedloglevel = globalLogLevel;
    //globalLogLevel = LogLevel.trace;
    import core.sys.posix.signal;
    import core.thread;
    import core.sys.posix.unistd;
    import core.sys.posix.sys.wait;

    // i1 counts h1 invocations, i2 counts h2 invocations
    int i1, i2;
    auto native = new hlEvLoop();
    auto fallb = new hlEvLoop(Mode.FALLBACK);
    auto loops = [native, fallb];
    SigHandlerDelegate h1 = delegate void(int signum) {
        i1++;
    };
    SigHandlerDelegate h2 = delegate void(int signum) {
        i2++;
    };
    foreach(loop; loops) {
        infof("testing loop '%s'", loop.name);
        i1 = i2 = 0;
        // two handlers on SIGHUP (h1, h2) and one on SIGINT (h2)
        auto sighup1 = new Signal(SIGHUP, h1);
        auto sighup2 = new Signal(SIGHUP, h2);
        auto sigint1 = new Signal(SIGINT, h2);
        foreach(s; [sighup1, sighup2, sigint1]) {
            loop.startSignal(s);
        }
        auto parent_pid = getpid();
        auto child_pid = fork();
        // fail here with a clear message instead of falling into the parent branch
        assert(child_pid != -1, "fork() failed");
        if ( child_pid == 0 ) {
            Thread.sleep(500.msecs);
            kill(parent_pid, SIGHUP);
            kill(parent_pid, SIGINT);
            native.deinit();
            fallb.deinit();
            // _exit, not exit: the child must not flush stdio buffers it
            // inherited from the parent (that would duplicate buffered output)
            // nor run the parent's atexit handlers.
            _exit(0);
        } else {
            loop.run(1.seconds);
            waitpid(child_pid, null, 0);
        }
        // h1 saw SIGHUP once; h2 saw SIGHUP and SIGINT
        assert(i1 == 1);
        assert(i2 == 2);
        foreach(s; [sighup1, sighup2, sigint1]) {
            loop.stopSignal(s);
        }
        loop.run(1.msecs); // send stopSignals to kernel
    }
    native.deinit();
    fallb.deinit();
    globalLogLevel = savedloglevel;
}
383 
// Socket round trip, run against both loop implementations: a server on
// port 16000 answers one request; a second client connects to port 16001,
// on which this test starts no listener.
unittest
{
    globalLogLevel = LogLevel.info;
    info(" === Testing sockets ===");
    import std.string, std.stdio;
    import hio.socket;

    auto nativeLoop = new hlEvLoop();
    auto fallbackLoop = new hlEvLoop(Mode.FALLBACK);
    foreach (evl; [nativeLoop, fallbackLoop])
    {
        infof("testing loop '%s'", evl.name);
        immutable limit = 1;          // number of request/response rounds expected
        int requests = 0;
        int responses = 0;
        hlSocket client, server;
        immutable(ubyte)[] input;     // currently unused
        immutable(ubyte)[] response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".representation;
        string request = "GET / HTTP/1.1\r\nId: %d\r\n\r\n";

        // Accept callback: read up to 512 bytes from the accepted socket, then
        // send the canned response (skipped on timeout) and close the socket.
        void server_handler(AsyncSocketLike so) @safe
        {
            auto accepted = cast(hlSocket)so;
            tracef("server accepted on %s", accepted.fileno());
            IORequest rq;
            rq.to_read = 512;
            rq.callback = (ref IOResult r) {
                if ( !r.timedout ) {
                    accepted.send(response);
                }
                accepted.close();
            };
            accepted.io(evl, rq, 10.seconds);
        }

        // Connect callback: on ERR/HUP or a failed send close the client,
        // otherwise send the request and wait for the response.
        void client_handler(AppEvent e) @safe
        {
            tracef("connection app handler");
            if ( e & (AppEvent.ERR|AppEvent.HUP) ) {
                tracef("error on %s", client);
                client.close();
                return;
            }
            tracef("sending to %s", client);
            auto sent = client.send(format(request, requests).representation());
            if ( sent == -1 ) {
                tracef("error on %s", client);
                client.close();
                return;
            }
            tracef("send returned %d", sent);
            IORequest rq;
            rq.to_read = 512;
            rq.callback = (ref IOResult r) {
                if ( r.timedout ) {
                    info("Client timeout waiting for response");
                    client.close();
                    return;
                }
                // client received response from server
                responses++;
                client.close();
                requests++;
                if ( requests < limit ) {
                    // more rounds wanted: reconnect with a fresh socket
                    client = new hlSocket();
                    client.open();
                    client.connect("127.0.0.1:16000", evl, &client_handler, 5.seconds);
                }
            };
            client.io(evl, rq, dur!"seconds"(10));
        }

        server = new hlSocket();
        server.open();
        assert(server.fileno() >= 0);
        scope(exit) {
            debug tracef("closing server socket %s", server);
            server.close();
        }
        tracef("server listen on %d", server.fileno());
        server.bind("0.0.0.0:16000");
        server.listen();
        server.accept(evl, Duration.max, &server_handler);

        // after 50 ms: connect to the listening port
        evl.startTimer(new Timer(50.msecs, (AppEvent e) @safe {
            client = new hlSocket();
            client.open();
            client.connect("127.0.0.1:16000", evl, &client_handler, 5.seconds);
        }));

        // after 100 ms: connect to port 16001 (no listener is started on it here)
        evl.startTimer(new Timer(100.msecs, (AppEvent e) @safe {
            client = new hlSocket();
            client.open();
            client.connect("127.0.0.1:16001", evl, &client_handler, 5.seconds);
        }));

        evl.run(1.seconds);
        assert(responses == limit, "%s != %s".format(responses, limit));
        globalLogLevel = LogLevel.info;
    }
    nativeLoop.deinit();
    fallbackLoop.deinit();
}