1 module hio.drivers.select; 2 3 import std.datetime; 4 import std.container; 5 import std.experimental.logger; 6 7 import std.experimental.allocator; 8 import std.experimental.allocator.mallocator; 9 import std.typecons; 10 11 import std.string; 12 import std.algorithm.comparison: min, max; 13 import std.exception: enforce; 14 import core.thread; 15 16 version(Windows) { 17 import core.sys.windows.winsock2; 18 } 19 version(Posix) { 20 import core.sys.posix.sys.select; 21 } 22 23 import core.stdc.string: strerror; 24 import core.stdc.errno; 25 import core.stdc.signal; 26 27 import timingwheels; 28 29 import hio.events; 30 import hio.common; 31 32 // 33 // TODO add support for multiple select event loops 34 // 35 private enum sig_array_length = 256; 36 private static int[sig_array_length] last_signal; 37 private static int last_signal_index; 38 39 extern(C) void sig_catcher(int signum) nothrow @nogc { 40 last_signal[last_signal_index++] = signum; 41 } 42 43 private struct FileDescriptor { 44 package { 45 AppEvent _polling = AppEvent.NONE; 46 } 47 string toString() const @safe { 48 import std.format: format; 49 return appeventToString(_polling); 50 //return "FileDescriptor: filehandle: %d, events: %s".format(_fileno, appeventToString(_polling)); 51 } 52 } 53 54 struct FallbackEventLoopImpl { 55 immutable string _name = "select"; 56 immutable numberOfDescriptors = 1024; 57 58 private { 59 fd_set read_fds; 60 fd_set write_fds; 61 fd_set err_fds; 62 63 bool stopped = false; 64 65 Signal[][int] signals; 66 67 FileDescriptor[numberOfDescriptors] fileDescriptors; 68 FileEventHandler[] fileHandlers; 69 70 Timer[] overdue; // timers added with expiration in past or with cicks==0 71 72 RedBlackTree!Timer precise_timers; // precise timers 73 TimingWheels!Timer timingwheels; // timing wheels 74 Duration tick = 5.msecs; 75 //CircBuff!Notification notificationsQueue; 76 } 77 78 @disable this(this) {}; 79 80 void initialize() @safe nothrow { 81 precise_timers = new RedBlackTree!Timer(); 82 fileHandlers = new FileEventHandler[](1024); 83 timingwheels.init(); 84 } 85 void deinit() @trusted { 86 precise_timers = null; 87 timingwheels = TimingWheels!(Timer)(); 88 timingwheels.init(); 89 } 90 void stop() @safe { 91 debug(hioselect) safe_tracef("mark eventloop as stopped"); 92 stopped = true; 93 } 94 95 private Duration timeUntilNextTimer() 96 out(r;r>=0.seconds) 97 { 98 Duration result = Duration.max; 99 ulong nowRT = Clock.currStdTime; 100 101 if ( ! precise_timers.empty ) 102 { 103 result = min(result, precise_timers.front._expires - SysTime(nowRT)); 104 } 105 auto nextTWtimer = timingwheels.timeUntilNextEvent(tick, nowRT); 106 nextTWtimer = max(nextTWtimer, 0.seconds); 107 debug(hioselect) safe_tracef("prec: %s, timingwheel: %s", result, nextTWtimer); 108 result = min(result, nextTWtimer); 109 return result; 110 } 111 private timeval* _calculate_timeval(Duration d, timeval* tv) 112 { 113 if (d < 0.seconds) 114 { 115 d = 0.seconds; 116 } 117 immutable converted = d.split!("seconds", "usecs"); 118 tv.tv_sec = cast(typeof(tv.tv_sec))converted.seconds; 119 tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs; 120 return tv; 121 } 122 private void execute_overdue_timers() 123 { 124 while (overdue.length > 0) 125 { 126 // execute timers which user requested with negative delay 127 Timer t = overdue[0]; 128 overdue = overdue[1..$]; 129 debug(hioselect) safe_tracef("execute overdue %s", t); 130 HandlerDelegate h = t._handler; 131 try { 132 h(AppEvent.TMO); 133 } catch (Exception e) { 134 errorf("Uncaught exception: %s", e); 135 } 136 } 137 } 138 // /** 139 // * Find shortest interval between now->deadline, now->earliest timer 140 // * If deadline expired or timer in past - set zero wait time 141 // */ 142 // timeval* _calculate_timeval(SysTime deadline, timeval* tv) { 143 // SysTime now = Clock.currTime; 144 // Duration d = deadline - now; 145 // if ( ! precise_timers.empty ) { 146 // d = min(d, precise_timers.front._expires - now); 147 // } 148 // d = min(d, timingwheels.timeUntilNextEvent(tick)); 149 // d = max(d, 0.seconds); 150 // auto converted = d.split!("seconds", "usecs"); 151 // tv.tv_sec = cast(typeof(tv.tv_sec))converted.seconds; 152 // tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs; 153 // return tv; 154 // } 155 // timeval* _calculate_timeval(timeval* tv) { 156 // SysTime now = Clock.currTime; 157 // Duration d; 158 // d = timers.front._expires - now; 159 // d = max(d, 0.seconds); 160 // auto converted = d.split!("seconds", "usecs"); 161 // tv.tv_sec = cast(typeof(tv.tv_sec))converted.seconds; 162 // tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs; 163 // return tv; 164 // } 165 void run(Duration d) { 166 167 immutable bool runIndefinitely = (d == Duration.max); 168 SysTime now = Clock.currTime; 169 SysTime deadline; 170 timeval tv; 171 timeval* wait; 172 173 if ( ! runIndefinitely ) { 174 deadline = now + d; 175 } 176 177 debug(hioselect) safe_tracef("evl run %s",d); 178 179 scope(exit) { 180 stopped = false; 181 } 182 183 while (!stopped) { 184 185 int fdmax = -1; 186 187 execute_overdue_timers(); 188 189 if (stopped) { 190 break; 191 } 192 193 while ( !precise_timers.empty && precise_timers.front._expires <= now) { 194 debug(hioselect) safe_tracef("processing overdue from precise %s, lag: %s", 195 precise_timers.front, Clock.currTime - precise_timers.front._expires); 196 Timer t = precise_timers.front; 197 HandlerDelegate h = t._handler; 198 precise_timers.removeFront; 199 try { 200 h(AppEvent.TMO); 201 } catch (Exception e) { 202 errorf("Uncaught exception: %s", e); 203 } 204 now = Clock.currTime; 205 } 206 if (stopped) { 207 break; 208 } 209 210 FD_ZERO(&read_fds); 211 FD_ZERO(&write_fds); 212 FD_ZERO(&err_fds); 213 214 foreach(int fd; 0..numberOfDescriptors) { 215 AppEvent e = fileDescriptors[fd]._polling; 216 if ( e == AppEvent.NONE ) { 217 continue; 218 } 219 debug(hioselect) safe_tracef("poll %d for %s", fd, fileDescriptors[fd]); 220 if ( e & AppEvent.IN ) { 221 FD_SET(fd, &read_fds); 222 } 223 if ( e & AppEvent.OUT ) { 224 FD_SET(fd, &write_fds); 225 } 226 fdmax = max(fdmax, fd); 227 } 228 229 // 230 // Next limits for wait time: 231 // - deadline (user ask loop run duration d) 232 // - or requested to run indefinitely 233 // - next precise_timer 234 // - next timingwheel timer 235 // 236 immutable untilTimer = timeUntilNextTimer(); 237 immutable untilDeadline = runIndefinitely ? Duration.max : deadline - Clock.currTime; 238 immutable nextStop = min(untilTimer, untilDeadline); 239 debug(hioselect) safe_tracef("untilTimer: %s, untilDeadline: %s", untilTimer, untilDeadline); 240 if ( nextStop == Duration.max ) 241 { 242 wait = null; 243 } 244 else 245 { 246 wait = _calculate_timeval(nextStop, &tv); 247 } 248 249 debug(hioselect) safe_tracef("waiting for events %s", wait is null?"forever":"%s".format(*wait)); 250 auto ready = select(fdmax+1, &read_fds, &write_fds, &err_fds, wait); 251 debug(hioselect) safe_tracef("returned %d events", ready); 252 if ( ready < 0 && errno == EINTR ) { 253 int s_ind; 254 while(s_ind < last_signal_index) { 255 int signum = last_signal[s_ind]; 256 assert(signals[signum].length > 0); 257 foreach(s; signals[signum]) { 258 debug(hioselect) safe_tracef("processing signal handler %s", s); 259 try { 260 SigHandlerDelegate h = s._handler; 261 h(signum); 262 } catch (Exception e) { 263 errorf("Uncaught exception: %s", e); 264 } 265 } 266 s_ind++; 267 } 268 last_signal_index = 0; 269 continue; 270 } 271 if ( ready < 0 ) 272 { 273 errorf("on call: (%s, %s, %s, %s)", fdmax+1, read_fds, write_fds, tv); 274 errorf("select returned error %s", fromStringz(strerror(errno))); 275 } 276 enforce(ready >= 0); 277 if ( ready == 0 ) { 278 ulong nowRT = Clock.currStdTime; 279 // Timedout 280 // check timingweels 281 auto toCatchUp = timingwheels.ticksToCatchUp(tick, nowRT); 282 if(toCatchUp>0) 283 { 284 auto wr = timingwheels.advance(toCatchUp); 285 foreach(t; wr.timers) 286 { 287 HandlerDelegate h = t._handler; 288 try { 289 h(AppEvent.TMO); 290 } catch (Exception e) { 291 errorf("Uncaught exception: %s", e); 292 } 293 } 294 } 295 296 if ( precise_timers.length > 0 && precise_timers.front._expires <= now) do { 297 debug(hioselect) safe_tracef("processing %s, lag: %s", 298 precise_timers.front, Clock.currTime - precise_timers.front._expires); 299 Timer t = precise_timers.front; 300 HandlerDelegate h = t._handler; 301 assert(t._armed); 302 t._armed = false; 303 try { 304 h(AppEvent.TMO); 305 } catch (Exception e) { 306 errorf("Uncaught exception: %s", e); 307 } 308 // timer event handler can try to stop exactly this timer, 309 // so when we returned from handler we can have different front 310 // and we do not have to remove it. 311 if ( !precise_timers.empty && precise_timers.front == t ) { 312 precise_timers.removeFront; 313 } 314 now = Clock.currTime; 315 } while (!precise_timers.empty && precise_timers.front._expires <= now ); 316 317 // handlers can install some late timers, so... 318 execute_overdue_timers(); 319 320 now = Clock.currTime; 321 if ( !runIndefinitely && now >= deadline ) { 322 debug(hioselect) safe_tracef("deadline reached"); 323 return; 324 } 325 } 326 if ( ready > 0 ) { 327 foreach(int fd; 0..numberOfDescriptors) { 328 AppEvent e = fileDescriptors[fd]._polling; 329 if ( e == AppEvent.NONE ) { 330 continue; 331 } 332 debug(hioselect) safe_tracef("check %d for %s", fd, fileDescriptors[fd]); 333 if ( e & AppEvent.IN && FD_ISSET(fd, &read_fds) ) { 334 debug(hioselect) safe_tracef("got IN event on file %d", fd); 335 fileHandlers[fd].eventHandler(fd, AppEvent.IN); 336 } 337 if ( e & AppEvent.OUT && FD_ISSET(fd, &write_fds) ) { 338 debug(hioselect) safe_tracef("got OUT event on file %d", fd); 339 fileHandlers[fd].eventHandler(fd, AppEvent.OUT); 340 } 341 } 342 } 343 } 344 } 345 346 void start_timer(Timer t) @trusted { 347 debug(hioselect) safe_tracef("insert timer: %s", t); 348 auto now = Clock.currTime; 349 auto d = t._expires - now; 350 d = max(d, 0.seconds); 351 if ( d < tick ) { 352 overdue ~= t; 353 return; 354 } 355 assert(!t._armed); 356 t._armed = true; 357 ulong twNow = timingwheels.currStdTime(tick); 358 Duration twdelay = (now.stdTime - twNow).hnsecs; 359 debug(hioselect) safe_tracef("tw delay: %s", (now.stdTime - twNow).hnsecs); 360 timingwheels.schedule(t, (d + twdelay)/tick); 361 } 362 363 void stop_timer(Timer t) @trusted { 364 debug(hioselect) safe_tracef("remove timer %s", t); 365 { 366 // static destructors can try to stop timers after loop deinit 367 timingwheels.cancel(t); 368 } 369 } 370 371 void start_precise_timer(Timer t) @trusted { 372 debug(hioselect) safe_tracef("insert timer: %s", t); 373 auto d = t._expires - Clock.currTime; 374 d = max(d, 0.seconds); 375 if ( d == 0.seconds ) { 376 overdue ~= t; 377 return; 378 } 379 precise_timers.insert(t); 380 } 381 382 void stop_precise_timer(Timer t) @trusted { 383 assert(!precise_timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t)); 384 debug(hioselect) safe_tracef("remove timer %s", t); 385 auto r = precise_timers.equalRange(t); 386 precise_timers.remove(r); 387 } 388 389 void start_poll(int fd, AppEvent ev, FileEventHandler h) pure @safe { 390 enforce(fd >= 0, "fileno can't be negative"); 391 enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors"); 392 debug(hioselect) safe_tracef("start poll on fd %d for events %s", fd, appeventToString(ev)); 393 fileDescriptors[fd]._polling |= ev; 394 fileHandlers[fd] = h; 395 } 396 void stop_poll(int fd, AppEvent ev) @safe { 397 enforce(fd >= 0, "fileno can't be negative"); 398 enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors"); 399 debug(hioselect) safe_tracef("stop poll on fd %d for events %s", fd, appeventToString(ev)); 400 fileDescriptors[fd]._polling &= ev ^ AppEvent.ALL; 401 } 402 403 int get_kernel_id() @safe @nogc { 404 return -1; 405 } 406 407 void wait_for_user_event(int event_id, FileEventHandler handler) @safe { 408 409 } 410 void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe { 411 412 } 413 414 void detach(int fd) @safe { 415 fileHandlers[fd] = null; 416 } 417 418 // pragma(inline) 419 // void processNotification(Notification ue) @safe { 420 // ue.handler(); 421 // } 422 423 // void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe { 424 // debug trace("posting notification"); 425 // if ( !notificationsQueue.full ) 426 // { 427 // notificationsQueue.put(notification); 428 // return; 429 // } 430 // // now try to find space for next notification 431 // auto retries = 10 * notificationsQueue.Size; 432 // while(notificationsQueue.full && retries > 0) 433 // { 434 // retries--; 435 // auto _n = notificationsQueue.get(); 436 // processNotification(_n); 437 // } 438 // enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue"); 439 // notificationsQueue.put(notification); 440 // } 441 void flush() { 442 } 443 void start_signal(Signal s) { 444 debug(hioselect) safe_tracef("start signal %s", s); 445 debug(hioselect) safe_tracef("signals: %s", signals); 446 auto r = s._signum in signals; 447 if ( r is null || r.length == 0 ) { 448 // enable signal only through kevent 449 _add_kernel_signal(s); 450 } 451 signals[s._signum] ~= s; 452 } 453 454 void stop_signal(Signal s) { 455 debug trace("stop signal"); 456 auto r = s._signum in signals; 457 if ( r is null ) { 458 throw new NotFoundException("You tried to stop signal that was not started"); 459 } 460 Signal[] new_row; 461 foreach(a; *r) { 462 if (a._id == s._id) { 463 continue; 464 } 465 new_row ~= a; 466 } 467 if ( new_row.length == 0 ) { 468 *r = null; 469 _del_kernel_signal(s); 470 // reenable old signal behaviour 471 } else { 472 *r = new_row; 473 } 474 debug(hioselect) safe_tracef("new signals %d row %s", s._signum, new_row); 475 } 476 void _add_kernel_signal(Signal s) { 477 signal(s._signum, &sig_catcher); 478 debug(hioselect) safe_tracef("adding handler for signum %d: %x", s._signum, &this); 479 } 480 void _del_kernel_signal(Signal s) { 481 signal(s._signum, SIG_DFL); 482 debug(hioselect) safe_tracef("deleted handler for signum %d: %x", s._signum, &this); 483 } 484 }