1 module hio.loop;
2 
3 import std.traits;
4 import std.datetime;
5 import std.container;
6 import std.exception;
7 import std.experimental.logger;
8 import std.typecons;
9 import hio.drivers;
10 import hio.events;
11 import core.time;
12 
13 import core.sys.posix.sys.socket;
14 
15 enum Mode:int {NATIVE = 0, FALLBACK}
16 
17 shared Mode globalLoopMode = Mode.NATIVE;
18 
19 private static hlEvLoop[Mode.FALLBACK+1] _defaultLoop;
20 
21 hlEvLoop getDefaultLoop(Mode mode = globalLoopMode) @safe nothrow {
22     if ( _defaultLoop[mode] is null ) {
23         _defaultLoop[mode] = new hlEvLoop(mode);
24     }
25     return _defaultLoop[mode];
26 }
27 
28 shared static this() {
29     uninitializeLoops();
30 };
31 
32 package void uninitializeLoops() {
33     _defaultLoop[Mode.NATIVE] = null;
34     _defaultLoop[Mode.FALLBACK] = null;
35 }
36 
37 version (unittest)
38 {
39     // XXX remove when https://issues.dlang.org/show_bug.cgi?id=20256 fixed
40     extern (C) __gshared string[] rt_options = ["gcopt=parallel:0"];
41 }
42 
43 ///
44 /// disable default signal handling if you plan to handle signal in
45 /// child threads
46 ///
47 void ignoreSignal(int signum) {
48     version (linux) {
49         import core.sys.posix.signal;
50 
51         sigset_t m;
52         sigemptyset(&m);
53         sigaddset(&m, signum);
54         pthread_sigmask(SIG_BLOCK, &m, null);
55     }
56     version (OSX) {
57         import core.sys.posix.signal;
58 
59         sigset_t m;
60         sigemptyset(&m);
61         sigaddset(&m, signum);
62         pthread_sigmask(SIG_BLOCK, &m, null);
63     }
64 }
65 
66 final class hlEvLoop {
67     package:
68         NativeEventLoopImpl           _nimpl;
69         FallbackEventLoopImpl         _fimpl;
70         string                        _name;
71 
72     public:
73         void delegate(scope Duration = Duration.max)                      run;
74         @safe void delegate()                              stop;
75         @safe void delegate(Timer)                         startTimer;
76         @safe void delegate(Timer)                         stopTimer;
77         void delegate(Signal)                              startSignal;
78         void delegate(Signal)                              stopSignal;
79         @safe void delegate(int, AppEvent, FileEventHandler)   startPoll; // start listening to some events
80         @safe void delegate(int, AppEvent)                 stopPoll;  // stop listening to some events
81         @safe void delegate(int)                           detach;    // detach file from loop
82         //@safe void delegate(Notification, Broadcast)       postNotification;
83         @safe void delegate()                              deinit;
84         @safe void delegate(int, FileEventHandler)         waitForUserEvent;
85         @safe void delegate(int, FileEventHandler)         stopWaitForUserEvent;
86         @safe @nogc int delegate()                         getKernelId;
87 
88 
89     public:
90         string name() const pure nothrow @safe @property {
91             return _name;
92         }
93         this(Mode m = Mode.NATIVE) @safe nothrow {
94             switch(m) {
95             case Mode.NATIVE:
96                 _name = _nimpl._name;
97                 _nimpl.initialize();
98                 run = &_nimpl.run;
99                 stop = &_nimpl.stop;
100                 startTimer = &_nimpl.start_timer;
101                 stopTimer = &_nimpl.stop_timer;
102                 startSignal = &_nimpl.start_signal;
103                 stopSignal = &_nimpl.stop_signal;
104                 startPoll = &_nimpl.start_poll;
105                 stopPoll = &_nimpl.stop_poll;
106                 detach = &_nimpl.detach;
107                 //postNotification = &_nimpl.postNotification;
108                 deinit = &_nimpl.deinit;
109                 waitForUserEvent = &_nimpl.wait_for_user_event;
110                 stopWaitForUserEvent = &_nimpl.stop_wait_for_user_event;
111                 getKernelId = &_nimpl.get_kernel_id;
112                 break;
113             case Mode.FALLBACK:
114                 _name = _fimpl._name;
115                 _fimpl.initialize();
116                 run = &_fimpl.run;
117                 stop = &_fimpl.stop;
118                 startTimer = &_fimpl.start_timer;
119                 stopTimer = &_fimpl.stop_timer;
120                 startSignal = &_fimpl.start_signal;
121                 stopSignal = &_fimpl.stop_signal;
122                 startPoll = &_fimpl.start_poll;
123                 stopPoll = &_fimpl.stop_poll;
124                 detach = &_fimpl.detach;
125                 //postNotification = &_fimpl.postNotification;
126                 deinit = &_fimpl.deinit;
127                 waitForUserEvent = &_fimpl.wait_for_user_event;
128                 stopWaitForUserEvent = &_fimpl.stop_wait_for_user_event;
129                 getKernelId = &_fimpl.get_kernel_id;
130                 break;
131             default:
132                 assert(0, "Unknown loop mode");
133             }
134         }
135 }
136 
137 unittest {
138     import std.stdio;
139     globalLogLevel = LogLevel.info;
140     auto best_loop = getDefaultLoop();
141     auto fallback_loop = getDefaultLoop(Mode.FALLBACK);
142     writefln("Native   event loop: %s", best_loop.name);
143     writefln("Fallback event loop: %s", fallback_loop.name);
144 }
145 
146 unittest {
147     info(" === test 'stop before start' ===");
148     // stop before start
149     auto native = new hlEvLoop();
150     auto fallb = new hlEvLoop(Mode.FALLBACK);
151     auto loops = [native, fallb];
152     foreach(loop; loops)
153     {
154         infof(" --- '%s' loop ---", loop.name);
155         auto now = Clock.currTime;
156         loop.stop();
157         loop.run(1.seconds);
158         assert(Clock.currTime - now < 1.seconds);
159     }
160 }
161 
162 unittest {
163     info(" === Testing timers ===");
164 }
165 @safe unittest {
166     int i1, i2;
167     HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;};
168     HandlerDelegate h2 = delegate void(AppEvent e) {tracef("h2 called");i2++;};
169     {
170         auto now = Clock.currTime;
171         Timer a = new Timer(now, h1);
172         Timer b = new Timer(now, h1);
173         assert(b>a);
174     }
175     {
176         auto now = Clock.currTime;
177         Timer a = new Timer(now, h1);
178         Timer b = new Timer(now + 1.seconds, h1);
179         assert(b>a);
180     }
181 }
182 
183 unittest {
184     import core.thread;
185     int i1, i2;
186     HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;};
187     HandlerDelegate h2 = delegate void(AppEvent e) {tracef("h2 called");i2++;};
188     auto native = new hlEvLoop();
189     auto fallb = new hlEvLoop(Mode.FALLBACK);
190     auto loops = [native, fallb];
191     foreach(loop; loops)
192     {
193         globalLogLevel = LogLevel.info;
194         infof(" --- '%s' loop ---", loop.name);
195         i1 = i2 = 0;
196         info("test start/stop timer before loop run");
197         /** test startTimer, and then stopTimer before runLoop */
198         auto now = Clock.currTime;
199         Timer a = new Timer(now + 100.msecs, h1);
200         Timer b = new Timer(now + 1000.msecs, h2);
201         loop.startTimer(a);
202         loop.startTimer(b);
203         loop.stopTimer(b);
204         loop.run(200.msecs);
205         assert(i1==1);
206         assert(i2==0);
207 
208         /** stop event loop inside from timer **/
209         infof("stop event loop inside from timer");
210         now = Clock.currTime;
211 
212         i1 = 0;
213         a = new Timer(10.msecs, (AppEvent e) @safe {
214             loop.stop();
215         });
216         b = new Timer(110.msecs, h1);
217         loop.startTimer(a);
218         loop.startTimer(b);
219         loop.run(Duration.max);
220         assert(i1 == 0);
221         loop.stopTimer(b);
222 
223         info("test pending timer events");
224         loop.run(50.msecs);
225         i1 = 0;
226         a = new Timer(50.msecs, h1);
227         loop.startTimer(a);
228         loop.run(10.msecs);
229         assert(i1==0);
230         Thread.sleep(45.msecs);
231         loop.run(0.seconds);
232         assert(i1==1);
233 
234         info("testing overdue timers");
235         int[]   seq;
236         auto    slow = delegate void(AppEvent e) @trusted {Thread.sleep(20.msecs); seq ~= 1;};
237         auto    fast = delegate void(AppEvent e) @safe {seq ~= 2;};
238         a = new Timer(50.msecs, slow);
239         b = new Timer(60.msecs, fast);
240         loop.startTimer(a);
241         loop.startTimer(b);
242         loop.run(100.msecs);
243         assert(seq == [1,2]);
244 
245         a = new Timer(-5.seconds, fast);
246         loop.startTimer(a);
247         loop.run(0.seconds);
248 
249         assert(seq == [1,2,2]);
250 
251         //globalLogLevel = LogLevel.trace;
252         seq = new int[](0);
253         /** test setting overdue timer inside from overdue timer **/
254         auto set_next = delegate void(AppEvent e) {
255             b = new Timer(-10.seconds, fast);
256             loop.startTimer(b);
257         };
258         a = new Timer(-5.seconds, set_next);
259         loop.startTimer(a);
260         loop.run(10.msecs);
261         assert(seq == [2]);
262 
263         seq = new int[](0);
264         /** test setting overdue timer inside from normal timer **/
265         set_next = delegate void(AppEvent e) {
266             b = new Timer(-10.seconds, fast);
267             loop.startTimer(b);
268         };
269         a = new Timer(50.msecs, set_next);
270         loop.startTimer(a);
271         loop.run(60.msecs);
272         assert(seq == [2]);
273 
274         info("timer execution order");
275         now = Clock.currTime;
276         seq.length = 0;
277         HandlerDelegate sh1 = delegate void(AppEvent e) {seq ~= 1;};
278         HandlerDelegate sh2 = delegate void(AppEvent e) {seq ~= 2;};
279         HandlerDelegate sh3 = delegate void(AppEvent e) {seq ~= 3;};
280         assertThrown(new Timer(SysTime.init, null));
281         a = new Timer(now + 500.msecs, sh1);
282         b = new Timer(now + 500.msecs, sh2);
283         Timer c = new Timer(now + 300.msecs, sh3);
284         loop.startTimer(a);
285         loop.startTimer(b);
286         loop.startTimer(c);
287         loop.run(510.msecs);
288         assert(seq == [3, 1, 2]);
289 
290         info("exception handling in timer");
291         HandlerDelegate throws = delegate void(AppEvent e){throw new Exception("test exception");};
292         a = new Timer(50.msecs, throws);
293         loop.startTimer(a);
294         auto logLevel = globalLogLevel;
295         globalLogLevel = LogLevel.fatal;
296         loop.run(100.msecs);
297         globalLogLevel = logLevel;
298         infof(" --- end ---");
299     }
300 
301 }
302 
303 unittest {
304     info("=== Testing signals ===");
305     auto savedloglevel = globalLogLevel;
306     //globalLogLevel = LogLevel.trace;
307     import core.sys.posix.signal;
308     import core.thread;
309     import core.sys.posix.unistd;
310     import core.sys.posix.sys.wait;
311 
312     int i1, i2;
313     auto native = new hlEvLoop();
314     auto fallb = new hlEvLoop(Mode.FALLBACK);
315     auto loops = [native, fallb];
316     SigHandlerDelegate h1 = delegate void(int signum) {
317         i1++;
318     };
319     SigHandlerDelegate h2 = delegate void(int signum) {
320         i2++;
321     };
322     foreach(loop; loops) {
323         infof("testing loop '%s'", loop.name);
324         i1 = i2 = 0;
325         auto sighup1 = new Signal(SIGHUP, h1);
326         auto sighup2 = new Signal(SIGHUP, h2);
327         auto sigint1 = new Signal(SIGINT, h2);
328         foreach(s; [sighup1, sighup2, sigint1]) {
329             loop.startSignal(s);
330         }
331         auto parent_pid = getpid();
332         auto child_pid = fork();
333         if ( child_pid == 0 ) {
334             Thread.sleep(500.msecs);
335             kill(parent_pid, SIGHUP);
336             kill(parent_pid, SIGINT);
337             _exit(0);
338         } else {
339             loop.run(1.seconds);
340             waitpid(child_pid, null, 0);
341         }
342         assert(i1 == 1);
343         assert(i2 == 2);
344         foreach(s; [sighup1, sighup2, sigint1]) {
345             loop.stopSignal(s);
346         }
347         loop.run(1.msecs); // send stopSignals to kernel
348     }
349     globalLogLevel = savedloglevel;
350 }
351 
352 unittest {
353     globalLogLevel = LogLevel.info;
354     info(" === Testing sockets ===");
355     import std.string, std.stdio;
356     import hio.socket: hlSocket;
357 
358     auto native = new hlEvLoop();
359     auto fallb = new hlEvLoop(Mode.FALLBACK);
360     auto loops = [native, fallb];
361     foreach(loop; loops) {
362         infof("testing loop '%s'", loop.name);
363         immutable limit = 1;
364         int resuests = 0;
365         int responses = 0;
366         hlSocket client, server;
367         immutable(ubyte)[] input;
368         immutable(ubyte)[] response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".representation;
369         string request = "GET / HTTP/1.1\r\nId: %d\r\n\r\n";
370     
371         void server_handler(int so) @safe {
372             auto s = new hlSocket(so);
373             tracef("server accepted on %s", s.fileno());
374             IORequest iorq;
375             iorq.to_read = 512;
376             iorq.callback = (IOResult r) {
377                 if ( r.timedout ) {
378                     s.close();
379                     return;
380                 }
381                 s.send(response);
382                 s.close();
383             };
384             s.io(loop, iorq, dur!"seconds"(10));
385         }
386     
387         void client_handler(AppEvent e) @safe {
388             tracef("connection app handler");
389             if ( e & (AppEvent.ERR|AppEvent.HUP) ) {
390                 tracef("error on %s", client);
391                 client.close();
392                 return;
393             }
394             tracef("sending to %s", client);
395             auto rc = client.send(request.format(resuests).representation());
396             if ( rc == -1 ) {
397                 tracef("error on %s", client);
398                 client.close();
399                 return;
400             }
401             tracef("send returned %d", rc);
402             IORequest iorq;
403             iorq.to_read = 512;
404             iorq.callback = (IOResult r) {
405                 if ( r.timedout ) {
406                     info("Client timeout waiting for response");
407                     client.close();
408                     return;
409                 }
410                 // client received response from server
411                 responses++;
412                 client.close();
413                 if ( ++resuests < limit ) {
414                     client = new hlSocket();
415                     client.open();
416                     client.connect("127.0.0.1:16000", loop, &client_handler, dur!"seconds"(5));
417                 }
418             };
419             client.io(loop, iorq, 10.seconds);
420         }
421     
422         server = new hlSocket();
423         server.open();
424         assert(server.fileno() >= 0);
425         scope(exit) {
426             debug tracef("closing server socket %s", server);
427             server.close();
428         }
429         tracef("server listen on %d", server.fileno());
430         server.bind("0.0.0.0:16000");
431         server.listen();
432         server.accept(loop, Duration.max, &server_handler);
433     
434         loop.startTimer(new Timer(50.msecs,  (AppEvent e) @safe {
435             client = new hlSocket();
436             client.open();
437             client.connect("127.0.0.1:16000", loop, &client_handler, 5.seconds);
438         }));
439 
440         loop.startTimer(new Timer(100.msecs,  (AppEvent e) @safe {
441             client = new hlSocket();
442             client.open();
443             client.connect("127.0.0.1:16001", loop, &client_handler, 5.seconds);
444         }));
445     
446         loop.run(1.seconds);
447         assert(responses == limit, "%s != %s".format(responses, limit));
448         globalLogLevel = LogLevel.info;
449     }
450 }