1 module hio.scheduler; 2 3 import std.experimental.logger; 4 5 import core.thread; 6 import core.sync.mutex; 7 import std.concurrency; 8 import std.datetime; 9 import std.format; 10 import std.traits; 11 import std.exception; 12 import core.sync.condition; 13 import std.algorithm; 14 import std.typecons; 15 import std.range; 16 17 //import core.stdc.string; 18 //import core.stdc.errno; 19 20 //static import core.sys.posix.unistd; 21 22 import hio.events; 23 import hio.loop; 24 import hio.common; 25 26 import std.stdio; 27 28 struct TaskNotReady { 29 string msg; 30 } 31 32 class NotReadyException : Exception 33 { 34 this(string msg, string file = __FILE__, size_t line = __LINE__) 35 { 36 super(msg, file, line); 37 } 38 } 39 40 void hlSleep(Duration d) { 41 if ( d <= 0.seconds) { 42 return; 43 } 44 auto tid = Fiber.getThis(); 45 assert(tid !is null); 46 auto callback = delegate void (AppEvent e) @trusted 47 { 48 tid.call(Fiber.Rethrow.no); 49 }; 50 auto t = new Timer(d, callback); 51 getDefaultLoop().startTimer(t); 52 Fiber.yield(); 53 } 54 55 struct Box(T) { 56 57 enum Void = is(T == void); 58 59 static if (!Void) { 60 T _data; 61 } 62 SocketPair _pair; 63 Throwable _exception; 64 @disable this(this); 65 } 66 67 ReturnType!F App(F, A...) (F f, A args) { 68 alias R = ReturnType!F; 69 Box!R box; 70 static if (!box.Void) 71 { 72 R r; 73 } 74 void _wrapper() 75 { 76 try 77 { 78 static if (!box.Void) 79 { 80 r = f(args); 81 box._data = r; 82 } 83 else 84 { 85 f(args); 86 } 87 } 88 catch (Throwable e) 89 { 90 debug tracef("app throwed %s", e); 91 box._exception = e; 92 } 93 getDefaultLoop().stop(); 94 } 95 96 shared void delegate() run = () { 97 // 98 // in the child thread: 99 // 1. start new fiber (task over wrapper) with user supplied function 100 // 2. start event loop forewer 101 // 3. when eventLoop done(stopped inside from wrapper) the task will finish 102 // 4. store value in box and use socketpair to send signal to caller thread 103 // 104 auto t = task(&_wrapper); 105 auto e = t.start(Fiber.Rethrow.no); 106 if ( box._exception is null ) { // box.exception can be filled before Fiber start 107 box._exception = e; 108 } 109 getDefaultLoop.run(Duration.max); 110 t.reset(); 111 }; 112 // Thread child = new Thread(run); 113 // child.start(); 114 // child.join(); 115 run(); 116 if (box._exception) 117 { 118 throw box._exception; 119 } 120 static if (!box.Void) 121 { 122 debug tracef("joined, value = %s", box._data); 123 return box._data; 124 } 125 else 126 { 127 debug tracef("joined"); 128 } 129 } 130 /// 131 /// spawn thread or fiber, caal function and return value 132 /// 133 // ReturnType!F callInThread(F, A...)(F f, A args) { 134 // // 135 // // When called inside from fiber we can and have to yield control to eventLoop 136 // // when called from thread (eventLoop is not active, we can yield only to another thread) 137 // // everything we can do is wait function for completion - just join cinld 138 // // 139 // if ( Fiber.getThis() ) 140 // return callFromFiber(f, args); 141 // else 142 // return callFromThread(f, args); 143 // } 144 145 // private ReturnType!F callFromFiber(F, A...)(F f, A args) { 146 // auto tid = Fiber.getThis(); 147 // assert(tid, "You can call this function only inside from Task"); 148 149 // alias R = ReturnType!F; 150 // enum Void = is(ReturnType!F==void); 151 // enum Nothrow = [__traits(getFunctionAttributes, f)].canFind("nothrow"); 152 // Box!R box; 153 // static if (!Void){ 154 // R r; 155 // } 156 157 // // create socketpair for inter-thread signalling 158 // box._pair = makeSocketPair(); 159 // scope(exit) { 160 // box._pair.close(); 161 // } 162 163 // /// 164 // /// fiber where we call function, store result of exception and stop eventloop when execution completed 165 // /// 166 // void _wrapper() { 167 // scope(exit) 168 // { 169 // getDefaultLoop().stop(); 170 // } 171 // try { 172 // static if (!Void) { 173 // r = f(args); 174 // box._data = r; 175 // } 176 // else { 177 // f(args); 178 // } 179 // } catch(shared(Exception) e) { 180 // box._exception = e; 181 // } 182 // } 183 184 // /// 185 // /// this is child thread where we start fiber and event loop 186 // /// when eventLoop completed signal parent thread and exit 187 // /// 188 // shared void delegate() run = () { 189 // // 190 // // in the child thread: 191 // // 1. start new fiber (task over wrapper) with user supplied function 192 // // 2. start event loop forewer 193 // // 3. when eventLoop done(stopped inside from wrapper) the task will finish 194 // // 4. store value in box and use socketpair to send signal to caller thread 195 // // 196 // auto t = task(&_wrapper); 197 // t.call(Fiber.Rethrow.no); 198 // getDefaultLoop.run(Duration.max); 199 // getDefaultLoop.deinit(); 200 // ubyte[1] b = [0]; 201 // auto s = box._pair.write(1, b); 202 // assert(t.ready); 203 // assert(t.state == Fiber.State.TERM); 204 // assert(s == 1); 205 // debug trace("child thread done"); 206 // }; 207 208 // Thread child = new Thread(run).start(); 209 // // 210 // // in the parent 211 // // add socketpair[0] to eventloop for reading and wait for data on it 212 // // yieldng until we receive data on the socketpair 213 // // on event handler - sop polling on pipe and join child thread 214 // // 215 // final class ThreadEventHandler : FileEventHandler { 216 // override void eventHandler(int fd, AppEvent e) @trusted 217 // { 218 // // 219 // // maybe we have to read here, but actually we need only info about data availability 220 // // so why read? 221 // // 222 // debug tracef("interthread signalling - read ready"); 223 // getDefaultLoop().stopPoll(box._pair[0], AppEvent.IN); 224 // child.join(); 225 // debug tracef("interthread signalling - thread joined"); 226 // auto throwable = tid.call(Fiber.Rethrow.no); 227 // } 228 // } 229 230 // // enable listening on socketpair[0] and yield 231 // getDefaultLoop().startPoll(box._pair[0], AppEvent.IN, new ThreadEventHandler()); 232 // Fiber.yield(); 233 234 // // child thread completed 235 // if ( box._exception ) { 236 // throw box._exception; 237 // } 238 // static if (!Void) { 239 // debug tracef("joined, value = %s", box._data); 240 // return box._data; 241 // } else { 242 // debug tracef("joined"); 243 // } 244 // } 245 246 // private ReturnType!F callFromThread(F, A...)(F f, A args) { 247 // auto tid = Fiber.getThis(); 248 // assert(tid is null, "You can't call this function from Task (or fiber)"); 249 250 // alias R = ReturnType!F; 251 // enum Void = is(ReturnType!F==void); 252 // enum Nothrow = [__traits(getFunctionAttributes, f)].canFind("nothrow"); 253 // Box!R box; 254 // static if (!Void){ 255 // R r; 256 // } 257 258 // void _wrapper() { 259 // scope(exit) 260 // { 261 // getDefaultLoop().stop(); 262 // } 263 // try { 264 // static if (!Void){ 265 // r = f(args); 266 // box._data = r; 267 // } 268 // else 269 // { 270 // //writeln("calling"); 271 // f(args); 272 // } 273 // } catch (shared(Exception) e) { 274 // box._exception = e; 275 // } 276 // } 277 278 // shared void delegate() run = () { 279 // // 280 // // in the child thread: 281 // // 1. start new fiber (task over wrapper) with user supplied function 282 // // 2. start event loop forewer 283 // // 3. when eventLoop done(stopped inside from wrapper) the task will finish 284 // // 4. store value in box and use socketpair to send signal to caller thread 285 // // 286 // auto t = task(&_wrapper); 287 // t.call(Fiber.Rethrow.no); 288 // getDefaultLoop.run(Duration.max); 289 // getDefaultLoop.deinit(); 290 // assert(t.ready); 291 // assert(t.state == Fiber.State.TERM); 292 // trace("child thread done"); 293 // }; 294 // Thread child = new Thread(run).start(); 295 // child.join(); 296 // if ( box._exception ) { 297 // throw box._exception; 298 // } 299 // static if (!Void) { 300 // debug tracef("joined, value = %s", box._data); 301 // return box._data; 302 // } else { 303 // debug tracef("joined"); 304 // } 305 // } 306 307 interface Computation { 308 bool ready(); 309 bool wait(Duration t = Duration.max); 310 } 311 312 /// 313 /// Run eventloop and task in separate thread. 314 /// Send what task returned or struct TaskNotReady if task not finished in time. 315 /// 316 auto spawnTask(T)(T task, Duration howLong = Duration.max) { 317 shared void delegate() run = () { 318 Tid owner = ownerTid(); 319 Throwable throwable = task.call(Fiber.Rethrow.no); 320 getDefaultLoop.run(howLong); 321 scope (exit) { 322 task.reset(); 323 getDefaultLoop.deinit(); 324 } 325 if ( !task.ready) { 326 owner.send(TaskNotReady("Task not finished in requested time")); 327 return; 328 } 329 330 assert(task.state == Fiber.State.TERM); 331 332 if ( throwable is null ) 333 { 334 static if (!task.Void) { 335 debug tracef("sending result %s", task.result); 336 owner.send(task.result); 337 } 338 else 339 { 340 // have to send something as user code must wait for anything for non-daemons 341 debug tracef("sending null"); 342 owner.send(null); 343 } 344 } 345 else 346 { 347 immutable e = new Exception(throwable.msg); 348 try 349 { 350 debug tracef("sending exception"); 351 owner.send(e); 352 } catch (Exception ee) 353 { 354 errorf("Exception %s when sending exception %s", ee, e); 355 } 356 } 357 debug tracef("task thread finished"); 358 }; 359 auto tid = spawn(run); 360 return tid; 361 } 362 363 unittest 364 { 365 globalLogLevel = LogLevel.info; 366 info("test spawnTask"); 367 auto t0 = task(function int (){ 368 getDefaultLoop().stop(); 369 return 41; 370 }); 371 auto t1 = task(function int (){ 372 hlSleep(200.msecs); 373 getDefaultLoop().stop(); 374 return 42; 375 }); 376 Tid tid = spawnTask(t0, 100.msecs); 377 receive( 378 (const int i) 379 { 380 assert(i == 41, "expected 41, got %s".format(i)); 381 // ok 382 }, 383 (Variant v) 384 { 385 errorf("test wait task got variant %s of type %s", v, v.type); 386 assert(0); 387 } 388 ); 389 tid = spawnTask(t1, 100.msecs); 390 receive( 391 (TaskNotReady e) { 392 // ok 393 }, 394 (Variant v) 395 { 396 errorf("test wait task got variant %s of type %s", v, v.type); 397 assert(0); 398 } 399 ); 400 } 401 402 /// 403 class Threaded(F, A...) : Computation if (isCallable!F) { 404 alias start = run; 405 private { 406 alias R = ReturnType!F; 407 408 F _f; 409 A _args; 410 bool _ready = false; 411 Thread _child; 412 bool _chind_joined; // did we called _child.join? 413 Fiber _parent; 414 Box!R _box; 415 Timer _t; 416 enum Void = _box.Void; 417 } 418 final this(F f, A args) { 419 _f = f; 420 _args = args; 421 _box._pair = makeSocketPair(); 422 } 423 424 override bool ready() { 425 return _ready; 426 } 427 static if (!Void) { 428 R value() { 429 if (_ready) 430 return _box._data; 431 throw new NotReadyException("You can't call value for non-ready task"); 432 } 433 } 434 override bool wait(Duration timeout = Duration.max) { 435 if (_ready) { 436 if ( !_chind_joined ) { 437 _child.join(); 438 _chind_joined = true; 439 } 440 if ( _box._exception ) { 441 throw _box._exception; 442 } 443 return true; 444 } 445 if ( timeout <= 0.seconds ) { 446 // this is poll 447 return _ready; 448 } 449 if ( timeout < Duration.max ) { 450 // rize timer 451 _t = new Timer(timeout, (AppEvent e) @trusted { 452 getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN); 453 debug tracef("threaded timed out"); 454 auto throwable = _parent.call(Fiber.Rethrow.no); 455 _t = null; 456 }); 457 getDefaultLoop().startTimer(_t); 458 } 459 // wait on the pair 460 final class ThreadEventHandler : FileEventHandler 461 { 462 override void eventHandler(int fd, AppEvent e) @trusted 463 { 464 _box._pair.read(0, 1); 465 getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN); 466 debug tracef("threaded done"); 467 if ( _t ) { 468 getDefaultLoop.stopTimer(_t); 469 _t = null; 470 } 471 auto throwable = _parent.call(Fiber.Rethrow.no); 472 } 473 } 474 _parent = Fiber.getThis(); 475 assert(_parent, "You can call this only trom fiber"); 476 debug tracef("wait - start listen on socketpair"); 477 //auto eh = new ThreadEventHandler(); 478 getDefaultLoop().startPoll(_box._pair[0], AppEvent.IN, new ThreadEventHandler()); 479 Fiber.yield(); 480 debug tracef("wait done"); 481 if ( _ready && !_chind_joined ) { 482 _child.join(); 483 _chind_joined = true; 484 } 485 return _ready; 486 } 487 488 final auto run() { 489 this._child = new Thread( 490 { 491 getDefaultLoop.deinit(); 492 uninitializeLoops(); 493 try { 494 debug trace("calling"); 495 static if (!Void) { 496 _box._data = App(_f, _args); 497 } 498 else { 499 App(_f, _args); 500 } 501 } 502 catch (Throwable e) { 503 _box._exception = e; 504 } 505 ubyte[1] b = [0]; 506 _ready = true; 507 auto s = _box._pair.write(1, b); 508 } 509 ); 510 this._child.start(); 511 return this; 512 } 513 } 514 515 class Task(F, A...) : Fiber, Computation if (isCallable!F) { 516 enum Void = is(ReturnType!F==void); 517 alias start = call; 518 private { 519 alias R = ReturnType!F; 520 521 F _f; 522 A _args; 523 bool _ready; 524 // Notification _done; 525 Fiber _waitor; 526 Throwable _exception; 527 528 static if ( !Void ) { 529 R _result; 530 } 531 } 532 533 final this(F f, A args) 534 { 535 _f = f; 536 _args = args; 537 _waitor = null; 538 _exception = null; 539 static if (!Void) { 540 _result = R.init; 541 } 542 super(&run); 543 } 544 545 /// 546 /// wait() - wait forewer 547 /// wait(Duration) - wait with timeout 548 /// 549 override bool wait(Duration timeout = Duration.max) { 550 //if ( state == Fiber.State.TERM ) 551 //{ 552 // throw new Exception("You can't wait on finished task"); 553 //} 554 // if ( _ready ) 555 // { 556 // if ( _exception !is null ) { 557 // throw _exception; 558 // } 559 // return true; 560 // } 561 if ( _ready || timeout <= 0.msecs ) 562 { 563 if ( _exception !is null ) { 564 throw _exception; 565 } 566 return _ready; 567 } 568 assert(this._waitor is null, "You can't wait twice"); 569 this._waitor = Fiber.getThis(); 570 assert(_waitor !is null, "You can wait task only from another task or fiber"); 571 Timer t; 572 if ( timeout > 0.msecs ) { 573 t = new Timer(timeout, (AppEvent e) @trusted { 574 auto w = _waitor; 575 _waitor = null; 576 w.call(Fiber.Rethrow.no); 577 }); 578 getDefaultLoop().startTimer(t); 579 } 580 debug tracef("yeilding task"); 581 Fiber.yield(); 582 if ( t ) 583 { 584 getDefaultLoop().stopTimer(t); 585 } 586 if ( _exception !is null ) { 587 throw _exception; 588 } 589 return _ready; 590 } 591 592 static if (!Void) { 593 auto waitResult() { 594 wait(); 595 enforce(_ready); 596 return _result; 597 } 598 } 599 600 override bool ready() const { 601 return _ready; 602 } 603 static if (!Void) { 604 @property 605 final auto result() const { 606 enforce!NotReadyException(_ready, "You can't get result from not ready task"); 607 return _result; 608 } 609 alias value = result; 610 } 611 private final void run() { 612 static if ( Void ) 613 { 614 try { 615 _f(_args); 616 } catch (Throwable e) { 617 _exception = e; 618 debug tracef("got throwable %s", e); 619 } 620 //debug tracef("run void finished, waitors: %s", this._waitor); 621 } 622 else 623 { 624 try { 625 _result = _f(_args); 626 } catch(Throwable e) { 627 _exception = e; 628 debug tracef("got throwable %s", e); 629 } 630 //debug tracef("run finished, result: %s, waitor: %s", _result, this._waitor); 631 } 632 this._ready = true; 633 if ( this._waitor ) { 634 auto w = this._waitor; 635 this._waitor = null; 636 w.call(); 637 } 638 } 639 } 640 641 auto task(F, A...)(F f, A a) { 642 return new Task!(F,A)(f, a); 643 } 644 645 auto threaded(F, A...)(F f, A a) { 646 return new Threaded!(F, A)(f, a); 647 } 648 649 unittest { 650 int i; 651 int f(int s) { 652 i+=s; 653 return(i); 654 } 655 auto t = task(&f, 1); 656 t.call(); 657 assert(t.result == 1); 658 assert(i==1, "i=%d, expected 1".format(i)); 659 assert(t.result == 1, "result: %d, expected 1".format(t.result)); 660 } 661 662 unittest { 663 auto v = App(function int() { 664 Duration f(Duration t) 665 { 666 hlSleep(t); 667 return t; 668 } 669 670 auto t100 = task(&f, 100.msecs); 671 auto t200 = task(&f, 200.msecs); 672 t100.start; 673 t200.start; 674 t100.wait(); 675 return 1; 676 }); 677 assert(v == 1); 678 } 679 680 unittest 681 { 682 globalLogLevel = LogLevel.info; 683 auto v = App(function int() { 684 Duration f(Duration t) 685 { 686 hlSleep(t); 687 return t; 688 } 689 690 auto t100 = threaded(&f, 100.msecs).start; 691 auto t200 = threaded(&f, 200.msecs).start; 692 t200.wait(100.msecs); 693 assert(!t200.ready); 694 t100.wait(300.msecs); 695 assert(t100.ready); 696 assert(t100.value == 100.msecs); 697 t200.wait(); 698 return 1; 699 }); 700 assert(v == 1); 701 } 702 703 // unittest { 704 // // 705 // // two tasks and spawned thread under event loop 706 // // 707 // globalLogLevel = LogLevel.info; 708 // auto mode = globalLoopMode; 709 // foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 710 // globalLoopMode = m; 711 712 // int counter1 = 10; 713 // int counter2 = 20; 714 // int f0() { 715 // hlSleep(1.seconds); 716 // return 1; 717 // } 718 // void f1() { 719 // while(--counter1 > 0) { 720 // hlSleep(100.msecs); 721 // } 722 // } 723 // void f2() { 724 // while(--counter2 > 0) { 725 // hlSleep(50.msecs); 726 // } 727 // } 728 // void f3() { 729 // auto t1 = task(&f1); 730 // auto t2 = task(&f2); 731 // t1.start(); 732 // t2.start(); 733 // auto v = callInThread(&f0); 734 // // 735 // // t1 and t2 job must be done at this time 736 // // 737 // assert(counter1 == 0); 738 // assert(counter2 == 0); 739 // t1.wait(); 740 // t2.wait(); 741 // getDefaultLoop().stop(); 742 // } 743 // auto t3 = task(&f3); 744 // t3.start(); 745 // getDefaultLoop().run(1.seconds); 746 // infof("test0 ok in %s mode", m); 747 // } 748 // globalLoopMode = mode; 749 // } 750 751 unittest { 752 // 753 // just to test that we received correct value at return 754 // 755 globalLogLevel = LogLevel.info; 756 auto mode = globalLoopMode; 757 scope(exit) { 758 globalLoopMode = mode; 759 } 760 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 761 globalLoopMode = m; 762 int f() { 763 return 1; 764 } 765 auto v = App(&f); 766 assert(v == 1, "expected v==1, but received v=%d".format(v)); 767 infof("test1 ok in %s mode", m); 768 } 769 } 770 771 unittest { 772 // 773 // call sleep in spawned thread 774 // 775 globalLogLevel = LogLevel.info; 776 auto mode = globalLoopMode; 777 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 778 globalLoopMode = m; 779 int f() { 780 hlSleep(200.msecs); 781 return 2; 782 } 783 auto v = App(&f); 784 assert(v == 2, "expected v==2, but received v=%d".format(v)); 785 infof("test2 ok in %s mode", m); 786 } 787 globalLoopMode = mode; 788 } 789 790 version(unittest) { 791 class TestException : Exception { 792 this(string msg, string file = __FILE__, size_t line = __LINE__) { 793 super(msg, file, line); 794 } 795 } 796 } 797 798 unittest { 799 // 800 // test exception delivery when called from thread 801 // 802 globalLogLevel = LogLevel.info; 803 auto mode = globalLoopMode; 804 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 805 globalLoopMode = m; 806 int f() { 807 hlSleep(200.msecs); 808 throw new TestException("test exception"); 809 } 810 assertThrown!TestException(App(&f)); 811 infof("test3a ok in %s mode", m); 812 } 813 globalLoopMode = mode; 814 } 815 816 unittest { 817 // 818 // test exception delivery when called from task 819 // 820 globalLogLevel = LogLevel.info; 821 auto mode = globalLoopMode; 822 scope(exit) { 823 globalLoopMode = mode; 824 } 825 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 826 globalLoopMode = m; 827 int f() { 828 auto t = task((){ 829 hlSleep(200.msecs); 830 throw new TestException("test exception"); 831 }); 832 t.start(); 833 t.wait(300.msecs); 834 return 0; 835 } 836 assertThrown!TestException(App(&f)); 837 infof("test3b ok in %s mode", m); 838 } 839 } 840 841 unittest { 842 // 843 // test wait with timeout 844 // 845 globalLogLevel = LogLevel.info; 846 auto mode = globalLoopMode; 847 scope(exit) { 848 globalLoopMode = mode; 849 } 850 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 851 globalLoopMode = m; 852 int f0() { 853 hlSleep(100.msecs); 854 return 4; 855 } 856 int f() { 857 auto t = task(&f0); 858 t.call(); 859 t.wait(); 860 return t.result; 861 } 862 auto r = App(&f); 863 assert(r == 4, "spawnTask returned %d, expected 4".format(r)); 864 infof("test4 ok in %s mode", m); 865 } 866 } 867 868 unittest { 869 // 870 // test calling void function 871 // 872 globalLogLevel = LogLevel.info; 873 auto mode = globalLoopMode; 874 scope(exit) { 875 globalLoopMode = mode; 876 } 877 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 878 globalLoopMode = m; 879 void f() { 880 hlSleep(200.msecs); 881 } 882 App(&f); 883 infof("test6 ok in %s mode", m); 884 } 885 } 886 887 888 unittest { 889 globalLogLevel = LogLevel.info; 890 //auto oScheduler = scheduler; 891 //scheduler = new MyScheduler(); 892 auto mode = globalLoopMode; 893 scope(exit) { 894 globalLoopMode = mode; 895 } 896 foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 897 globalLoopMode = m; 898 int f0() { 899 hlSleep(100.msecs); 900 tracef("sleep done"); 901 return 6; 902 } 903 int f() { 904 auto v = App(&f0); 905 tracef("got value %s", v); 906 return v+1; 907 } 908 auto r = App(&f); 909 assert(r == 7, "spawnTask returned %d, expected 7".format(r)); 910 infof("test7 ok in %s mode", m); 911 } 912 } 913 914 //////// 915 916 // unittest { 917 // info("=== test wait task ==="); 918 // //auto oScheduler = scheduler; 919 // //scheduler = new MyScheduler(); 920 921 // globalLogLevel = LogLevel.info; 922 923 // auto mode = globalLoopMode; 924 // scope(exit) { 925 // globalLoopMode = mode; 926 // } 927 // foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 928 // globalLoopMode = m; 929 // int f1(Duration d) { 930 // hlSleep(d); 931 // return 40; 932 // } 933 934 // int f2(Duration d) { 935 // auto t = task(&f1, d); 936 // t.call(); 937 // t.wait(); 938 // return t.result; 939 // } 940 941 // auto t = task(&f2, 500.msecs); 942 943 // auto tid = spawnTask(t, 1.seconds); 944 945 // receive( 946 // (const int i) 947 // { 948 // assert(i == 40, "expected 40, got %s".format(i)); 949 // }, 950 // (Variant v) 951 // { 952 // errorf("test wait task got variant %s of type %s", v, v.type); 953 // assert(0); 954 // } 955 // ); 956 // infof("ok in %s mode", m); 957 // } 958 // // 959 // //scheduler = oScheduler; 960 // } 961 962 // unittest { 963 // info("=== test wait task with timeout ==="); 964 // // 965 // // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs 966 // // so 967 // globalLogLevel = LogLevel.trace; 968 // auto mode = globalLoopMode; 969 // scope(exit) { 970 // globalLoopMode = mode; 971 // } 972 // foreach(m; [Mode.FALLBACK, Mode.NATIVE]) { 973 // globalLoopMode = m; 974 975 // int f1(Duration d) { 976 // hlSleep(d); 977 // return 41; 978 // } 979 980 // bool f2(Duration d) { 981 // auto t = task(&f1, d); 982 // t.call(); 983 // bool ready = t.wait(100.msecs); 984 // assert(!t.ready); 985 // return ready; 986 // } 987 988 // auto t = task(&f2, 500.msecs); 989 // spawnTask(t, 1.seconds); 990 // receive( 991 // (Exception e) {tracef("got exception"); assert(0);}, 992 // (const bool b) {assert(!b, "got value %s instedad of false".format(b));}, 993 // (Variant v) {tracef("got variant %s", v); assert(0);} 994 // ); 995 // infof("ok in %s mode", m); 996 // } 997 // } 998 999 // class SharedNotificationChannel : FileEventHandler { 1000 // import containers.slist, containers.hashmap; 1001 // import std.experimental.logger; 1002 1003 // private { 1004 // struct SubscriptionInfo { 1005 // hlEvLoop _loop; 1006 // immutable int _loop_id; // event loop id where subscriber reside 1007 // immutable HandlerDelegate _h; 1008 // } 1009 // package shared int snc_id; 1010 // immutable int _id; 1011 // shared Mutex _subscribers_lock; 1012 1013 // SList!SubscriptionInfo _subscribers; 1014 // } 1015 1016 // this() @safe { 1017 // import core.atomic; 1018 // _subscribers_lock = new shared Mutex; 1019 // _id = atomicOp!"+="(snc_id, 1); 1020 // } 1021 // void broadcast() @safe @nogc { 1022 // _subscribers_lock.lock_nothrow(); 1023 // scope(exit) { 1024 // _subscribers_lock.unlock_nothrow(); 1025 // } 1026 1027 // foreach(destination; _subscribers) { 1028 // version(OSX) { 1029 // import core.sys.darwin.sys.event; 1030 1031 // kevent_t user_event; 1032 // immutable remote_kqueue_fd = destination._loop_id; 1033 // with (user_event) { 1034 // ident = _id; 1035 // filter = EVFILT_USER; 1036 // flags = 0; 1037 // fflags = NOTE_TRIGGER; 1038 // data = 0; 1039 // udata = null; 1040 // } 1041 // auto rc = (() @trusted => kevent(remote_kqueue_fd, cast(kevent_t*)&user_event, 1, null, 0, null))(); 1042 // } 1043 // version(linux) { 1044 // import core.sys.posix.unistd: write; 1045 // import core.stdc.string: strerror; 1046 // import core.stdc.errno: errno; 1047 // import std.string; 1048 1049 // auto rc = (() @trusted => write(destination._loop_id, &_id, 8))(); 1050 // if ( rc == -1 ) { 1051 // //errorf("event_fd write to %d returned error %s", destination, fromStringz(strerror(errno))); 1052 // } 1053 // } 1054 // } 1055 // } 1056 1057 // override void eventHandler(int _loop_id, AppEvent e) { 1058 // tracef("process user event handler on fd %d", _loop_id); 1059 // version(linux) { 1060 // import core.sys.posix.unistd: read; 1061 // ulong v; 1062 // auto rc = (() @trusted => read(_loop_id, &v, 8))(); 1063 // } 1064 // _subscribers_lock.lock_nothrow(); 1065 // scope(exit) { 1066 // _subscribers_lock.unlock_nothrow(); 1067 // } 1068 // foreach(s; _subscribers) { 1069 // if ( _loop_id != s._loop_id ) { 1070 // continue; 1071 // } 1072 // auto h = s._h; 1073 // h(e); 1074 // } 1075 // } 1076 1077 // void signal() @trusted { 1078 // _subscribers_lock.lock_nothrow(); 1079 // scope(exit) { 1080 // _subscribers_lock.unlock_nothrow(); 1081 // } 1082 // if ( _subscribers.empty ) { 1083 // trace("send signal - no subscribers"); 1084 // return; 1085 // } 1086 // auto destination = _subscribers.front(); 1087 1088 // version(OSX) { 1089 // import core.sys.darwin.sys.event; 1090 // kevent_t user_event; 1091 // immutable remote_kqueue_fd = destination._loop_id; 1092 // with (user_event) { 1093 // ident = _id; 1094 // filter = EVFILT_USER; 1095 // flags = 0; 1096 // fflags = NOTE_TRIGGER; 1097 // data = 0; 1098 // udata = null; 1099 // } 1100 // int rc = (() @trusted => kevent(remote_kqueue_fd, cast(kevent_t*)&user_event, 1, null, 0, null))(); 1101 // tracef("signal trigger rc to remote_kqueue_fd %d: %d", remote_kqueue_fd, rc); 1102 // enforce(rc>=0, "Failed to trigger event"); 1103 // } 1104 // version(linux) { 1105 // import core.sys.posix.unistd: write; 1106 // import core.stdc.string: strerror; 1107 // import core.stdc.errno: errno; 1108 // import std.string; 1109 1110 // auto rc = (() @trusted => write(destination._loop_id, &_id, 8))(); 1111 // debug tracef("event_fd %d write = %d", destination._loop_id, rc); 1112 // if ( rc == -1 ) { 1113 // errorf("event_fd write to %d returned error %s", destination, fromStringz(strerror(errno))); 1114 // } 1115 // } 1116 // } 1117 // auto subscribe(hlEvLoop loop, HandlerDelegate handler) @safe { 1118 // version(OSX) { 1119 // immutable event_fd = getDefaultLoop().getKernelId(); 1120 // //import core.sys.posix.fcntl: open; 1121 // //immutable event_fd = (() @trusted => open("/dev/null", 0))(); 1122 // SubscriptionInfo s = SubscriptionInfo(loop, event_fd, handler); 1123 // loop.waitForUserEvent(_id, this); 1124 // } 1125 // version(linux) { 1126 // import core.sys.linux.sys.eventfd; 1127 // immutable event_fd = (() @trusted => eventfd(0,EFD_NONBLOCK))(); 1128 // SubscriptionInfo s = SubscriptionInfo(loop, event_fd, handler); 1129 // loop.waitForUserEvent(event_fd, this); 1130 // } 1131 // synchronized(_subscribers_lock) { 1132 // _subscribers.put(s); 1133 // } 1134 // tracef("subscribers length = %d", _subscribers.length()); 1135 // return s; 1136 // } 1137 1138 // void unsubscribe(in SubscriptionInfo s) { 1139 // s._loop.stopWaitForUserEvent(_id, this); 1140 // synchronized(_subscribers_lock) { 1141 // _subscribers.remove(s); 1142 // version(linux) { 1143 // import core.sys.posix.unistd: close; 1144 // close(s._loop_id); 1145 // } 1146 // } 1147 // } 1148 1149 // auto register(hlEvLoop loop, HandlerDelegate handler) { 1150 // return subscribe(loop, handler); 1151 // } 1152 1153 // void deregister(SubscriptionInfo s) { 1154 // unsubscribe(s); 1155 // } 1156 1157 // void close() @safe @nogc { 1158 // } 1159 // } 1160 1161 // unittest { 1162 // // 1163 // // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs 1164 // // so 1165 // globalLogLevel = LogLevel.info; 1166 // auto mode = globalLoopMode; 1167 // scope(exit) { 1168 // globalLoopMode = mode; 1169 // } 1170 // foreach(m; [Mode.NATIVE]) { 1171 // globalLoopMode = m; 1172 // infof("=== test shared notification channel signal in %s mode ===", m); 1173 // auto snc = new SharedNotificationChannel(); 1174 // scope(exit) { 1175 // snc.close(); 1176 // } 1177 // int test_value; 1178 // void signal_poster() { 1179 // hlSleep(100.msecs); 1180 // tracef("send signal"); 1181 // snc.signal(); 1182 // } 1183 // int signal_receiver() { 1184 // int test = 0; 1185 // HandlerDelegate h = (AppEvent e) { 1186 // tracef("shared notificatioin delivered"); 1187 // test = 1; 1188 // }; 1189 // hlEvLoop loop = getDefaultLoop(); 1190 // auto s = snc.register(loop, h); 1191 // hlSleep(200.msecs); 1192 // snc.deregister(s); 1193 // return test; 1194 // } 1195 // auto tp = task({ 1196 // callInThread(&signal_poster); 1197 // }); 1198 // auto tr = task({ 1199 // test_value = callInThread(&signal_receiver); 1200 // getDefaultLoop().stop(); 1201 // }); 1202 // tp.call(); 1203 // tr.call(); 1204 // getDefaultLoop().run(3000.msecs); 1205 // assert(test_value == 1, "expected 1, got %s".format(test_value)); 1206 // } 1207 // } 1208 1209 // unittest { 1210 // info("=== test shared notification channel broacast ==="); 1211 // // 1212 // // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs 1213 // // so 1214 // globalLogLevel = LogLevel.info; 1215 // auto snc = new SharedNotificationChannel(); 1216 // scope(exit) { 1217 // snc.close(); 1218 // } 1219 // int test_value; 1220 // shared Mutex lock = new shared Mutex; 1221 1222 // void signal_poster() { 1223 // hlSleep(100.msecs); 1224 // snc.broadcast(); 1225 // tracef("shared notificatioin broadcasted"); 1226 // } 1227 // void signal_receiver1() { 1228 // HandlerDelegate h = (AppEvent e) { 1229 // synchronized(lock) { 1230 // test_value++; 1231 // } 1232 // tracef("shared notificatioin delivered 1 - %d", test_value); 1233 // }; 1234 // //class nHandler : FileEventHandler { 1235 // // override void eventHandler(int fd, AppEvent e) { 1236 // // tracef("shared notificatioin delivered 1"); 1237 // // synchronized(lock) { 1238 // // test_value++; 1239 // // } 1240 // // } 1241 // //} 1242 // //auto h = new nHandler(); 1243 // hlEvLoop loop = getDefaultLoop(); 1244 // auto s = snc.register(loop, h); 1245 // hlSleep(200.msecs); 1246 // snc.deregister(s); 1247 // } 1248 // void signal_receiver2() { 1249 // HandlerDelegate h = (AppEvent e) { 1250 // synchronized(lock) { 1251 // test_value++; 1252 // } 1253 // tracef("shared notificatioin delivered 2 - %d", test_value); 1254 // }; 1255 // //class nHandler : FileEventHandler { 1256 // // override void eventHandler(int fd, AppEvent e) { 1257 // // tracef("shared notificatioin delivered 2"); 1258 // // synchronized(lock) { 1259 // // test_value++; 1260 // // } 1261 // // } 1262 // //} 1263 // //auto h = new nHandler(); 1264 // hlEvLoop loop = getDefaultLoop(); 1265 // auto s = snc.register(loop, h); 1266 // hlSleep(200.msecs); 1267 // snc.deregister(s); 1268 // } 1269 // auto tp = task({ 1270 // callInThread(&signal_poster); 1271 // }); 1272 // auto tr1 = task({ 1273 // callInThread(&signal_receiver1); 1274 // }); 1275 // auto tr2 = task({ 1276 // callInThread(&signal_receiver2); 1277 // }); 1278 // tp.call(); 1279 // tr1.call(); 1280 // tr2.call(); 1281 // getDefaultLoop().run(500.msecs); 1282 // assert(test_value == 2); 1283 // } 1284 1285 // 1286 // split array on N balanced chunks 1287 // (not on chunks with N members) 1288 // 1289 private auto splitn(T)(T a, size_t slices) { 1290 T[] r; 1291 if (a.length == 0) { 1292 return r; 1293 } 1294 if (a.length % slices == 0) { 1295 return chunks(a, a.length / slices).array; 1296 } 1297 int n; 1298 while (n < a.length) { 1299 auto rest = a.length - n; 1300 auto done = slices - r.length; 1301 auto size = rest % done ? (rest / done + 1) : rest / done; 1302 r ~= a[n .. n + size]; 1303 n += size; 1304 } 1305 return r; 1306 } 1307 unittest { 1308 for(int n=1; n<100; n++) { 1309 for (int slices = 1; slices < n; slices++) { 1310 auto r = splitn(iota(n).array, slices); 1311 assert(r.length == slices); 1312 assert(equal(iota(n), r.join)); 1313 } 1314 } 1315 } 1316 1317 // Map array on M threads and N fibers 1318 // Non lazy. Return void if f is void. 1319 // : 1320 // /|\ 1321 // / | \ 1322 // ->/ | \<- M threads 1323 // / | \ 1324 // N N N 1325 // /|\ /|\ /|\ 1326 // ||| ||| ||| 1327 // ||| |||->|||<- N fibers 1328 // fff fff fff 1329 // ... ... ... 1330 // ... ... ... <- r splitted over MxN fibers 1331 // ... ... .. 1332 // 1333 auto mapMxN(F, R)(R r, F f, ulong m, ulong n) { 1334 enum Void = is(ReturnType!F == void); 1335 1336 assert(m > 0 && n > 0 && r.length > 0); 1337 1338 m = min(m, r.length); 1339 1340 auto fiberWorker(R fiber_chunk) { 1341 static if (!Void) { 1342 return fiber_chunk.map!(f).array; 1343 } else { 1344 fiber_chunk.each!f; 1345 } 1346 } 1347 1348 auto threadWorker(R thread_chunk) { 1349 auto fibers = thread_chunk.splitn(n). // split on N chunks 1350 map!(fiber_chunk => task(&fiberWorker, fiber_chunk)). // start fiber over each chunk 1351 array; 1352 fibers.each!"a.start"; 1353 fibers.each!"a.wait"; 1354 static if (!Void) { 1355 return fibers.map!"a.value".array.join; 1356 } 1357 } 1358 auto threads = r.splitn(m). // split on M chunks 1359 map!(thread_chunk => threaded(&threadWorker, thread_chunk)). // start thread over each chunk 1360 array; 1361 threads.each!"a.start"; 1362 threads.each!"a.wait"; 1363 static if (!Void) { 1364 return threads.map!"a.value".array.join; 1365 } 1366 } 1367 1368 // map array on M threads 1369 // Non lazy. Return void if f is void. 1370 // : 1371 // /|\ 1372 // / | \ 1373 // ->/ | \<- M threads 1374 // / | \ 1375 // f f f 1376 // . . . 1377 // . . . <- r splitted over M threads 1378 // . . 1379 // 1380 auto mapM(R, F)(R r, F f, ulong m) if (isArray!R) { 1381 enum Void = is(ReturnType!F == void); 1382 1383 assert(m > 0 && r.length > 0); 1384 1385 m = min(m, r.length); 1386 1387 static if (Void) { 1388 void threadWorker(R chunk) { 1389 chunk.each!f; 1390 } 1391 } else { 1392 auto threadWorker(R chunk) { 1393 return chunk.map!f.array; 1394 } 1395 } 1396 1397 auto threads = r.splitn(m).map!(thread_chunk => threaded(&threadWorker, thread_chunk).start).array; 1398 1399 threads.each!"a.wait"; 1400 1401 static if (!Void) { 1402 return threads.map!"a.value".array.join; 1403 } 1404 } 1405 1406 unittest { 1407 import std.range; 1408 import std.stdio; 1409 import core.atomic; 1410 1411 shared int cnt; 1412 1413 void f0(int arg) { 1414 atomicOp!"+="(cnt,arg); 1415 } 1416 1417 int f1(int i) { 1418 return i * i; 1419 } 1420 1421 int[] f2(int i) { 1422 return [i,i+1]; 1423 } 1424 1425 App({ 1426 // woid function, updates shared counter 1427 iota(20).array.mapMxN(&f0, 2, 3); 1428 assert(cnt == 190); 1429 }); 1430 1431 cnt = 0; 1432 App({ 1433 // woid function, updates shared counter 1434 iota(20).array.mapM(&f0, 5); 1435 assert(cnt == 190); 1436 }); 1437 1438 App({ 1439 auto r = iota(20).array.mapMxN(&f1, 2, 3); 1440 assert(equal(r, iota(20).map!"a*a")); 1441 }); 1442 1443 App({ 1444 auto r = iota(20).array.mapM(&f1, 5); 1445 assert(equal(r, iota(20).map!"a*a")); 1446 }); 1447 1448 App({ 1449 auto r = iota(20).array.mapM(&f2, 5); 1450 assert(equal(r, iota(20).map!"[a, a+1]")); 1451 }); 1452 }