module hio.drivers.kqueue;

version(FreeBSD) {
    version = KQUEUE;
}
version(OSX) {
    version = KQUEUE;
}

version(KQUEUE):

import std.algorithm;
import std.datetime;
import std.conv;
import std.string;
import std.container;
import std.stdio;
import std.exception;
import std.experimental.logger;
import std.typecons;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;

import core.memory;

import std.algorithm.comparison: max;
import core.sys.posix.fcntl: open, O_RDONLY;
import core.sys.posix.unistd: close;

version(OSX) {
    import core.sys.darwin.sys.event;
}
version(FreeBSD) {
    import core.sys.freebsd.sys.event;
}

import core.sys.posix.signal;
import core.stdc.stdint : intptr_t, uintptr_t;
import core.stdc.string: strerror;
import core.stdc.errno: errno;

import timingwheels;

import hio.events;
import hio.common;

// The declarations below are kept for reference only; the real ones come
// from the platform-specific `sys.event` module imported above.

//enum : short {
//    EVFILT_READ =      (-1),
//    EVFILT_WRITE =     (-2),
//    EVFILT_AIO =       (-3),    /* attached to aio requests */
//    EVFILT_VNODE =     (-4),    /* attached to vnodes */
//    EVFILT_PROC =      (-5),    /* attached to struct proc */
//    EVFILT_SIGNAL =    (-6),    /* attached to struct proc */
//    EVFILT_TIMER =     (-7),    /* timers */
//    EVFILT_MACHPORT =  (-8),    /* Mach portsets */
//    EVFILT_FS =        (-9),    /* Filesystem events */
//    EVFILT_USER =      (-10),   /* User events */
//            /* (-11) unused */
//    EVFILT_VM =        (-12)    /* Virtual memory events */
//}

//enum : ushort {
///* actions */
//    EV_ADD  =                0x0001,          /* add event to kq (implies enable) */
//    EV_DELETE =              0x0002,          /* delete event from kq */
//    EV_ENABLE =              0x0004,          /* enable event */
//    EV_DISABLE =             0x0008           /* disable event (not reported) */
//}

//struct kevent_t {
//    uintptr_t       ident;          /* identifier for this event */
//    short           filter;         /* filter for event */
//    ushort          flags;          /* general flags */
//    uint            fflags;         /* filter-specific flags */
//    intptr_t        data;           /* filter-specific data */
//    void*           udata;
//}

//extern(C) int kqueue() @safe @nogc nothrow;
//extern(C) int kevent(int kqueue_fd, const kevent_t *events, int ne, const kevent_t *events, int ne,timespec* timeout) @safe @nogc nothrow;

/// Thin `@trusted` forwarder to the C `kevent` call so it can be used from `@safe` code.
auto s_kevent(A...)(A args) @trusted @nogc nothrow {
    return kevent(args);
}

/// Reinterpret an opaque `udata` value as a `Timer` reference.
Timer udataToTimer(T)(T udata) @trusted {
    return cast(Timer)udata;
}

/**
 * kqueue based event loop implementation.
 *
 * Changes for the kernel are accumulated in `in_events[0 .. in_index]` and are
 * submitted either by `flush()` or together with the next `kevent` call made
 * from `run()`. Events returned by the kernel are placed in
 * `out_events[0 .. ready]`.
 */
struct NativeEventLoopImpl {
    immutable bool   native = true;
    immutable string _name = "kqueue";
    @disable this(this) {}
    private {
        bool stopped = false;
        bool inshutdown = false;
        enum MAXEVENTS = 512;
        enum TOTAL_FILE_HANDLERS = 16*1024;
        int  kqueue_fd = -1;  // interface to kernel
        int  in_index;        // number of pending entries in in_events
        int  ready;           // number of valid entries in out_events

        timespec    ts;

        kevent_t[MAXEVENTS]     in_events;
        kevent_t[MAXEVENTS]     out_events;

        Duration                tick = 5.msecs;
        TimingWheels!Timer      timingwheels;
        Timer[]                 overdue;    // timers expiring in less than one tick are placed here

        Signal[][int]           signals;    // this is signals container

        FileEventHandler[]      fileHandlers; // indexed by fd (or by user event id)

    }

    /// Create the kqueue descriptor (if not yet created), allocate the
    /// handlers table and initialize the timing wheels.
    void initialize() @trusted nothrow
    {
        if ( kqueue_fd == -1) {
            kqueue_fd = kqueue();
        }
        debug(hiokqueue) safe_tracef("kqueue_fd=%d", kqueue_fd);
        if ( fileHandlers !is null )
        {
            // initialize() called again without deinit(): release the old
            // table first, otherwise it leaks and its GC range stays registered.
            GC.removeRange(fileHandlers.ptr);
            Mallocator.instance.dispose(fileHandlers);
            fileHandlers = null;
        }
        fileHandlers = Mallocator.instance.makeArray!FileEventHandler(TOTAL_FILE_HANDLERS);
        // the table lives in malloc'ed memory but holds GC references
        GC.addRange(fileHandlers.ptr, fileHandlers.length*FileEventHandler.sizeof);
        timingwheels.init();
    }

    /// Close the kqueue descriptor and release the handlers table.
    /// Safe to call when `initialize()` was never called, and safe to call twice.
    void deinit() @trusted {
        debug tracef("deinit");
        if ( kqueue_fd != -1 )
        {
            close(kqueue_fd);
            kqueue_fd = -1;
        }
        in_index = 0;
        if ( fileHandlers !is null )
        {
            GC.removeRange(fileHandlers.ptr);
            Mallocator.instance.dispose(fileHandlers);
            fileHandlers = null;
        }
    }

    /// Returns: the kqueue file descriptor (-1 if not initialized).
    int get_kernel_id() pure @safe nothrow @nogc {
        return kqueue_fd;
    }

    /// Ask `run()` to return as soon as possible.
    void stop() @safe pure {
        debug trace("mark eventloop as stopped");
        stopped = true;
    }

    /// Compute the `kevent` timeout: time left until `deadline` or until the
    /// next timing wheels event, whichever is sooner, never negative.
    timespec _calculate_timespec(SysTime deadline) @safe {
        timespec result;

        auto now_real = Clock.currTime;
        Duration delta = deadline - now_real;
        debug(hiokqueue) safe_tracef("deadline - now_real: %s", delta);
        auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, now_real.stdTime);
        debug(hiokqueue) safe_tracef("nextTWtimer: %s", nextTWtimer);
        delta = min(delta, nextTWtimer);

        delta = max(delta, 0.seconds);

        debug(hiokqueue) safe_tracef("delta = %s", delta);
        auto ds = delta.split!("seconds", "nsecs");
        result.tv_sec = cast(typeof(timespec.tv_sec))ds.seconds;
        result.tv_nsec = cast(typeof(timespec.tv_nsec))ds.nsecs;
        return result;
    }

    /// Run handlers of timers collected in `overdue`.
    /// The number of entries processed per call is bounded by `limit`
    /// (presumably so that handlers which keep adding overdue timers
    /// cannot block the loop forever).
    private void execute_overdue_timers() @safe
    {
        int limit = 100;
        while (overdue.length > 0 && --limit > 0 )
        {
            // execute timers which user requested with negative delay
            Timer t = overdue[0];
            overdue = overdue[1..$];
            if ( t is null )
            {
                // timer was removed (see stop_timer)
                continue;
            }
            debug(hiokqueue) tracef("execute overdue %s", t);
            HandlerDelegate h = t._handler;
            try {
                if (inshutdown)
                {
                    h(AppEvent.SHUTDOWN);
                }
                else
                {
                    h(AppEvent.TMO);
                }
            } catch (Exception e) {
                import core.stdc.stdio: stderr, fprintf;
                () @trusted {fprintf(stderr, "Uncaught exception: %s", toStringz(e.toString));}();
            }
        }
    }

    /// Send `AppEvent.SHUTDOWN` to every timer in the timing wheels and to
    /// every registered file handler, then mark the loop as being in shutdown.
    void shutdown() @safe
    in(!stopped && !inshutdown)
    {
        // scan through all timers
        auto a = timingwheels.allTimers();
        foreach(t; a.timers)
        {
            try
            {
                debug(hiokqueue) tracef("send shutdown to timer %s", t);
                t._handler(AppEvent.SHUTDOWN);
            }
            catch(Exception e)
            {
                errorf("Error on event SHUTDOWN in %s", t);
            }
        }
        // scan through active file descriptors
        // and send SHUTDOWN event
        if (fileHandlers !is null)
        {
            foreach(int i; 0..TOTAL_FILE_HANDLERS)
            {
                if (fileHandlers[i] is null)
                {
                    continue;
                }
                try
                {
                    debug(hiokqueue) tracef("send shutdown to filehandler %s", fileHandlers[i]);
                    fileHandlers[i].eventHandler(i, AppEvent.SHUTDOWN);
                }
                catch(Exception e)
                {
                    () @trusted {printf("exception: %s:%d: %s", __FILE__.ptr, __LINE__, toStringz(e.toString));}();
                }
            }
        }
        inshutdown = true;
    }

    /// Deliver a read/write readiness event to the handler registered for
    /// the descriptor `e.ident`. `ae` is the base event (IN or OUT); ERR and
    /// HUP are added according to the kevent flags.
    private void handle_io_event(kevent_t e, AppEvent ae) @safe
    {
        if ( e.flags & EV_ERROR ) {
            ae |= AppEvent.ERR;
        }
        if ( e.flags & EV_EOF ) {
            ae |= AppEvent.HUP;
        }
        int fd = cast(int)e.ident;
        auto h = fileHandlers[fd];
        if ( h is null )
        {
            // handler was detached after the kernel reported this event
            debug tracef("no handler for fd %d, event dropped", fd);
            return;
        }
        h.eventHandler(fd, ae);
    }

    /**
     * Run the loop for duration `d` (`Duration.max` - until `stop()` is called).
     *
     * Each iteration: run overdue timers, submit pending changes and wait for
     * kernel events, dispatch them, then advance the timing wheels.
     * Exceptions thrown by timing wheels timer handlers propagate to the caller.
     */
    void run(Duration d) @safe {

        immutable bool runInfinitely = (d == Duration.max);
        SysTime deadline;

        if ( !runInfinitely ) {
            deadline = Clock.currTime + d;
        }
        else
        {
            deadline = SysTime.max;
        }

        debug tracef("evl run for %s", d);

        scope(exit) {
            stopped = false;
        }

        while(!stopped)
        {
            // handle overdue timers
            execute_overdue_timers();

            if (stopped) {
                break;
            }
            ts = _calculate_timespec(deadline);

            debug(hiokqueue) safe_tracef("waiting for %s", ts);
            debug(hiokqueue) safe_tracef("waiting events %s", in_events[0..in_index]);

            ready = s_kevent(kqueue_fd,
                                cast(kevent_t*)&in_events[0], in_index,
                                cast(kevent_t*)&out_events[0], MAXEVENTS,
                                &ts);
            // capture errno right away: the calls below may overwrite it
            int kevent_errno = errno;
            SysTime now_real = Clock.currTime;
            in_index = 0;
            debug tracef("kevent returned %d events", ready);
            debug tracef("");


            if ( ready < 0 ) {
                error("kevent returned error %s".format(s_strerror(kevent_errno)));
                // never keep a negative count: out_events[0..ready] is
                // sliced later by fd_cleared_from_out_events
                ready = 0;
            }
            //
            // handle kernel events
            //
            foreach(i; 0..ready) {
                if ( stopped ) {
                    break;
                }
                auto e = out_events[i];
                debug tracef("got kevent[%d] %s, data: %d, udata: %0x", i, e, e.data, e.udata);

                switch (e.filter) {
                    case EVFILT_READ:
                        debug tracef("Read on fd %d", e.ident);
                        handle_io_event(e, AppEvent.IN);
                        continue;
                    case EVFILT_WRITE:
                        debug tracef("Write on fd %d", e.ident);
                        handle_io_event(e, AppEvent.OUT);
                        continue;
                    case EVFILT_SIGNAL:
                        assert(signals.length != 0);
                        auto signum = cast(int)e.ident;
                        debug tracef("received signal %d", signum);
                        assert(signals[signum].length > 0);
                        foreach(s; signals[signum]) {
                            debug tracef("processing signal handler %s", s);
                            try {
                                SigHandlerDelegate h = s._handler;
                                h(signum);
                            } catch (Exception ex) {
                                errorf("Uncaught exception: %s", ex.msg);
                            }
                        }
                        continue;
                    case EVFILT_USER:
                        handle_user_event(e);
                        continue;
                    default:
                        // includes entries zeroed by fd_cleared_from_out_events
                        break;
                }
            }
            auto toCatchUp = timingwheels.ticksToCatchUp(tick, now_real.stdTime);
            if(toCatchUp>0)
            {
                auto wr = timingwheels.advance(toCatchUp);
                foreach(t; wr.timers)
                {
                    HandlerDelegate h = t._handler;
                    assert(t._armed);
                    t._armed = false;
                    // exceptions from the handler are not caught here
                    h(AppEvent.TMO);
                }
            }
            execute_overdue_timers();
            if (!runInfinitely && now_real >= deadline)
            {
                debug(hiokqueue) safe_tracef("reached deadline, return");
                return;
            }
        }
    }

    /// Arm timer `t`. Timers expiring in less than one tick go to the
    /// `overdue` list, all others are scheduled in the timing wheels.
    /// Throws: LoopShutdownException if the loop is in shutdown.
    void start_timer(Timer t) @trusted {
        debug(hiokqueue) tracef("insert timer: %s", t);
        if ( inshutdown)
        {
            throw new LoopShutdownException("starting timer");
        }
        assert(!t._armed);
        t._armed = true;
        auto now = Clock.currTime;
        auto d = t._expires - now;
        d = max(d, 0.seconds);
        if ( d < tick ) {
            overdue ~= t;
            return;
        }
        ulong twNow = timingwheels.currStdTime(tick);
        // how far the real clock is ahead of the timing wheels clock
        Duration twdelay = (now.stdTime - twNow).hnsecs;
        debug(hiokqueue) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs);
        timingwheels.schedule(t, (d + twdelay)/tick);
    }

    // bool timer_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
    //     foreach(ref o; out_events[0..ready]) {
    //         if ( o.ident == e.ident && o.filter == e.filter && o.udata == e.udata ) {
    //             o.ident = 0;
    //             o.filter = 0;
    //             o.udata = null;
    //             return true;
    //         }
    //     }
    //     return false;
    // }

    /// Remove timer `t` from the `overdue` list and from the timing wheels.
    void stop_timer(Timer t) @safe {
        debug(hiokqueue) safe_tracef("remove timer %s", t);
        foreach(ref o; overdue)
        {
            if (o is t)
            {
                // slot is skipped later by execute_overdue_timers
                o = null;
                debug(hiokqueue) tracef("remove timer from overdue %s", t);
            }
        }
        if (timingwheels.totalTimers() > 0)
        {
            // static destructors can try to stop timers after loop deinit
            timingwheels.cancel(t);
        }
    }

    /// Submit all pending changes to the kernel without waiting for events.
    /// Throws: Exception if `kevent` reports an error.
    void flush() @trusted {
        if ( in_index == 0 ) {
            return;
        }
        // flush
        int rc = kevent(kqueue_fd, &in_events[0], in_index, null, 0, null);
        enforce(rc>=0, "flush: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..in_index]));
        in_index = 0;
    }

    /// Invalidate the first already received (but possibly not yet dispatched)
    /// event with the same ident and filter as `e`.
    /// Returns: true if such an event was found.
    bool fd_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        if ( ready <= 0 ) {
            return false;
        }
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter ) {
                o.ident = 0;
                o.filter = 0;
                return true;
            }
        }
        return false;
    }

    /// Forget the handler registered for `fd`.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    /// Register handler `h` for event `ev` on `fd`. The kernel change is only
    /// queued here. If the loop is in shutdown, `h` receives
    /// `AppEvent.SHUTDOWN` immediately and nothing is registered.
    void start_poll(int fd, AppEvent ev, FileEventHandler h) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
        if ( inshutdown )
        {
            //throw new LoopShutdownException("start polling");
            h.eventHandler(fd, AppEvent.SHUTDOWN);
            return;
        }
        kevent_t e;
        e.ident = fd;
        e.filter = cast(typeof(e.filter))filter;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        fileHandlers[fd] = h;
    }

    /// Stop polling `fd` for event `ev`. The change is submitted to the
    /// kernel immediately; an already received matching event is invalidated.
    void stop_poll(int fd, AppEvent ev) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = cast(typeof(e.filter))filter;
        e.flags = EV_DELETE|EV_DISABLE;
        fd_cleared_from_out_events(e);
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        flush();
    }

    /// Disable the received user event and notify its handler (if it is
    /// still registered) with `AppEvent.USER`.
    pragma(inline, true)
    void handle_user_event(kevent_t e) @safe {
        import core.thread;
        debug tracef("Got user event thread.id:%s event.id:%d", Thread.getThis().id(), e.ident);
        disable_user_event(e);
        auto h = fileHandlers[e.ident];
        // handler may be gone if stop_wait_for_user_event was called
        // after the kernel reported this event
        if ( h !is null )
            h.eventHandler(kqueue_fd, AppEvent.USER);
    }

    /// Register `handler` for user event `event_id` (the kernel change is queued).
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("start waiting for user_event %s", event_id);
        fileHandlers[event_id] = handler;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Unregister the handler of user event `event_id` (the kernel change is queued).
    /// `handler` is not used.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("stop waiting for user_event %s", event_id);
        fileHandlers[event_id] = null;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue an EV_DISABLE change for the event described by `e`.
    void disable_user_event(kevent_t e) @safe {
        e.flags = EV_DISABLE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue a one-shot kernel timer (ident 0) firing after `d`,
    /// with `t` stored in `udata`.
    void _add_kernel_timer(in Timer t, in Duration d) @trusted {
        debug tracef("add kernel timer %s, delta %s", t, d);
        assert(d >= 0.seconds);
        intptr_t delay_ms;
        if ( d < 36500.days)
        {
            delay_ms = d.split!"msecs".msecs;
        }
        else
        {
            // https://github.com/opensource-apple/xnu/blob/master/bsd/kern/kern_event.c#L1188
            // OSX kernel refuses to set too large timer interval with errno ERANGE
            delay_ms = 36500.days.split!"msecs".msecs;
        }
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_ADD | EV_ONESHOT;
        e.data = delay_ms;
        e.udata = cast(void*)t;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    alias _mod_kernel_timer = _add_kernel_timer;

    /// Queue removal of the kernel timer (ident 0).
    void _del_kernel_timer() @safe {
        debug trace("del kernel timer");
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /*
     * signal functions
     */

    /// Add signal handler `s`. The signal is registered in the kernel when
    /// the first handler for this signal number is added.
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through kevent
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /// Remove signal handler `s` (matched by `_id`). The kernel registration
    /// is removed together with the last handler for this signal number.
    /// Throws: NotFoundException if no handler was ever started for the signal.
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }

    /// Set the signal disposition to SIG_IGN and queue an EVFILT_SIGNAL registration.
    void _add_kernel_signal(in Signal s) {
        debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
        signal(s._signum, SIG_IGN);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            // flush
            int rc = kevent(kqueue_fd, &in_events[0], in_index, null, 0, null);
            enforce(rc>=0, "_add_kernel_signal: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..in_index]));
            in_index = 0;
        }
        in_events[in_index++] = e;
    }

    /// Restore the default signal disposition and queue removal of the
    /// EVFILT_SIGNAL registration.
    void _del_kernel_signal(in Signal s) {
        debug tracef("del kernel signal %d, id: %d", s._signum, s._id);

        signal(s._signum, SIG_DFL);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            // flush
            int rc = kevent(kqueue_fd, &in_events[0], in_index, null, 0, null);
            enforce(rc>=0, "_del_kernel_signal: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..in_index]));
            in_index = 0;
        }
        in_events[in_index++] = e;
    }
}

/// Map a single application event (IN, OUT or CONN) to the kqueue filter.
/// Throws: Exception for any other event.
auto appEventToSysEvent(AppEvent ae) {
    import core.bitop;
    // clear EXT_ flags
    ae &= AppEvent.ALL;
    assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
    assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
    switch ( ae ) {
        case AppEvent.IN:
            return EVFILT_READ;
        case AppEvent.OUT:
            return EVFILT_WRITE;
        case AppEvent.CONN:
            return EVFILT_READ;
        default:
            throw new Exception("You can't wait for event %X".format(ae));
    }
}

/// Map a kqueue filter (EVFILT_READ or EVFILT_WRITE) to the application event.
/// Throws: Exception for any other filter value.
AppEvent sysEventToAppEvent(short se) {
    switch ( se ) {
        case EVFILT_READ:
            return AppEvent.IN;
        case EVFILT_WRITE:
            return AppEvent.OUT;
        default:
            throw new Exception("Unexpected event %d".format(se));
    }
}

unittest {
    import std.exception;
    import core.exception;

    assert(appEventToSysEvent(AppEvent.IN)==EVFILT_READ);
    assert(appEventToSysEvent(AppEvent.OUT)==EVFILT_WRITE);
    assert(appEventToSysEvent(AppEvent.CONN)==EVFILT_READ);
    //assertThrown!AssertError(appEventToSysEvent(AppEvent.IN | AppEvent.OUT));
    assert(sysEventToAppEvent(EVFILT_READ) == AppEvent.IN);
    assert(sysEventToAppEvent(EVFILT_WRITE) == AppEvent.OUT);
}