1 module hio.loop; 2 3 import std.traits; 4 import std.datetime; 5 import std.container; 6 import std.exception; 7 import std.experimental.logger; 8 import std.typecons; 9 import hio.drivers; 10 import hio.events; 11 import core.time; 12 13 import core.sys.posix.sys.socket; 14 15 enum Mode:int {NATIVE = 0, FALLBACK} 16 17 shared Mode globalLoopMode = Mode.NATIVE; 18 19 private static hlEvLoop[Mode.FALLBACK+1] _defaultLoop; 20 21 hlEvLoop getDefaultLoop(Mode mode = globalLoopMode) @safe nothrow { 22 if ( _defaultLoop[mode] is null ) { 23 _defaultLoop[mode] = new hlEvLoop(mode); 24 } 25 return _defaultLoop[mode]; 26 } 27 28 shared static this() { 29 uninitializeLoops(); 30 } 31 32 shared static ~this() { 33 uninitializeLoops(); 34 } 35 36 package void uninitializeLoops() { 37 if (_defaultLoop[Mode.NATIVE]) 38 { 39 _defaultLoop[Mode.NATIVE].deinit(); 40 _defaultLoop[Mode.NATIVE] = null; 41 } 42 if (_defaultLoop[Mode.FALLBACK]) 43 { 44 _defaultLoop[Mode.FALLBACK].deinit(); 45 _defaultLoop[Mode.FALLBACK] = null; 46 } 47 } 48 49 version (unittest) 50 { 51 // XXX remove when https://issues.dlang.org/show_bug.cgi?id=20256 fixed 52 extern (C) __gshared string[] rt_options = ["gcopt=parallel:0"]; 53 } 54 55 /// 56 /// disable default signal handling if you plan to handle signal in 57 /// child threads 58 /// 59 void ignoreSignal(int signum) { 60 version (linux) { 61 import core.sys.posix.signal; 62 63 sigset_t m; 64 sigemptyset(&m); 65 sigaddset(&m, signum); 66 pthread_sigmask(SIG_BLOCK, &m, null); 67 } 68 version (OSX) { 69 import core.sys.posix.signal; 70 71 sigset_t m; 72 sigemptyset(&m); 73 sigaddset(&m, signum); 74 pthread_sigmask(SIG_BLOCK, &m, null); 75 } 76 version (FreeBSD) { 77 import core.sys.posix.signal; 78 79 sigset_t m; 80 sigemptyset(&m); 81 sigaddset(&m, signum); 82 pthread_sigmask(SIG_BLOCK, &m, null); 83 } 84 } 85 86 final class hlEvLoop { 87 package: 88 NativeEventLoopImpl _nimpl; 89 FallbackEventLoopImpl _fimpl; 90 string _name; 91 92 public: 93 void delegate(scope Duration = Duration.max) run; 94 @safe void delegate() stop; 95 @safe void delegate() shutdown; 96 @safe void delegate(Timer) startTimer; 97 @safe void delegate(Timer) stopTimer; 98 void delegate(Signal) startSignal; 99 void delegate(Signal) stopSignal; 100 @safe void delegate(int, AppEvent, FileEventHandler) startPoll; // start listening to some events 101 @safe void delegate(int, AppEvent) stopPoll; // stop listening to some events 102 @safe void delegate(int) detach; // detach file from loop 103 //@safe void delegate(Notification, Broadcast) postNotification; 104 @safe void delegate() deinit; 105 @safe void delegate(int, FileEventHandler) waitForUserEvent; 106 @safe void delegate(int, FileEventHandler) stopWaitForUserEvent; 107 @safe @nogc int delegate() getKernelId; 108 109 110 public: 111 string name() const pure nothrow @safe @property { 112 return _name; 113 } 114 this(Mode m = Mode.NATIVE) @safe nothrow { 115 switch(m) { 116 case Mode.NATIVE: 117 _name = _nimpl._name; 118 _nimpl.initialize(); 119 run = &_nimpl.run; 120 stop = &_nimpl.stop; 121 shutdown = &_nimpl.shutdown; 122 startTimer = &_nimpl.start_timer; 123 stopTimer = &_nimpl.stop_timer; 124 startSignal = &_nimpl.start_signal; 125 stopSignal = &_nimpl.stop_signal; 126 startPoll = &_nimpl.start_poll; 127 stopPoll = &_nimpl.stop_poll; 128 detach = &_nimpl.detach; 129 //postNotification = &_nimpl.postNotification; 130 deinit = &_nimpl.deinit; 131 waitForUserEvent = &_nimpl.wait_for_user_event; 132 stopWaitForUserEvent = &_nimpl.stop_wait_for_user_event; 133 getKernelId = &_nimpl.get_kernel_id; 134 break; 135 case Mode.FALLBACK: 136 _name = _fimpl._name; 137 _fimpl.initialize(); 138 run = &_fimpl.run; 139 stop = &_fimpl.stop; 140 shutdown = &_nimpl.shutdown; 141 startTimer = &_fimpl.start_timer; 142 stopTimer = &_fimpl.stop_timer; 143 startSignal = &_fimpl.start_signal; 144 stopSignal = &_fimpl.stop_signal; 145 startPoll = &_fimpl.start_poll; 146 stopPoll = &_fimpl.stop_poll; 147 detach = &_fimpl.detach; 148 //postNotification = &_fimpl.postNotification; 149 deinit = &_fimpl.deinit; 150 waitForUserEvent = &_fimpl.wait_for_user_event; 151 stopWaitForUserEvent = &_fimpl.stop_wait_for_user_event; 152 getKernelId = &_fimpl.get_kernel_id; 153 break; 154 default: 155 assert(0, "Unknown loop mode"); 156 } 157 } 158 } 159 160 unittest { 161 import std.stdio; 162 globalLogLevel = LogLevel.info; 163 auto best_loop = getDefaultLoop(); 164 auto fallback_loop = getDefaultLoop(Mode.FALLBACK); 165 writefln("Native event loop: %s", best_loop.name); 166 writefln("Fallback event loop: %s", fallback_loop.name); 167 uninitializeLoops(); 168 } 169 170 unittest { 171 info(" === test 'stop before start' ==="); 172 // stop before start 173 auto native = new hlEvLoop(); 174 auto fallb = new hlEvLoop(Mode.FALLBACK); 175 auto loops = [native, fallb]; 176 foreach(loop; loops) 177 { 178 infof(" --- '%s' loop ---", loop.name); 179 auto now = Clock.currTime; 180 loop.stop(); 181 loop.run(1.seconds); 182 assert(Clock.currTime - now < 1.seconds); 183 } 184 native.deinit(); 185 fallb.deinit(); 186 } 187 188 unittest { 189 info(" === Testing timers ==="); 190 } 191 @safe unittest { 192 int i1, i2; 193 HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;}; 194 HandlerDelegate h2 = delegate void(AppEvent e) {tracef("h2 called");i2++;}; 195 { 196 auto now = Clock.currTime; 197 Timer a = new Timer(now, h1); 198 Timer b = new Timer(now, h1); 199 assert(b>a); 200 } 201 { 202 auto now = Clock.currTime; 203 Timer a = new Timer(now, h1); 204 Timer b = new Timer(now + 1.seconds, h1); 205 assert(b>a); 206 } 207 } 208 209 unittest { 210 import core.thread; 211 import std.format; 212 int i1, i2; 213 HandlerDelegate h1 = delegate void(AppEvent e) {tracef("h1 called");i1++;}; 214 HandlerDelegate h2 = delegate void(AppEvent e) {tracef("h2 called");i2++;}; 215 auto native = new hlEvLoop(); 216 auto fallb = new hlEvLoop(Mode.FALLBACK); 217 auto loops = [native, fallb]; 218 foreach(loop; loops) 219 { 220 globalLogLevel = LogLevel.info; 221 infof(" --- '%s' loop ---", loop.name); 222 i1 = i2 = 0; 223 info("test start/stop timer before loop run"); 224 /** test startTimer, and then stopTimer before runLoop */ 225 auto now = Clock.currTime; 226 Timer a = new Timer(now + 100.msecs, h1); 227 Timer b = new Timer(now + 1000.msecs, h2); 228 loop.startTimer(a); 229 loop.startTimer(b); 230 loop.stopTimer(b); 231 loop.run(200.msecs); 232 assert(i1==1); 233 assert(i2==0); 234 235 /** stop event loop inside from timer **/ 236 infof("stop event loop inside from timer"); 237 now = Clock.currTime; 238 239 i1 = 0; 240 a = new Timer(10.msecs, (AppEvent e) @safe { 241 loop.stop(); 242 }); 243 b = new Timer(110.msecs, h1); 244 loop.startTimer(a); 245 loop.startTimer(b); 246 loop.run(Duration.max); 247 assert(i1 == 0); 248 loop.stopTimer(b); 249 250 info("test pending timer events"); 251 loop.run(50.msecs); 252 i1 = 0; 253 a = new Timer(50.msecs, h1); 254 loop.startTimer(a); 255 loop.run(10.msecs); 256 assert(i1==0, "i1 expected 0, got %d".format(i1)); 257 Thread.sleep(45.msecs); 258 loop.run(0.seconds); 259 assert(i1==1, "i1 expected 1, got %d".format(i1)); 260 261 info("testing overdue timers"); 262 int[] seq; 263 auto slow = delegate void(AppEvent e) @trusted {Thread.sleep(20.msecs); seq ~= 1;}; 264 auto fast = delegate void(AppEvent e) @safe {seq ~= 2;}; 265 a = new Timer(50.msecs, slow); 266 b = new Timer(60.msecs, fast); 267 loop.startTimer(a); 268 loop.startTimer(b); 269 loop.run(100.msecs); 270 assert(seq == [1,2]); 271 272 a = new Timer(-5.seconds, fast); 273 loop.startTimer(a); 274 loop.run(0.seconds); 275 276 assert(seq == [1,2,2]); 277 278 seq = new int[](0); 279 /** test setting overdue timer inside from overdue timer **/ 280 auto set_next = delegate void(AppEvent e) { 281 b = new Timer(-10.seconds, fast); 282 loop.startTimer(b); 283 }; 284 a = new Timer(-5.seconds, set_next); 285 loop.startTimer(a); 286 loop.run(10.msecs); 287 assert(seq == [2]); 288 289 seq = new int[](0); 290 /** test setting overdue timer inside from normal timer **/ 291 set_next = delegate void(AppEvent e) { 292 b = new Timer(-10.seconds, fast); 293 loop.startTimer(b); 294 }; 295 a = new Timer(50.msecs, set_next); 296 loop.startTimer(a); 297 loop.run(60.msecs); 298 assert(seq == [2]); 299 300 info("timer execution order"); 301 now = Clock.currTime; 302 seq.length = 0; 303 HandlerDelegate sh1 = delegate void(AppEvent e) {seq ~= 1;}; 304 HandlerDelegate sh2 = delegate void(AppEvent e) {seq ~= 2;}; 305 HandlerDelegate sh3 = delegate void(AppEvent e) {seq ~= 3;}; 306 assertThrown(new Timer(SysTime.init, null)); 307 a = new Timer(now + 500.msecs, sh1); 308 b = new Timer(now + 500.msecs, sh2); 309 Timer c = new Timer(now + 300.msecs, sh3); 310 loop.startTimer(a); 311 loop.startTimer(b); 312 loop.startTimer(c); 313 loop.run(510.msecs); 314 //assert(seq == [3, 1, 2]); // this should work only with precise 315 316 // info("exception handling in timer"); 317 // HandlerDelegate throws = delegate void(AppEvent e){throw new Exception("test exception");}; 318 // a = new Timer(50.msecs, throws); 319 // loop.startTimer(a); 320 // auto logLevel = globalLogLevel; 321 // globalLogLevel = LogLevel.fatal; 322 // loop.run(100.msecs); 323 // globalLogLevel = logLevel; 324 // infof(" --- end ---"); 325 } 326 native.deinit(); 327 fallb.deinit(); 328 } 329 330 unittest { 331 info("=== Testing signals ==="); 332 auto savedloglevel = globalLogLevel; 333 //globalLogLevel = LogLevel.trace; 334 import core.sys.posix.signal; 335 import core.thread; 336 import core.sys.posix.unistd; 337 import core.sys.posix.sys.wait; 338 import core.stdc.stdlib: exit; 339 340 int i1, i2; 341 auto native = new hlEvLoop(); 342 auto fallb = new hlEvLoop(Mode.FALLBACK); 343 auto loops = [native, fallb]; 344 SigHandlerDelegate h1 = delegate void(int signum) { 345 i1++; 346 }; 347 SigHandlerDelegate h2 = delegate void(int signum) { 348 i2++; 349 }; 350 foreach(loop; loops) { 351 infof("testing loop '%s'", loop.name); 352 i1 = i2 = 0; 353 auto sighup1 = new Signal(SIGHUP, h1); 354 auto sighup2 = new Signal(SIGHUP, h2); 355 auto sigint1 = new Signal(SIGINT, h2); 356 foreach(s; [sighup1, sighup2, sigint1]) { 357 loop.startSignal(s); 358 } 359 auto parent_pid = getpid(); 360 auto child_pid = fork(); 361 if ( child_pid == 0 ) { 362 Thread.sleep(500.msecs); 363 kill(parent_pid, SIGHUP); 364 kill(parent_pid, SIGINT); 365 native.deinit(); 366 fallb.deinit(); 367 exit(0); 368 } else { 369 loop.run(1.seconds); 370 waitpid(child_pid, null, 0); 371 } 372 assert(i1 == 1); 373 assert(i2 == 2); 374 foreach(s; [sighup1, sighup2, sigint1]) { 375 loop.stopSignal(s); 376 } 377 loop.run(1.msecs); // send stopSignals to kernel 378 } 379 native.deinit(); 380 fallb.deinit(); 381 globalLogLevel = savedloglevel; 382 } 383 384 unittest { 385 globalLogLevel = LogLevel.info; 386 info(" === Testing sockets ==="); 387 import std.string, std.stdio; 388 import hio.socket; 389 390 auto native = new hlEvLoop(); 391 auto fallb = new hlEvLoop(Mode.FALLBACK); 392 auto loops = [native, fallb]; 393 foreach(loop; loops) { 394 infof("testing loop '%s'", loop.name); 395 immutable limit = 1; 396 int requests = 0; 397 int responses = 0; 398 hlSocket client, server; 399 immutable(ubyte)[] input; 400 immutable(ubyte)[] response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".representation; 401 string request = "GET / HTTP/1.1\r\nId: %d\r\n\r\n"; 402 403 void server_handler(AsyncSocketLike so) @safe { 404 auto s = cast(hlSocket)so; 405 tracef("server accepted on %s", s.fileno()); 406 IORequest iorq; 407 iorq.to_read = 512; 408 iorq.callback = (ref IOResult r) { 409 if ( r.timedout ) { 410 s.close(); 411 return; 412 } 413 s.send(response); 414 s.close(); 415 }; 416 s.io(loop, iorq, dur!"seconds"(10)); 417 } 418 419 void client_handler(AppEvent e) @safe { 420 tracef("connection app handler"); 421 if ( e & (AppEvent.ERR|AppEvent.HUP) ) { 422 tracef("error on %s", client); 423 client.close(); 424 return; 425 } 426 tracef("sending to %s", client); 427 auto rc = client.send(request.format(requests).representation()); 428 if ( rc == -1 ) { 429 tracef("error on %s", client); 430 client.close(); 431 return; 432 } 433 tracef("send returned %d", rc); 434 IORequest iorq; 435 iorq.to_read = 512; 436 iorq.callback = (ref IOResult r) { 437 if ( r.timedout ) { 438 info("Client timeout waiting for response"); 439 client.close(); 440 return; 441 } 442 // client received response from server 443 responses++; 444 client.close(); 445 if ( ++requests < limit ) { 446 client = new hlSocket(); 447 client.open(); 448 client.connect("127.0.0.1:16000", loop, &client_handler, dur!"seconds"(5)); 449 } 450 }; 451 client.io(loop, iorq, 10.seconds); 452 } 453 454 server = new hlSocket(); 455 server.open(); 456 assert(server.fileno() >= 0); 457 scope(exit) { 458 debug tracef("closing server socket %s", server); 459 server.close(); 460 } 461 tracef("server listen on %d", server.fileno()); 462 server.bind("0.0.0.0:16000"); 463 server.listen(); 464 server.accept(loop, Duration.max, &server_handler); 465 466 loop.startTimer(new Timer(50.msecs, (AppEvent e) @safe { 467 client = new hlSocket(); 468 client.open(); 469 client.connect("127.0.0.1:16000", loop, &client_handler, 5.seconds); 470 })); 471 472 loop.startTimer(new Timer(100.msecs, (AppEvent e) @safe { 473 client = new hlSocket(); 474 client.open(); 475 client.connect("127.0.0.1:16001", loop, &client_handler, 5.seconds); 476 })); 477 478 loop.run(1.seconds); 479 assert(responses == limit, "%s != %s".format(responses, limit)); 480 globalLogLevel = LogLevel.info; 481 } 482 native.deinit(); 483 fallb.deinit(); 484 }