/// Event loop facade for hio.
///
/// Exposes `hlEvLoop`, a thin wrapper that forwards every operation to one of
/// two driver implementations (`NativeEventLoopImpl` or `FallbackEventLoopImpl`
/// from `hio.drivers`), plus a lazily created default loop per `Mode`.
module hio.loop;

import std.traits;
import std.datetime;
import std.container;
import std.exception;
import std.experimental.logger;
import std.typecons;
import hio.drivers;
import hio.events;
import core.time;

import core.sys.posix.sys.socket;

/// Selects which driver implementation an `hlEvLoop` is bound to.
enum Mode:int {NATIVE = 0, FALLBACK}

/// Mode used by `getDefaultLoop` when the caller does not pass one explicitly.
shared Mode globalLoopMode = Mode.NATIVE;

// One cached loop per Mode value, indexed by the Mode itself.
// Not declared `shared`/`__gshared`, so by D's rules for module-level
// variables each thread has its own copy of this cache.
private static hlEvLoop[Mode.FALLBACK+1] _defaultLoop;

/// Returns the cached default loop for `mode`, creating it on first use.
///
/// Params:
///   mode = which implementation to use; defaults to `globalLoopMode`.
/// Returns: the same `hlEvLoop` instance on every call for a given `mode`
///          (until `uninitializeLoops` clears the cache).
hlEvLoop getDefaultLoop(Mode mode = globalLoopMode) @safe nothrow {
    if ( _defaultLoop[mode] is null ) {
        _defaultLoop[mode] = new hlEvLoop(mode);
    }
    return _defaultLoop[mode];
}

// Start the program with an empty default-loop cache.
shared static this() {
    uninitializeLoops();
}

/// Drops the cached default loops so the next `getDefaultLoop` call
/// constructs fresh ones. Only clears the references; it does not call
/// `deinit` on the loops.
package void uninitializeLoops() {
    _defaultLoop[Mode.NATIVE] = null;
    _defaultLoop[Mode.FALLBACK] = null;
}

version (unittest)
{
    // XXX remove when https://issues.dlang.org/show_bug.cgi?id=20256 fixed
    extern (C) __gshared string[] rt_options = ["gcopt=parallel:0"];
}

///
/// Disable default signal handling if you plan to handle the signal in
/// child threads.
///
/// Adds `signum` to the calling thread's blocked-signal mask using
/// `pthread_sigmask(SIG_BLOCK, ...)`. Implemented for linux and OSX only;
/// on any other platform this function does nothing.
///
/// Params:
///   signum = signal number to block.
///
void ignoreSignal(int signum) {
    // Both supported platforms use exactly the same calls, so select the
    // platform once and keep a single implementation.
    version (linux) {
        enum bool canMaskSignals = true;
    } else version (OSX) {
        enum bool canMaskSignals = true;
    } else {
        enum bool canMaskSignals = false;
    }

    static if (canMaskSignals) {
        import core.sys.posix.signal;

        sigset_t mask;
        sigemptyset(&mask);
        sigaddset(&mask, signum);
        pthread_sigmask(SIG_BLOCK, &mask, null);
    }
}

/// Event loop handle.
///
/// Holds both driver implementations by value and, depending on the `Mode`
/// passed to the constructor, points every public delegate at the matching
/// method of one of them. Callers use the delegates (`run`, `stop`,
/// `startTimer`, ...) and never touch the implementation directly.
final class hlEvLoop {
package:
    NativeEventLoopImpl   _nimpl;
    FallbackEventLoopImpl _fimpl;
    string                _name;   // copied from the selected implementation's `_name`

public:
    // Each delegate below is bound in the constructor to the selected
    // implementation's method of the corresponding snake_case name.
    void delegate(scope Duration = Duration.max)        run;
    @safe void delegate()                                stop;
    @safe void delegate(Timer)                           startTimer;
    @safe void delegate(Timer)                           stopTimer;
    void delegate(Signal)                                startSignal;
    void delegate(Signal)                                stopSignal;
    @safe void delegate(int, AppEvent, FileEventHandler) startPoll; // start listening to some events
    @safe void delegate(int, AppEvent)                   stopPoll;  // stop listening to some events
    @safe void delegate(int)                             detach;    // detach file from loop
    //@safe void delegate(Notification, Broadcast)       postNotification;
    @safe void delegate()                                deinit;
    @safe void delegate(int, FileEventHandler)           waitForUserEvent;
    @safe void delegate(int, FileEventHandler)           stopWaitForUserEvent;
    @safe @nogc int delegate()                           getKernelId;


public:
    /// Name of the implementation this loop is bound to.
    string name() const pure nothrow @safe @property {
        return _name;
    }

    /// Creates a loop bound to the implementation selected by `m`.
    ///
    /// Copies the implementation's name, calls its `initialize()`, then
    /// binds every public delegate to that implementation.
    ///
    /// Params:
    ///   m = implementation to use; any value other than `Mode.NATIVE` or
    ///       `Mode.FALLBACK` hits `assert(0)`.
    this(Mode m = Mode.NATIVE) @safe nothrow {
        switch(m) {
        case Mode.NATIVE:
            _name = _nimpl._name;
            _nimpl.initialize();
            run = &_nimpl.run;
            stop = &_nimpl.stop;
            startTimer = &_nimpl.start_timer;
            stopTimer = &_nimpl.stop_timer;
            startSignal = &_nimpl.start_signal;
            stopSignal = &_nimpl.stop_signal;
            startPoll = &_nimpl.start_poll;
            stopPoll = &_nimpl.stop_poll;
            detach = &_nimpl.detach;
            //postNotification = &_nimpl.postNotification;
            deinit = &_nimpl.deinit;
            waitForUserEvent = &_nimpl.wait_for_user_event;
            stopWaitForUserEvent = &_nimpl.stop_wait_for_user_event;
            getKernelId = &_nimpl.get_kernel_id;
            break;
        case Mode.FALLBACK:
            _name = _fimpl._name;
            _fimpl.initialize();
            run = &_fimpl.run;
            stop = &_fimpl.stop;
            startTimer = &_fimpl.start_timer;
            stopTimer = &_fimpl.stop_timer;
            startSignal = &_fimpl.start_signal;
            stopSignal = &_fimpl.stop_signal;
            startPoll = &_fimpl.start_poll;
            stopPoll = &_fimpl.stop_poll;
            detach = &_fimpl.detach;
            //postNotification = &_fimpl.postNotification;
            deinit = &_fimpl.deinit;
            waitForUserEvent = &_fimpl.wait_for_user_event;
            stopWaitForUserEvent = &_fimpl.stop_wait_for_user_event;
            getKernelId = &_fimpl.get_kernel_id;
            break;
        default:
            assert(0, "Unknown loop mode");
        }
    }
}

// Smoke test: both default loops can be created and report a name.
unittest {
    import std.stdio;
    globalLogLevel = LogLevel.info;
    auto best_loop = getDefaultLoop();
    auto fallback_loop = getDefaultLoop(Mode.FALLBACK);
    writefln("Native event loop: %s", best_loop.name);
    writefln("Fallback event loop: %s", fallback_loop.name);
}

// Calling stop() before run() must make the following run() return early.
unittest {
    info(" === test 'stop before start' ===");
    // stop before start
    auto native = new hlEvLoop();
    auto fallb = new hlEvLoop(Mode.FALLBACK);
    auto loops = [native, fallb];
    foreach(loop; loops)
    {
        infof(" --- '%s' loop ---", loop.name);
        auto now = Clock.currTime;
        loop.stop();
        loop.run(1.seconds);
        assert(Clock.currTime - now < 1.seconds);
    }
}

unittest {
    info(" === Testing timers ===");
}

// Timer ordering: a later-created timer with the same expiry, and a timer
// with a later expiry, both compare greater.
@safe unittest {
    int i1;
    HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;};
    {
        auto now = Clock.currTime;
        Timer a = new Timer(now, h1);
        Timer b = new Timer(now, h1);
        assert(b>a);
    }
    {
        auto now = Clock.currTime;
        Timer a = new Timer(now, h1);
        Timer b = new Timer(now + 1.seconds, h1);
        assert(b>a);
    }
}

// Timer behaviour on both implementations: start/stop, stopping the loop
// from a handler, pending and overdue timers, execution order, and a
// handler that throws.
unittest {
    import core.thread;
    int i1, i2;
    HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;};
    HandlerDelegate h2 = delegate void(AppEvent e) {tracef("h2 called");i2++;};
    auto native = new hlEvLoop();
    auto fallb = new hlEvLoop(Mode.FALLBACK);
    auto loops = [native, fallb];
    foreach(loop; loops)
    {
        globalLogLevel = LogLevel.info;
        infof(" --- '%s' loop ---", loop.name);
        i1 = i2 = 0;
        info("test start/stop timer before loop run");
        /** test startTimer, and then stopTimer before runLoop */
        auto now = Clock.currTime;
        Timer a = new Timer(now + 100.msecs, h1);
        Timer b = new Timer(now + 1000.msecs, h2);
        loop.startTimer(a);
        loop.startTimer(b);
        loop.stopTimer(b);
        loop.run(200.msecs);
        assert(i1==1);
        assert(i2==0);

        /** stop event loop inside from timer **/
        infof("stop event loop inside from timer");
        now = Clock.currTime;

        i1 = 0;
        a = new Timer(10.msecs, (AppEvent e) @safe {
            loop.stop();
        });
        b = new Timer(110.msecs, h1);
        loop.startTimer(a);
        loop.startTimer(b);
        loop.run(Duration.max);
        assert(i1 == 0);
        loop.stopTimer(b);

        info("test pending timer events");
        loop.run(50.msecs);
        i1 = 0;
        a = new Timer(50.msecs, h1);
        loop.startTimer(a);
        loop.run(10.msecs);
        assert(i1==0);
        Thread.sleep(45.msecs);
        loop.run(0.seconds);
        assert(i1==1);

        info("testing overdue timers");
        int[] seq;
        auto slow = delegate void(AppEvent e) @trusted {Thread.sleep(20.msecs); seq ~= 1;};
        auto fast = delegate void(AppEvent e) @safe {seq ~= 2;};
        a = new Timer(50.msecs, slow);
        b = new Timer(60.msecs, fast);
        loop.startTimer(a);
        loop.startTimer(b);
        loop.run(100.msecs);
        assert(seq == [1,2]);

        a = new Timer(-5.seconds, fast);
        loop.startTimer(a);
        loop.run(0.seconds);

        assert(seq == [1,2,2]);

        //globalLogLevel = LogLevel.trace;
        seq = new int[](0);
        /** test setting overdue timer inside from overdue timer **/
        auto set_next = delegate void(AppEvent e) {
            b = new Timer(-10.seconds, fast);
            loop.startTimer(b);
        };
        a = new Timer(-5.seconds, set_next);
        loop.startTimer(a);
        loop.run(10.msecs);
        assert(seq == [2]);

        seq = new int[](0);
        /** test setting overdue timer inside from normal timer **/
        set_next = delegate void(AppEvent e) {
            b = new Timer(-10.seconds, fast);
            loop.startTimer(b);
        };
        a = new Timer(50.msecs, set_next);
        loop.startTimer(a);
        loop.run(60.msecs);
        assert(seq == [2]);

        info("timer execution order");
        now = Clock.currTime;
        seq.length = 0;
        HandlerDelegate sh1 = delegate void(AppEvent e) {seq ~= 1;};
        HandlerDelegate sh2 = delegate void(AppEvent e) {seq ~= 2;};
        HandlerDelegate sh3 = delegate void(AppEvent e) {seq ~= 3;};
        assertThrown(new Timer(SysTime.init, null));
        a = new Timer(now + 500.msecs, sh1);
        b = new Timer(now + 500.msecs, sh2);
        Timer c = new Timer(now + 300.msecs, sh3);
        loop.startTimer(a);
        loop.startTimer(b);
        loop.startTimer(c);
        loop.run(510.msecs);
        assert(seq == [3, 1, 2]);

        info("exception handling in timer");
        HandlerDelegate throws = delegate void(AppEvent e){throw new Exception("test exception");};
        a = new Timer(50.msecs, throws);
        loop.startTimer(a);
        // Silence logging while the throwing handler runs, then restore it.
        auto logLevel = globalLogLevel;
        globalLogLevel = LogLevel.fatal;
        loop.run(100.msecs);
        globalLogLevel = logLevel;
        infof(" --- end ---");
    }

}

// Signal delivery on both implementations: a forked child sends SIGHUP and
// SIGINT to the parent, which must dispatch them to every registered handler.
unittest {
    info("=== Testing signals ===");
    auto savedloglevel = globalLogLevel;
    //globalLogLevel = LogLevel.trace;
    import core.sys.posix.signal;
    import core.thread;
    import core.sys.posix.unistd;
    import core.sys.posix.sys.wait;

    int i1, i2;
    auto native = new hlEvLoop();
    auto fallb = new hlEvLoop(Mode.FALLBACK);
    auto loops = [native, fallb];
    SigHandlerDelegate h1 = delegate void(int signum) {
        i1++;
    };
    SigHandlerDelegate h2 = delegate void(int signum) {
        i2++;
    };
    foreach(loop; loops) {
        infof("testing loop '%s'", loop.name);
        i1 = i2 = 0;
        auto sighup1 = new Signal(SIGHUP, h1);
        auto sighup2 = new Signal(SIGHUP, h2);
        auto sigint1 = new Signal(SIGINT, h2);
        foreach(s; [sighup1, sighup2, sigint1]) {
            loop.startSignal(s);
        }
        auto parent_pid = getpid();
        auto child_pid = fork();
        // Fail right here if no child was created; otherwise the parent
        // would fall through to waitpid(-1, ...) and the failure would only
        // show up later as a confusing counter mismatch.
        assert(child_pid != -1, "fork() failed");
        if ( child_pid == 0 ) {
            // child: give the parent time to enter run(), then signal it
            Thread.sleep(500.msecs);
            kill(parent_pid, SIGHUP);
            kill(parent_pid, SIGINT);
            _exit(0);
        } else {
            loop.run(1.seconds);
            waitpid(child_pid, null, 0);
        }
        assert(i1 == 1);    // h1: one SIGHUP
        assert(i2 == 2);    // h2: one SIGHUP + one SIGINT
        foreach(s; [sighup1, sighup2, sigint1]) {
            loop.stopSignal(s);
        }
        loop.run(1.msecs); // send stopSignals to kernel
    }
    globalLogLevel = savedloglevel;
}

// Socket round trip on both implementations: a server on port 16000 answers
// one client request; a second client connects to port 16001, which this
// test never binds.
unittest {
    globalLogLevel = LogLevel.info;
    info(" === Testing sockets ===");
    import std.string, std.stdio;
    import hio.socket: hlSocket;

    auto native = new hlEvLoop();
    auto fallb = new hlEvLoop(Mode.FALLBACK);
    auto loops = [native, fallb];
    foreach(loop; loops) {
        infof("testing loop '%s'", loop.name);
        immutable limit = 1;
        int requests = 0;
        int responses = 0;
        hlSocket client, server;
        immutable(ubyte)[] response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".representation;
        string request = "GET / HTTP/1.1\r\nId: %d\r\n\r\n";

        // Called for every accepted connection: read the request, reply, close.
        void server_handler(int so) @safe {
            auto s = new hlSocket(so);
            tracef("server accepted on %s", s.fileno());
            IORequest iorq;
            iorq.to_read = 512;
            iorq.callback = (IOResult r) {
                if ( r.timedout ) {
                    s.close();
                    return;
                }
                s.send(response);
                s.close();
            };
            s.io(loop, iorq, dur!"seconds"(10));
        }

        // Called when the client's connect completes: send the request and
        // wait for the response.
        void client_handler(AppEvent e) @safe {
            tracef("connection app handler");
            if ( e & (AppEvent.ERR|AppEvent.HUP) ) {
                tracef("error on %s", client);
                client.close();
                return;
            }
            tracef("sending to %s", client);
            auto rc = client.send(request.format(requests).representation());
            if ( rc == -1 ) {
                tracef("error on %s", client);
                client.close();
                return;
            }
            tracef("send returned %d", rc);
            IORequest iorq;
            iorq.to_read = 512;
            iorq.callback = (IOResult r) {
                if ( r.timedout ) {
                    info("Client timeout waiting for response");
                    client.close();
                    return;
                }
                // client received response from server
                responses++;
                client.close();
                if ( ++requests < limit ) {
                    client = new hlSocket();
                    client.open();
                    client.connect("127.0.0.1:16000", loop, &client_handler, dur!"seconds"(5));
                }
            };
            client.io(loop, iorq, 10.seconds);
        }

        server = new hlSocket();
        server.open();
        assert(server.fileno() >= 0);
        scope(exit) {
            debug tracef("closing server socket %s", server);
            server.close();
        }
        tracef("server listen on %d", server.fileno());
        server.bind("0.0.0.0:16000");
        server.listen();
        server.accept(loop, Duration.max, &server_handler);

        loop.startTimer(new Timer(50.msecs, (AppEvent e) @safe {
            client = new hlSocket();
            client.open();
            client.connect("127.0.0.1:16000", loop, &client_handler, 5.seconds);
        }));

        loop.startTimer(new Timer(100.msecs, (AppEvent e) @safe {
            client = new hlSocket();
            client.open();
            client.connect("127.0.0.1:16001", loop, &client_handler, 5.seconds);
        }));

        loop.run(1.seconds);
        assert(responses == limit, "%s != %s".format(responses, limit));
        globalLogLevel = LogLevel.info;
    }
}