module hio.drivers.epoll;

version(linux):

import std.datetime;
import std.string;
import std.container;
import std.exception;
import std.experimental.logger;
import std.typecons;
import std.traits;
import std.algorithm: min, max;

import std.experimental.allocator;
import std.experimental.allocator.mallocator;

import core.memory: GC;

import std.algorithm.comparison: max;
import core.stdc.string: strerror;
import core.stdc.errno: errno, EAGAIN, EINTR;
import core.stdc.stdio: printf;

import core.sys.linux.epoll;
import core.sys.linux.timerfd;
import core.sys.linux.sys.signalfd;

import core.sys.posix.unistd: close, read;
import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC , timespec;

import core.memory;

import timingwheels;

import hio.events;
import hio.common;

/// Capacity of the fixed array used to dispatch small batches of expired
/// timers (batches with fewer than this many timers; see run()).
private enum InExpTimersSize = 16;
private alias TW = TimingWheels!Timer;
private alias TWAdvanceResult = ReturnType!(TW.advance!(TW));

/**
 * epoll-based native event loop (Linux only).
 *
 * File descriptors are watched through an epoll instance, POSIX signals
 * through a single signalfd that is itself registered in the epoll set, and
 * timers through a timing wheel with resolution `tick`. Timers due sooner
 * than one tick are kept in the `overdue` queue instead of the wheel.
 **/
struct NativeEventLoopImpl {
    immutable bool native = true;
    immutable string _name = "epoll";
    private {
        bool stopped = false;
        bool inshutdown = false;
        enum MAXEVENTS = 1024;
        enum TOTAL_FILE_HANDLERS = 16*1024;
        int epoll_fd = -1;
        int signal_fd = -1;
        sigset_t mask;           // signals currently routed to signal_fd

        align(1) epoll_event[MAXEVENTS] events;

        Duration tick = 5.msecs; // timing wheel resolution
        TW timingwheels;

        // Result of the last timingwheels.advance(). A non-zero
        // advancedTimersHashLength means a large batch is being dispatched
        // directly from this hash.
        TWAdvanceResult advancedTimersHash;
        long advancedTimersHashLength;

        // Copy of a small batch of expired timers. A non-zero
        // advancedTimersArrayLength means dispatch from this array is running.
        Timer[InExpTimersSize] advancedTimersArray;
        long advancedTimersArrayLength;

        Timer[] overdue;                 // timers added with expiration in past
        Signal[][int] signals;           // signum -> registered Signal objects
        FileEventHandler[] fileHandlers; // indexed by fd, Mallocator-allocated
    }
    @disable this(this) {}

    /**
     * Create the epoll instance (if not created yet), allocate the fd-indexed
     * handler table (if not allocated yet) and initialize the timing wheel.
     *
     * The handler table is only allocated when absent, so calling this twice
     * without deinit() no longer leaks the previous table and its GC range.
     **/
    void initialize() @trusted nothrow {
        if ( epoll_fd == -1 ) {
            epoll_fd = (() @trusted => epoll_create(MAXEVENTS))();
        }
        if ( fileHandlers is null ) {
            fileHandlers = Mallocator.instance.makeArray!FileEventHandler(TOTAL_FILE_HANDLERS);
            if ( fileHandlers !is null ) {
                // The table is outside the GC heap but holds GC references,
                // so the GC must scan it.
                GC.addRange(&fileHandlers[0], fileHandlers.length * FileEventHandler.sizeof);
            }
        }
        timingwheels.init();
    }

    /**
     * Close the epoll and signalfd descriptors, free the handler table and
     * reset the timing wheel (and the last advance() result) to a fresh state.
     **/
    void deinit() @trusted {
        if (epoll_fd>=0)
        {
            close(epoll_fd);
            epoll_fd = -1;
        }
        if (signal_fd>=0)
        {
            close(signal_fd);
            signal_fd = -1;
        }
        if (fileHandlers !is null)
        {
            GC.removeRange(&fileHandlers[0]);
            Mallocator.instance.dispose(fileHandlers);
            fileHandlers = null;
        }
        timingwheels = TimingWheels!(Timer)();
        timingwheels.init();
        advancedTimersHash = TWAdvanceResult.init;
    }

    /// Make run() return after finishing its current iteration.
    void stop() @safe {
        stopped = true;
    }

    /**
     * Compute the epoll_wait timeout in milliseconds: the time left until
     * `deadline` or until the next timing-wheel event, whichever is sooner,
     * clamped to [0, int.max].
     *
     * The upper clamp matters: epoll_wait takes an `int`, and a plain cast of
     * a longer wait (more than ~24.8 days) would wrap to a negative value
     * (= block forever) or to an arbitrary short value.
     **/
    int _calculate_timeout(SysTime deadline) {
        auto now_real = Clock.currTime;
        Duration delta = deadline - now_real;
        debug(hioepoll) safe_tracef("deadline - now_real: %s", delta);
        auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, now_real.stdTime);
        debug(hioepoll) safe_tracef("nextTWtimer: %s", nextTWtimer);
        delta = min(delta, nextTWtimer);

        delta = max(delta, 0.seconds);
        return cast(int)min(delta.total!"msecs", cast(long)int.max);
    }

    /**
     * Fire every timer queued in `overdue`, in FIFO order.
     *
     * Entries nulled by stop_timer() are skipped. Timers appended by a
     * handler while this runs are fired in the same call. During shutdown
     * handlers receive AppEvent.SHUTDOWN instead of AppEvent.TMO. Exceptions
     * from handlers are logged and swallowed.
     **/
    private void execute_overdue_timers()
    {
        debug(hioepoll) tracef("calling overdue timers");
        while (overdue.length > 0)
        {
            // execute timers which user requested with negative delay
            Timer t = overdue[0];
            overdue = overdue[1..$];
            if ( t is null)
            {
                // timer was removed
                continue;
            }
            debug(hioepoll) tracef("execute overdue %s", t);
            assert(t._armed);
            t._armed = false;
            HandlerDelegate h = t._handler;
            try {
                if (inshutdown)
                {
                    h(AppEvent.SHUTDOWN);
                }
                else
                {
                    h(AppEvent.TMO);
                }
            } catch (Exception ex) {
                errorf("Uncaught exception: %s", ex);
            }
        }
    }

    /**
     * Deliver AppEvent.SHUTDOWN to every timer in the timing wheel and to
     * every registered file handler, then switch the loop into shutdown mode.
     *
     * In shutdown mode start_timer() throws LoopShutdownException and
     * start_poll() immediately answers AppEvent.SHUTDOWN. Shutdown mode is
     * entered even when no handler table is allocated.
     **/
    void shutdown() @safe
    in(!stopped && !inshutdown)
    {
        // scan through all timers
        auto a = timingwheels.allTimers();
        foreach(t; a.timers)
        {
            try
            {
                debug(hioepoll) tracef("send shutdown to timer %s", t);
                t._handler(AppEvent.SHUTDOWN);
            }
            catch(Exception e)
            {
                errorf("Error on event SHUTDOWN in %s: %s", t, e.msg);
            }
        }
        // scan through active file descriptors
        // and send SHUTDOWN event
        if (fileHandlers !is null)
        {
            foreach(int i; 0 .. TOTAL_FILE_HANDLERS)
            {
                if (fileHandlers[i] is null)
                {
                    continue;
                }
                try
                {
                    debug(hioepoll) tracef("send shutdown to filehandler %s", fileHandlers[i]);
                    fileHandlers[i].eventHandler(i, AppEvent.SHUTDOWN);
                }
                catch(Exception e)
                {
                    // printf in a @trusted lambda: Exception.toString is not @safe
                    () @trusted {printf("exception: %s:%d: %s\n", __FILE__.ptr, __LINE__, toStringz(e.toString));}();
                }
            }
        }
        inshutdown = true;
    }

    /**
     * Run the event loop.
     *
     * Params:
     *   d = how long to run. Duration.max runs until stop() is called;
     *       d == 0.seconds runs events once.
     *
     * Each iteration waits in epoll_wait() (bounded by the deadline and by
     * the next timing-wheel event), dispatches signal and file events, fires
     * expired and overdue timers, and returns once the deadline is reached.
     * An exception thrown by a file handler is logged and rethrown.
     **/
    void run(Duration d) {

        immutable bool runInfinitely = (d == Duration.max);

        /**
         * eventloop will exit when we reach deadline
         * it is allowed to have d == 0.seconds,
         * which mean we will run events once.
         * SysTime + Duration does not saturate, so a huge `d` (Duration.max
         * in particular) would wrap stdTime around; clamp to SysTime.max.
         **/
        auto started = Clock.currTime;
        SysTime deadline = (runInfinitely || d >= SysTime.max - started)
            ? SysTime.max
            : started + d;
        debug(hioepoll) tracef("evl run %s",runInfinitely? "infinitely": "for %s".format(d));
        scope ( exit )
        {
            stopped = false;
        }

        while( !stopped ) {
            debug(hioepoll) tracef("event loop iteration");

            int timeout_ms = _calculate_timeout(deadline);

            debug(hioepoll) safe_tracef("wait in poll for %s.ms", timeout_ms);

            int ready = epoll_wait(epoll_fd, &events[0], MAXEVENTS, timeout_ms);
            // capture errno right away: Clock.currTime and logging may clobber it
            immutable int wait_errno = errno;

            debug(hioepoll) tracef("got %d events", ready);

            SysTime now_real = Clock.currTime;

            if ( ready == -1 && wait_errno == EINTR) {
                continue;
            }
            if ( ready < 0 ) {
                errorf("epoll_wait returned error %s", fromStringz(strerror(wait_errno)));
                // nothing to dispatch, but still service timers and the deadline
                ready = 0;
            }

            debug(hioepoll) tracef("events: %s", events[0..ready]);
            foreach(i; 0..ready) {
                auto ev = events[i];
                debug(hioepoll) tracef("got event %s", ev);
                int fd = ev.data.fd;

                if ( fd == signal_fd ) {
                    _read_signals();
                    continue;
                }
                AppEvent ae;
                if ( ev.events & EPOLLIN ) {
                    ae |= AppEvent.IN;
                }
                if (ev.events & EPOLLOUT) {
                    ae |= AppEvent.OUT;
                }
                if (ev.events & EPOLLERR) {
                    ae |= AppEvent.ERR;
                }
                if (ev.events & EPOLLHUP) {
                    ae |= AppEvent.HUP;
                }
                debug(hioepoll) tracef("process event %02x on fd: %s, handler: %s", ev.events, ev.data.fd, fileHandlers[fd]);
                if ( fileHandlers[fd] !is null ) {
                    try {
                        fileHandlers[fd].eventHandler(ev.data.fd, ae);
                    }
                    catch (Exception ex) {
                        errorf("On file handler: %d, %s", fd, ex);
                        throw ex;
                    }
                }
            }
            _dispatch_expired_timers(now_real);
            execute_overdue_timers();
            if (!runInfinitely && now_real >= deadline)
            {
                debug(hioepoll) safe_tracef("reached deadline, return");
                return;
            }
        }
    }

    /**
     * Drain the non-blocking signalfd and call every handler registered for
     * each received signal. EINTR is retried; EAGAIN ends the drain. Handler
     * exceptions are logged and swallowed. Signals with no registered entry
     * are ignored.
     **/
    private void _read_signals()
    {
        enum siginfo_items = 8;
        signalfd_siginfo[siginfo_items] info;
        debug(hioepoll) trace("got signal");
        assert(signal_fd != -1);
        while (true) {
            auto rc = read(signal_fd, &info, info.sizeof);
            if ( rc < 0 ) {
                immutable int err = errno;
                if ( err == EAGAIN ) {
                    break;      // fully drained
                }
                if ( err == EINTR ) {
                    continue;   // interrupted before reading anything, retry
                }
            }
            enforce(rc > 0);
            auto got_signals = rc / signalfd_siginfo.sizeof;
            debug(hioepoll) tracef("read info %d, %s", got_signals, info[0..got_signals]);
            foreach(si; 0..got_signals) {
                auto signum = info[si].ssi_signo;
                debug(hioepoll) tracef("signum: %d", signum);
                auto handlers = signum in signals;
                if ( handlers is null ) {
                    continue;
                }
                foreach(s; *handlers) {
                    debug(hioepoll) tracef("processing signal handler %s", s);
                    try {
                        SigHandlerDelegate h = s._handler;
                        h(signum);
                    } catch (Exception ex) {
                        errorf("Uncaught exception: %s", ex);
                    }
                }
            }
        }
    }

    /**
     * Advance the timing wheel up to `now_real` and fire every expired timer
     * with AppEvent.TMO. Handler exceptions are logged and swallowed.
     *
     * Some timers expired.
     * Most of the time the number of these timers is low, so we optimize for
     * this case: copy them into a small array, then iterate over the array.
     * When there are InExpTimersSize or more, iterate over the advance()
     * result directly.
     *
     * Random access to every pending expired timer is needed because any
     * handler may cancel another timer of the same batch (e.g. an expired
     * timer wakes a task which cancels a socket io timer); stop_timer()
     * consults advancedTimersArray / advancedTimersHash for that.
     **/
    private void _dispatch_expired_timers(SysTime now_real)
    {
        auto toCatchUp = timingwheels.ticksToCatchUp(tick, now_real.stdTime);
        if (toCatchUp <= 0)
        {
            return;
        }
        advancedTimersHash = timingwheels.advance(toCatchUp);
        if (advancedTimersHash.length < InExpTimersSize)
        {
            debug(hioepoll) tracef("calling timers");
            size_t n = 0;
            foreach(t; advancedTimersHash.timers)
            {
                advancedTimersArray[n++] = t;
            }
            advancedTimersArrayLength = n;
            foreach(j; 0 .. n)
            {
                Timer t = advancedTimersArray[j];
                if ( t is null )
                {
                    continue; // stopped by an earlier handler of this batch
                }
                // Clear the slot before firing: if this handler re-arms the
                // timer and a later handler stops it, stop_timer() must not
                // mistake the re-armed timer for a still-pending batch entry.
                advancedTimersArray[j] = null;
                HandlerDelegate h = t._handler;
                assert(t._armed);
                t._armed = false;
                try {
                    h(AppEvent.TMO);
                } catch (Exception ex) {
                    errorf("Uncaught exception: %s", ex);
                }
            }
        }
        else
        {
            debug(hioepoll) tracef("calling timers");
            advancedTimersHashLength = advancedTimersHash.length;
            foreach (t; advancedTimersHash.timers)
            {
                // Skip timers that an earlier handler removed via stop_timer().
                // NOTE(review): fired timers stay in the hash, so a timer
                // re-armed and then stopped within the same large batch is
                // only removed from the hash here — confirm whether the
                // timingwheels API allows removing the current entry.
                if (!advancedTimersHash.contains(t.id))
                {
                    continue;
                }
                HandlerDelegate h = t._handler;
                assert(t._armed);
                t._armed = false;
                try {
                    h(AppEvent.TMO);
                } catch (Exception ex) {
                    errorf("Uncaught exception: %s", ex);
                }
            }
        }
        advancedTimersArrayLength = 0;
        advancedTimersHashLength = 0;
        advancedTimersArray[] = null; // drop references to fired timers
    }

    /**
     * Arm timer `t`.
     *
     * Timers due in less than one `tick` are appended to `overdue` (drained
     * by run() at the end of every iteration); the others are scheduled in
     * the timing wheel, corrected for the wheel's lag behind the wall clock.
     *
     * Throws: LoopShutdownException after shutdown().
     **/
    void start_timer(Timer t) @safe {
        debug(hioepoll) tracef("insert timer: %s", t);
        if ( inshutdown)
        {
            throw new LoopShutdownException("starting timer");
        }
        assert(!t._armed);
        t._armed = true;
        auto now = Clock.currTime;
        auto d = t._expires - now;
        d = max(d, 0.seconds);
        if ( d < tick ) {
            overdue ~= t;
            debug(hioepoll) tracef("inserted overdue timer: %s", t);
            return;
        }
        ulong twNow = timingwheels.currStdTime(tick);
        Duration twdelay = (now.stdTime - twNow).hnsecs;
        debug(hioepoll) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs);
        timingwheels.schedule(t, (d + twdelay)/tick);
    }

    /**
     * Disarm timer `t` wherever it currently is.
     *
     * If a batch of expired timers is being dispatched and `t` is still
     * pending in it, it is removed from the batch and nothing else is done.
     * Otherwise it is removed from `overdue` (always checked, even during a
     * dispatch, so an overdue timer stopped from a timer handler no longer
     * fires) and cancelled in the timing wheel.
     **/
    void stop_timer(Timer t) @safe {
        debug(hioepoll) tracef("remove timer %s", t);
        if ( advancedTimersArrayLength > 0)
        {
            foreach(j; 0 .. cast(size_t)advancedTimersArrayLength)
            {
                if ( t is advancedTimersArray[j])
                {
                    advancedTimersArray[j] = null;
                    return;
                }
            }
        }
        else if (advancedTimersHashLength>0 && advancedTimersHash.contains(t.id))
        {
            advancedTimersHash.remove(t.id);
            return;
        }

        foreach(ref o; overdue)
        {
            if (o is t)
            {
                o = null;
                debug(hioepoll) tracef("remove timer from overdue %s", t);
            }
        }

        // static destructors can try to stop timers after loop deinit, so we check totalTimers
        if (timingwheels.totalTimers() > 0)
        {
            timingwheels.cancel(t);
        }
    }

    //
    // signals
    //

    /// Register signal handler `s`; the first handler for a signum routes
    /// that signal to the signalfd.
    void start_signal(Signal s) {
        debug(hioepoll) tracef("start signal %s", s);
        debug(hioepoll) tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through signalfd
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /**
     * Unregister signal handler `s` (matched by `_id`). When the last handler
     * for its signum goes away, the signal is unblocked and removed from the
     * signalfd mask.
     *
     * Throws: NotFoundException if no handler was ever started for the signum.
     **/
    void stop_signal(Signal s) {
        debug(hioepoll) trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug(hioepoll) tracef("new signals %d row %s", s._signum, new_row);
    }

    /**
     * Block `s._signum` for this thread and add it to the signalfd mask,
     * creating the signalfd (non-blocking, close-on-exec, edge-triggered in
     * epoll) on first use.
     *
     * If creation or epoll registration fails, the signalfd is closed and
     * reset to -1 so that a later call can retry, instead of leaving an open
     * signalfd that epoll never reports.
     **/
    void _add_kernel_signal(Signal s) {
        debug(hioepoll) tracef("add kernel signal %d, id: %d", s._signum, s._id);
        sigset_t m;
        sigemptyset(&m);
        sigaddset(&m, s._signum);
        pthread_sigmask(SIG_BLOCK, &m, null);

        sigaddset(&mask, s._signum);
        if ( signal_fd == -1 ) {
            signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
            debug(hioepoll) tracef("signalfd %d", signal_fd);
            enforce(signal_fd >= 0, "signalfd: %s".format(fromStringz(strerror(errno))));
            epoll_event e;
            e.events = EPOLLIN|EPOLLET;
            e.data.fd = signal_fd;
            auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &e);
            if ( rc < 0 ) {
                immutable int err = errno;
                close(signal_fd);
                signal_fd = -1;
                throw new Exception("epoll_ctl add(%s): %s".format(e, fromStringz(strerror(err))));
            }
        } else {
            signalfd(signal_fd, &mask, 0);
        }

    }

    /// Unblock `s._signum` for this thread and remove it from the signalfd mask.
    void _del_kernel_signal(Signal s) {
        debug(hioepoll) tracef("del kernel signal %d, id: %d", s._signum, s._id);
        sigset_t m;
        sigemptyset(&m);
        sigaddset(&m, s._signum);
        pthread_sigmask(SIG_UNBLOCK, &m, null);
        sigdelset(&mask, s._signum);
        assert(signal_fd != -1);
        signalfd(signal_fd, &mask, 0);
    }

    /**
     * Watch descriptor `event_id` for readability and dispatch its events to
     * `handler`.
     * Throws: Exception when epoll_ctl(EPOLL_CTL_ADD) fails.
     **/
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        epoll_event e;
        e.events = EPOLLIN;
        e.data.fd = event_id;
        auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_id, &e))();
        enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, s_strerror(errno)));
        fileHandlers[event_id] = handler;
    }

    /// Stop watching `event_id` and forget its handler. epoll_ctl errors are ignored.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        epoll_event e;
        e.events = EPOLLIN;
        e.data.fd = event_id;
        // result intentionally ignored (best-effort removal)
        auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_DEL, event_id, &e))();
        fileHandlers[event_id] = null;
    }

    /// The epoll descriptor (-1 when not initialized).
    int get_kernel_id() pure @safe nothrow @nogc {
        return epoll_fd;
    }

    //
    // files/sockets
    //

    /// Forget the handler for `fd`; the epoll registration is not touched.
    void detach(int fd) @safe {
        debug(hioepoll) tracef("detaching fd(%d) from fileHandlers[%d]", fd, fileHandlers.length);
        fileHandlers[fd] = null;
    }

    /**
     * Register `fd` in epoll for the single event `ev` (optionally with
     * EPOLLEXCLUSIVE when AppEvent.EXT_EPOLLEXCLUSIVE is set) and route its
     * events to `f`. In shutdown mode `f` immediately receives
     * AppEvent.SHUTDOWN instead.
     * Throws: Exception when epoll_ctl(EPOLL_CTL_ADD) fails.
     **/
    void start_poll(int fd, AppEvent ev, FileEventHandler f) @trusted {
        assert(epoll_fd != -1);
        debug(hioepoll) tracef("start poll for %s on fd: %s (inshutdown: %s)", ev, fd, inshutdown);
        if ( inshutdown )
        {
            //throw new LoopShutdownException("start polling");
            f.eventHandler(fd, AppEvent.SHUTDOWN);
            return;
        }
        epoll_event e;
        e.events = appEventToSysEvent(ev);
        if ( ev & AppEvent.EXT_EPOLLEXCLUSIVE )
        {
            e.events |= EPOLLEXCLUSIVE;
        }
        e.data.fd = fd;
        auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &e);
        enforce(rc >= 0, "epoll_ctl add(%d, %s): %s".format(fd, e, fromStringz(strerror(errno))));
        fileHandlers[fd] = f;
    }

    /// Remove `fd` from the epoll set. epoll_ctl errors are ignored, and
    /// fileHandlers[fd] is left as is (see detach()).
    void stop_poll(int fd, AppEvent ev) @trusted {
        assert(epoll_fd != -1);
        epoll_event e;
        e.events = appEventToSysEvent(ev);
        e.data.fd = fd;
        // result intentionally ignored (best-effort removal)
        auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &e);
    }

    /**
     * Map a single AppEvent (IN or OUT; EXT_ flags are stripped first) to the
     * corresponding EPOLL* flag.
     * Throws: Exception for any other event.
     **/
    auto appEventToSysEvent(AppEvent ae) pure @safe {
        import core.bitop;
        // clear EXT_ flags
        ae &= AppEvent.ALL;
        assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
        assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
        switch ( ae ) {
            case AppEvent.IN:
                return EPOLLIN;
            case AppEvent.OUT:
                return EPOLLOUT;
            //case AppEvent.CONN:
            //    return EVFILT_READ;
            default:
                throw new Exception("You can't wait for event %X".format(ae));
        }
    }
}