1 module hio.scheduler;
2 
3 import std.experimental.logger;
4 
5 import core.thread;
6 import core.sync.mutex;
7 import std.concurrency;
8 import std.datetime;
9 import std.format;
10 import std.traits;
11 import std.exception;
12 import core.sync.condition;
13 import std.algorithm;
14 import std.typecons;
15 import std.range;
16 
17 //import core.stdc.string;
18 //import core.stdc.errno;
19 
20 //static import core.sys.posix.unistd;
21 
22 import hio.events;
23 import hio.loop;
24 import hio.common;
25 
26 import std.stdio;
27 
/// Message sent by `spawnTask` to the owner thread when the task did not
/// finish within the requested time.
struct TaskNotReady {
    /// Human readable explanation.
    string msg;
}
31 
/// Thrown when the value of a computation is requested before the
/// computation has finished.
class NotReadyException : Exception
{
    /// Params:
    ///   msg  = description of the failure
    ///   file = source file of the throw site (defaults to the caller's file)
    ///   line = source line of the throw site (defaults to the caller's line)
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
39 
/// Suspend the calling fiber for `d`.
///
/// A zero or negative duration returns immediately. Otherwise a `Timer` is
/// registered with the default event loop and the current fiber yields; the
/// timer callback resumes the fiber.
///
/// Must be called from inside a fiber (asserted).
void hlSleep(Duration d) {
    immutable nothingToWait = d <= 0.seconds;
    if ( nothingToWait ) {
        return;
    }
    auto sleeper = Fiber.getThis();
    assert(sleeper !is null);
    // the timer handler just switches back into the sleeping fiber;
    // a throwable escaping the fiber is returned by call() and ignored here
    auto timer = new Timer(d, delegate void (AppEvent e) @trusted
    {
        sleeper.call(Fiber.Rethrow.no);
    });
    getDefaultLoop().startTimer(timer);
    Fiber.yield();
}
54 
/// Non-copyable container for the outcome of a computation returning `T`.
struct Box(T) {

    /// True when `T` is `void`, i.e. there is no value to store.
    enum Void = is(T == void);

    static if (!Void) {
        /// Value produced by the computation.
        T   _data;
    }
    /// Socket pair (used by `Threaded` in this module for completion signalling).
    SocketPair          _pair;
    /// Throwable raised by the computation, or null.
    Throwable   _exception;

    // copying is disabled
    @disable this(this);
}
66 
/// Run `f(args)` as a task on the default event loop of the calling thread
/// and return whatever it returned.
///
/// The callable is started inside a fresh task (fiber); afterwards the default
/// loop is run until the wrapper stops it, which happens as soon as the
/// callable has finished. A `Throwable` raised by the callable is captured and
/// rethrown to the caller after the loop has stopped.
///
/// Params:
///   f    = callable to execute
///   args = arguments forwarded to `f`
/// Returns: the value returned by `f` (nothing for `void` callables).
/// Throws: whatever `f` threw.
ReturnType!F App(F, A...) (F f, A args) {
    alias R = ReturnType!F;
    Box!R box;

    // Fiber body: call the user function, record the outcome and stop the loop.
    void _wrapper()
    {
        try
        {
            static if (box.Void)
            {
                f(args);
            }
            else
            {
                box._data = f(args);
            }
        }
        catch (Throwable e)
        {
            debug tracef("app throwed %s", e);
            box._exception = e;
        }
        getDefaultLoop().stop();
    }

    //
    // 1. start new fiber (task over wrapper) with user supplied function
    // 2. run the event loop until the wrapper stops it
    // 3. reset the finished task
    //
    auto t = task(&_wrapper);
    auto startFailure = t.start(Fiber.Rethrow.no);
    if ( box._exception is null ) { // the wrapper may already have stored an exception
        box._exception = startFailure;
    }
    getDefaultLoop.run(Duration.max);
    t.reset();

    if (box._exception)
    {
        throw box._exception;
    }
    static if (box.Void)
    {
        debug tracef("joined");
    }
    else
    {
        debug tracef("joined, value = %s", box._data);
        return box._data;
    }
}
///
/// spawn thread or fiber, call function and return value
///
133 // ReturnType!F callInThread(F, A...)(F f, A args) {
134 //     //
135 //     // When called inside from fiber we can and have to yield control to eventLoop
136 //     // when called from thread (eventLoop is not active, we can yield only to another thread)
137 //     // everything we can do is wait function for completion - just join cinld
138 //     //
139 //     if ( Fiber.getThis() )
140 //         return callFromFiber(f, args);
141 //     else
142 //         return callFromThread(f, args);
143 // }
144 
145 // private ReturnType!F callFromFiber(F, A...)(F f, A args) {
146 //     auto tid = Fiber.getThis();
147 //     assert(tid, "You can call this function only inside from Task");
148 
149 //     alias R = ReturnType!F;
150 //     enum  Void = is(ReturnType!F==void);
151 //     enum  Nothrow = [__traits(getFunctionAttributes, f)].canFind("nothrow");
152 //     Box!R box;
153 //     static if (!Void){
154 //         R   r;
155 //     }
156 
157 //     // create socketpair for inter-thread signalling
158 //     box._pair = makeSocketPair();
159 //     scope(exit) {
160 //         box._pair.close();
161 //     }
162 
163 //     ///
164 //     /// fiber where we call function, store result of exception and stop eventloop when execution completed
165 //     ///
166 //     void _wrapper() {
167 //         scope(exit)
168 //         {
169 //             getDefaultLoop().stop();
170 //         }
171 //         try {
172 //             static if (!Void) {
173 //                 r = f(args);
174 //                 box._data = r;
175 //             }
176 //             else {
177 //                 f(args);
178 //             }
179 //         } catch(shared(Exception) e) {
180 //             box._exception = e;
181 //         }
182 //     }
183 
184 //     ///
185 //     /// this is child thread where we start fiber and event loop
186 //     /// when eventLoop completed signal parent thread and exit
187 //     ///
188 //     shared void delegate() run = () {
189 //         //
190 //         // in the child thread:
191 //         // 1. start new fiber (task over wrapper) with user supplied function
192 //         // 2. start event loop forewer
193 //         // 3. when eventLoop done(stopped inside from wrapper) the task will finish
194 //         // 4. store value in box and use socketpair to send signal to caller thread
195 //         //
196 //         auto t = task(&_wrapper);
197 //         t.call(Fiber.Rethrow.no);
198 //         getDefaultLoop.run(Duration.max);
199 //         getDefaultLoop.deinit();
200 //         ubyte[1] b = [0];
201 //         auto s = box._pair.write(1, b);
202 //         assert(t.ready);
203 //         assert(t.state == Fiber.State.TERM);
204 //         assert(s == 1);
205 //         debug trace("child thread done");
206 //     };
207 
208 //     Thread child = new Thread(run).start();
209 //     //
210 //     // in the parent
211 //     // add socketpair[0] to eventloop for reading and wait for data on it
212 //     // yieldng until we receive data on the socketpair
213 //     // on event handler - sop polling on pipe and join child thread
214 //     //
215 //     final class ThreadEventHandler : FileEventHandler {
216 //         override void eventHandler(int fd, AppEvent e) @trusted
217 //         {
218 //             //
219 //             // maybe we have to read here, but actually we need only info about data availability
220 //             // so why read?
221 //             //
222 //             debug tracef("interthread signalling - read ready");
223 //             getDefaultLoop().stopPoll(box._pair[0], AppEvent.IN);
224 //             child.join();
225 //             debug tracef("interthread signalling - thread joined");
226 //             auto throwable = tid.call(Fiber.Rethrow.no);
227 //         }
228 //     }
229 
230 //     // enable listening on socketpair[0] and yield
231 //     getDefaultLoop().startPoll(box._pair[0], AppEvent.IN, new ThreadEventHandler());
232 //     Fiber.yield();
233 
234 //     // child thread completed
235 //     if ( box._exception ) {
236 //         throw box._exception;
237 //     }
238 //     static if (!Void) {
239 //         debug tracef("joined, value = %s", box._data);
240 //         return box._data;
241 //     } else {
242 //         debug tracef("joined");
243 //     }
244 // }
245 
246 // private ReturnType!F callFromThread(F, A...)(F f, A args) {
247 //     auto tid = Fiber.getThis();
248 //     assert(tid is null, "You can't call this function from Task (or fiber)");
249 
250 //     alias R = ReturnType!F;
251 //     enum  Void = is(ReturnType!F==void);
252 //     enum  Nothrow = [__traits(getFunctionAttributes, f)].canFind("nothrow");
253 //     Box!R box;
254 //     static if (!Void){
255 //         R   r;
256 //     }
257 
258 //     void _wrapper() {
259 //         scope(exit)
260 //         {
261 //             getDefaultLoop().stop();
262 //         }
263 //         try {
264 //             static if (!Void){
265 //                 r = f(args);
266 //                 box._data = r;
267 //             }
268 //             else
269 //             {
270 //                 //writeln("calling");
271 //                 f(args);
272 //             }
273 //         } catch (shared(Exception) e) {
274 //             box._exception = e;
275 //         }
276 //     }
277 
278 //     shared void delegate() run = () {
279 //         //
280 //         // in the child thread:
281 //         // 1. start new fiber (task over wrapper) with user supplied function
282 //         // 2. start event loop forewer
283 //         // 3. when eventLoop done(stopped inside from wrapper) the task will finish
284 //         // 4. store value in box and use socketpair to send signal to caller thread
285 //         //
286 //         auto t = task(&_wrapper);
287 //         t.call(Fiber.Rethrow.no);
288 //         getDefaultLoop.run(Duration.max);
289 //         getDefaultLoop.deinit();
290 //         assert(t.ready);
291 //         assert(t.state == Fiber.State.TERM);
292 //         trace("child thread done");
293 //     };
294 //     Thread child = new Thread(run).start();
295 //     child.join();
296 //     if ( box._exception ) {
297 //         throw box._exception;
298 //     }
299 //     static if (!Void) {
300 //         debug tracef("joined, value = %s", box._data);
301 //         return box._data;
302 //     } else {
303 //         debug tracef("joined");
304 //     }
305 // }
306 
/// Common interface of asynchronous computations in this module
/// (implemented by `Task` and `Threaded`).
interface Computation {
    /// Returns: true once the computation has finished.
    bool ready();

    /// Wait for the computation to finish.
    /// Params:
    ///   t = how long to wait; the default `Duration.max` means no time limit
    /// Returns: true if the computation is finished when the call returns.
    bool wait(Duration t = Duration.max);
}
311 
///
/// Run eventloop and task in separate thread.
/// Send what task returned or struct TaskNotReady if task not finished in time.
///
/// What the spawned thread sends to its owner:
///   - `TaskNotReady` if the task has not finished after the loop ran for `howLong`;
///   - the task result (or `null` for void tasks) when it finished normally;
///   - an `immutable Exception` carrying the message of the throwable when the
///     task failed.
///
/// Params:
///   task    = task to execute (not started yet)
///   howLong = how long the event loop may run
/// Returns: `Tid` of the spawned thread.
///
auto spawnTask(T)(T task, Duration howLong = Duration.max) {
    shared void delegate() run = () {
        Tid owner = ownerTid();
        // Register the cleanup before the loop runs, so the task and the loop
        // are released even when running the loop throws.
        scope (exit) {
            task.reset();
            getDefaultLoop.deinit();
        }
        Throwable throwable = task.call(Fiber.Rethrow.no);
        getDefaultLoop.run(howLong);
        if ( !task.ready) {
            owner.send(TaskNotReady("Task not finished in requested time"));
            return;
        }

        assert(task.state == Fiber.State.TERM);

        if ( throwable is null )
        {
            // The task stores a throwable raised by the user function inside
            // itself instead of letting it escape from call(); a zero-timeout
            // wait() on a finished task rethrows it, so a failed task is not
            // reported to the owner as a default-initialized result.
            try
            {
                task.wait(Duration.zero);
            }
            catch (Throwable stored)
            {
                throwable = stored;
            }
        }

        if ( throwable is null )
        {
            static if (!task.Void) {
                debug tracef("sending result %s", task.result);
                owner.send(task.result);
            }
            else
            {
                // have to send something as user code must wait for anything for non-daemons
                debug tracef("sending null");
                owner.send(null);
            }
        }
        else
        {
            // only the message travels to the owner thread
            immutable e = new Exception(throwable.msg);
            try
            {
                debug tracef("sending exception");
                owner.send(e);
            } catch (Exception ee)
            {
                errorf("Exception %s when sending exception %s", ee, e);
            }
        }
        debug tracef("task thread finished");
    };
    auto tid = spawn(run);
    return tid;
}
362 
unittest
{
    // spawnTask: one task finishes inside the time limit, another does not
    globalLogLevel = LogLevel.info;
    info("test spawnTask");

    // finishes immediately -> its result must be delivered
    auto quick = task(function int (){
        getDefaultLoop().stop();
        return 41;
    });
    Tid tid = spawnTask(quick, 100.msecs);
    receive(
        (const int i)
        {
            assert(i == 41, "expected 41, got %s".format(i));
            // ok
        },
        (Variant v)
        {
            errorf("test wait task got variant %s of type %s", v, v.type);
            assert(0);
        }
    );

    // sleeps longer than the loop is allowed to run -> TaskNotReady expected
    auto slow = task(function int (){
        hlSleep(200.msecs);
        getDefaultLoop().stop();
        return 42;
    });
    tid = spawnTask(slow, 100.msecs);
    receive(
        (TaskNotReady e) {
            // ok
        },
        (Variant v)
        {
            errorf("test wait task got variant %s of type %s", v, v.type);
            assert(0);
        }
    );
}
401 
///
/// Computation which executes `f(args)` (through `App`) in a separate thread.
///
/// The child thread stores the outcome in a `Box` and writes one byte to a
/// socket pair; `wait` polls the other end of the pair on the default event
/// loop of the waiting fiber.
///
class Threaded(F, A...) : Computation if (isCallable!F) {
    alias start = run;
    private {
        alias R = ReturnType!F;

        F       _f;
        A       _args;
        bool    _ready = false;
        Thread  _child;
        bool    _chind_joined; // did we called _child.join?
        Fiber   _parent;       // fiber blocked in wait()
        Box!R   _box;
        Timer   _t;            // pending wait() timeout timer, if any
        enum Void = _box.Void;
    }
    /// Remember the callable with its arguments and create the signalling socket pair.
    final this(F f, A args) {
        _f = f;
        _args = args;
        _box._pair = makeSocketPair();
    }

    /// Returns: true once the child thread has finished the call.
    override bool ready() {
        return _ready;
    }
    static if (!Void) {
        /// Returns: value produced by the callable.
        /// Throws: `NotReadyException` if the computation is not finished.
        R value() {
            if (_ready)
                return _box._data;
            throw new NotReadyException("You can't call value for non-ready task");
        }
    }

    // Join the finished child thread once and release the socket pair, which
    // is not used any more after the child has signalled completion.
    // NOTE(review): relies on SocketPair.close() as used by the older
    // (commented out) thread code in this module - confirm.
    private void joinChild() {
        if ( _chind_joined ) {
            return;
        }
        _child.join();
        _chind_joined = true;
        _box._pair.close();
    }

    // Rethrow what the child thread has thrown, if anything.
    private void rethrowChildFailure() {
        if ( _box._exception ) {
            throw _box._exception;
        }
    }

    ///
    /// Wait until the child thread is done.
    ///
    /// Params:
    ///   timeout = zero or negative: just poll; `Duration.max`: wait without
    ///             time limit; otherwise give up after `timeout`.
    /// Returns: true if the computation is finished.
    /// Throws: the throwable raised in the child thread, as soon as the
    ///         computation is seen finished (both when it already was finished
    ///         on entry and when it finished while waiting).
    ///
    override bool wait(Duration timeout = Duration.max) {
        if (_ready) {
            joinChild();
            rethrowChildFailure();
            return true;
        }
        if ( timeout <= 0.seconds ) {
            // this is poll
            return _ready;
        }
        // check the calling context before any timer or poll is registered
        _parent = Fiber.getThis();
        assert(_parent, "You can call this only trom fiber");
        if ( timeout < Duration.max ) {
            // rize timer
            _t = new Timer(timeout, (AppEvent e) @trusted {
                getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
                debug tracef("threaded timed out");
                // Forget the expired timer BEFORE resuming the waiter: the
                // waiter may call wait() again and arm a new timer before it
                // yields back here, and that new reference must survive.
                _t = null;
                _parent.call(Fiber.Rethrow.no);
            });
            getDefaultLoop().startTimer(_t);
        }
        // wait on the pair
        final class ThreadEventHandler : FileEventHandler
        {
            override void eventHandler(int fd, AppEvent e) @trusted
            {
                _box._pair.read(0, 1);
                getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
                debug tracef("threaded done");
                if ( _t ) {
                    getDefaultLoop.stopTimer(_t);
                    _t = null;
                }
                _parent.call(Fiber.Rethrow.no);
            }
        }
        debug tracef("wait - start listen on socketpair");
        getDefaultLoop().startPoll(_box._pair[0], AppEvent.IN, new ThreadEventHandler());
        Fiber.yield();
        debug tracef("wait done");
        if ( _ready ) {
            joinChild();
            // same contract as the "already finished" path above
            rethrowChildFailure();
        }
        return _ready;
    }

    /// Start the child thread.
    /// Returns: `this`, so the call can be chained.
    final auto run() {
        this._child = new Thread(
            {
                // NOTE(review): presumably resets event loop state for the
                // new thread before App creates its own loop - confirm.
                getDefaultLoop.deinit();
                uninitializeLoops();
                try {
                    debug trace("calling");
                    static if (!Void) {
                        _box._data = App(_f, _args);
                    }
                    else {
                        App(_f, _args);
                    }
                }
                catch (Throwable e) {
                    _box._exception = e;
                }
                ubyte[1] b = [0];
                // publish readiness first, then wake up the waiter
                _ready = true;
                _box._pair.write(1, b);
            }
        );
        this._child.start();
        return this;
    }
}
514 
/// Fiber based computation: runs `f(args)` inside a `Fiber` and lets one
/// other fiber wait for its completion.
class Task(F, A...) : Fiber, Computation if (isCallable!F) {
    enum  Void = is(ReturnType!F==void);
    alias start = call;
    private {
        alias R = ReturnType!F;

        F            _f;
        A            _args;
        bool         _ready;        // set when the callable has returned or thrown
        Fiber        _waitor;       // fiber blocked in wait(), if any
        Throwable    _exception;    // what the callable has thrown, if anything

        static if ( !Void ) {
            R       _result;
        }
    }

    /// Store the callable with its arguments; the fiber is created but not started.
    final this(F f, A args)
    {
        _f = f;
        _args = args;
        _waitor = null;
        _exception = null;
        static if (!Void) {
            _result = R.init;
        }
        super(&run);
    }

    ///
    /// wait() - wait without time limit
    /// wait(Duration) - wait with timeout
    ///
    /// A finished task, or a zero/negative timeout, never blocks.
    /// Returns: true if the task is finished.
    /// Throws: the throwable raised by the callable, if there is one.
    ///
    override bool wait(Duration timeout = Duration.max) {
        immutable mustBlock = !_ready && timeout > 0.msecs;
        if ( mustBlock )
        {
            assert(this._waitor is null, "You can't wait twice");
            this._waitor = Fiber.getThis();
            assert(_waitor !is null, "You can wait task only from another task or fiber");
            // timeout is positive here, so the wake-up timer is always armed
            auto timer = new Timer(timeout, (AppEvent e) @trusted {
                auto sleeper = _waitor;
                _waitor = null;
                sleeper.call(Fiber.Rethrow.no);
            });
            getDefaultLoop().startTimer(timer);
            debug tracef("yeilding task");
            Fiber.yield();
            // resumed either by the timer or by the finished task
            getDefaultLoop().stopTimer(timer);
        }
        if ( _exception !is null ) {
            throw _exception;
        }
        return _ready;
    }

    static if (!Void) {
        /// Wait without time limit and return the result of the callable.
        auto waitResult() {
            wait();
            enforce(_ready);
            return _result;
        }
    }

    /// Returns: true once the callable has returned or thrown.
    override bool ready() const {
        return _ready;
    }
    static if (!Void) {
        /// Result of the callable.
        /// Throws: `NotReadyException` if the task is not finished yet.
        @property
        final auto result() const {
            enforce!NotReadyException(_ready, "You can't get result from not ready task");
            return _result;
        }
        alias value = result;
    }

    // Fiber body: call the user function, remember its outcome, then wake up
    // the waiting fiber (if there is one).
    private final void run() {
        try {
            static if ( Void )
            {
                _f(_args);
            }
            else
            {
                _result = _f(_args);
            }
        } catch (Throwable e) {
            _exception = e;
            debug tracef("got throwable %s", e);
        }
        this._ready = true;
        if ( auto sleeper = this._waitor ) {
            this._waitor = null;
            sleeper.call();
        }
    }
}
640 
/// Create (but do not start) a `Task` which will call `f` with arguments `a`.
auto task(F, A...)(F f, A a) {
    alias Concrete = Task!(F, A);
    return new Concrete(f, a);
}
644 
/// Create (but do not start) a `Threaded` computation which will call `f`
/// with arguments `a`.
auto threaded(F, A...)(F f, A a) {
    alias Concrete = Threaded!(F, A);
    return new Concrete(f, a);
}
648 
unittest {
    // a task which never yields is finished right after call()
    int counter;
    int bump(int step) {
        counter += step;
        return counter;
    }
    auto job = task(&bump, 1);
    job.call();
    assert(job.result == 1);
    assert(counter == 1, "i=%d, expected 1".format(counter));
    assert(job.result == 1, "result: %d, expected 1".format(job.result));
}
661 
unittest {
    // two sleeping tasks under App; only the shorter one is waited for
    auto v = App(function int() {
        Duration sleepFor(Duration t)
        {
            hlSleep(t);
            return t;
        }

        auto shortTask = task(&sleepFor, 100.msecs);
        auto longTask = task(&sleepFor, 200.msecs);
        shortTask.start;
        longTask.start;
        shortTask.wait();
        return 1;
    });
    assert(v == 1);
}
679 
unittest
{
    // threaded computations: wait with timeout, wait for completion, read value
    globalLogLevel = LogLevel.info;
    auto v = App(function int() {
        Duration sleepFor(Duration t)
        {
            hlSleep(t);
            return t;
        }

        auto fast = threaded(&sleepFor, 100.msecs).start;
        auto slow = threaded(&sleepFor, 200.msecs).start;
        // 100 msecs is not enough for the 200 msecs job
        slow.wait(100.msecs);
        assert(!slow.ready);
        // the 100 msecs job has to be finished within 300 msecs
        fast.wait(300.msecs);
        assert(fast.ready);
        assert(fast.value == 100.msecs);
        slow.wait();
        return 1;
    });
    assert(v == 1);
}
702 
703 // unittest {
704 //     //
705 //     // two tasks and spawned thread under event loop
706 //     //
707 //     globalLogLevel = LogLevel.info;
708 //     auto mode = globalLoopMode;
709 //     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
710 //         globalLoopMode = m;
711     
712 //         int counter1 = 10;
713 //         int counter2 = 20;
714 //         int f0() {
715 //             hlSleep(1.seconds);
716 //             return 1;
717 //         }
718 //         void f1() {
719 //             while(--counter1 > 0) {
720 //                 hlSleep(100.msecs);
721 //             }
722 //         }
723 //         void f2() {
724 //             while(--counter2 > 0) {
725 //                 hlSleep(50.msecs);
726 //             }
727 //         }
728 //         void f3() {
729 //             auto t1 = task(&f1);
730 //             auto t2 = task(&f2);
731 //             t1.start();
732 //             t2.start();
733 //             auto v = callInThread(&f0);
734 //             //
735 //             // t1 and t2 job must be done at this time
736 //             //
737 //             assert(counter1 == 0);
738 //             assert(counter2 == 0);
739 //             t1.wait();
740 //             t2.wait();
741 //             getDefaultLoop().stop();
742 //         }
743 //         auto t3 = task(&f3);
744 //         t3.start();
745 //         getDefaultLoop().run(1.seconds);
746 //         infof("test0 ok in %s mode", m);
747 //     }
748 //     globalLoopMode = mode;
749 // }
750 
unittest {
    //
    // App must hand back the value returned by the function, in every loop mode
    //
    globalLogLevel = LogLevel.info;
    auto savedMode = globalLoopMode;
    scope(exit) {
        globalLoopMode = savedMode;
    }
    foreach(loopMode; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = loopMode;
        int one() {
            return 1;
        }
        auto v = App(&one);
        assert(v == 1, "expected v==1, but received v=%d".format(v));
        infof("test1 ok in %s mode", loopMode);
    }
}
770 
unittest {
    //
    // call sleep inside a function run by App
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    // restore the loop mode even when an assertion below fails
    // (same pattern as the other tests in this module)
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            hlSleep(200.msecs);
            return 2;
        }
        auto v = App(&f);
        assert(v == 2, "expected v==2, but received v=%d".format(v));
        infof("test2 ok in %s mode", m);
    }
}
789 
version(unittest) {
    /// Exception type used by the unit tests to check exception delivery.
    class TestException : Exception {
        this(string msg, string file = __FILE__, size_t line = __LINE__)
        {
            super(msg, file, line);
        }
    }
}
797 
unittest {
    //
    // test exception delivery: what the function throws must reach the caller of App
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    // restore the loop mode even when an assertion below fails
    // (same pattern as the other tests in this module)
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            hlSleep(200.msecs);
            throw new TestException("test exception");
        }
        assertThrown!TestException(App(&f));
        infof("test3a ok in %s mode", m);
    }
}
815 
unittest {
    //
    // test exception delivery: an exception thrown inside a task is rethrown
    // by wait() and then leaves App
    //
    globalLogLevel = LogLevel.info;
    auto savedMode = globalLoopMode;
    scope(exit) {
        globalLoopMode = savedMode;
    }
    foreach(loopMode; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = loopMode;
        int waitForFailingTask() {
            auto failing = task((){
                hlSleep(200.msecs);
                throw new TestException("test exception");
            });
            failing.start();
            failing.wait(300.msecs);
            return 0;
        }
        assertThrown!TestException(App(&waitForFailingTask));
        infof("test3b ok in %s mode", loopMode);
    }
}
840 
unittest {
    //
    // test waiting for a sleeping task (no time limit) and reading its result
    //
    globalLogLevel = LogLevel.info;
    auto savedMode = globalLoopMode;
    scope(exit) {
        globalLoopMode = savedMode;
    }
    foreach(loopMode; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = loopMode;
        int sleepThenFour() {
            hlSleep(100.msecs);
            return 4;
        }
        int runInner() {
            auto inner = task(&sleepThenFour);
            inner.call();
            inner.wait();
            return inner.result;
        }
        auto r = App(&runInner);
        assert(r == 4, "spawnTask returned %d, expected 4".format(r));
        infof("test4 ok in %s mode", loopMode);
    }
}
867 
unittest {
    //
    // test calling void function
    //
    globalLogLevel = LogLevel.info;
    auto savedMode = globalLoopMode;
    scope(exit) {
        globalLoopMode = savedMode;
    }
    foreach(loopMode; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = loopMode;
        void justSleep() {
            hlSleep(200.msecs);
        }
        App(&justSleep);
        infof("test6 ok in %s mode", loopMode);
    }
}
886 
887 
unittest {
    //
    // App called from inside a function which itself runs under App
    //
    globalLogLevel = LogLevel.info;
    auto savedMode = globalLoopMode;
    scope(exit) {
        globalLoopMode = savedMode;
    }
    foreach(loopMode; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = loopMode;
        int inner() {
            hlSleep(100.msecs);
            tracef("sleep done");
            return 6;
        }
        int outer() {
            auto v = App(&inner);
            tracef("got value %s", v);
            return v+1;
        }
        auto r = App(&outer);
        assert(r == 7, "spawnTask returned %d, expected 7".format(r));
        infof("test7 ok in %s mode", loopMode);
    }
}
913 
914 ////////
915 
916 // unittest {
917 //     info("=== test wait task ===");
918 //     //auto oScheduler = scheduler;
919 //     //scheduler = new MyScheduler();
920 
921 //     globalLogLevel = LogLevel.info;
922 
923 //     auto mode = globalLoopMode;
924 //     scope(exit) {
925 //         globalLoopMode = mode;
926 //     }
927 //     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
928 //         globalLoopMode = m;
929 //         int f1(Duration d) {
930 //             hlSleep(d);
931 //             return 40;
932 //         }
933     
934 //         int f2(Duration d) {
935 //             auto t = task(&f1, d);
936 //             t.call();
937 //             t.wait();
938 //             return t.result;
939 //         }
940     
941 //         auto t = task(&f2, 500.msecs);
942     
943 //         auto tid = spawnTask(t, 1.seconds);
944     
945 //         receive(
946 //             (const int i)
947 //             {
948 //                 assert(i == 40, "expected 40, got %s".format(i));
949 //             },
950 //             (Variant v)
951 //             {
952 //                 errorf("test wait task got variant %s of type %s", v, v.type);
953 //                 assert(0);
954 //             }
955 //         );
956 //         infof("ok in %s mode", m);
957 //     }
958 //     //
959 //     //scheduler = oScheduler;
960 // }
961 
962 // unittest {
963 //     info("=== test wait task with timeout ===");
964 //     //
965 //     // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs
966 //     // so 
967 //     globalLogLevel = LogLevel.trace;
968 //     auto mode = globalLoopMode;
969 //     scope(exit) {
970 //         globalLoopMode = mode;
971 //     }
972 //     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
973 //         globalLoopMode = m;
974     
975 //         int f1(Duration d) {
976 //             hlSleep(d);
977 //             return 41;
978 //         }
979     
980 //         bool f2(Duration d) {
981 //             auto t = task(&f1, d);
982 //             t.call();
983 //             bool ready = t.wait(100.msecs);
984 //             assert(!t.ready);
985 //             return ready;
986 //         }
987     
988 //         auto t = task(&f2, 500.msecs);
989 //         spawnTask(t, 1.seconds);
990 //         receive(
991 //             (Exception e) {tracef("got exception"); assert(0);},
992 //             (const bool b) {assert(!b, "got value %s instedad of false".format(b));},
993 //             (Variant v) {tracef("got variant %s", v); assert(0);}
994 //         );
995 //         infof("ok in %s mode", m);
996 //     }
997 // }
998 
999 // class SharedNotificationChannel : FileEventHandler {
1000 //     import containers.slist, containers.hashmap;
1001 //     import std.experimental.logger;
1002 
1003 //     private {
1004 //         struct SubscriptionInfo {
1005 //             hlEvLoop                  _loop;
1006 //             immutable int             _loop_id; // event loop id where subscriber reside
1007 //             immutable HandlerDelegate _h;
1008 //         }
1009 //         package shared int  snc_id;
1010 //         immutable int       _id;
1011 //         shared Mutex        _subscribers_lock;
1012 
1013 //         SList!SubscriptionInfo            _subscribers;
1014 //     }
1015 
1016 //     this() @safe {
1017 //         import core.atomic;
1018 //         _subscribers_lock = new shared Mutex;
1019 //         _id = atomicOp!"+="(snc_id, 1);
1020 //     }
1021 //     void broadcast() @safe @nogc {
1022 //         _subscribers_lock.lock_nothrow();
1023 //         scope(exit) {
1024 //             _subscribers_lock.unlock_nothrow();
1025 //         }
1026 
1027 //         foreach(destination; _subscribers) {
1028 //             version(OSX) {
1029 //                 import core.sys.darwin.sys.event;
1030 
1031 //                 kevent_t    user_event;
1032 //                 immutable remote_kqueue_fd = destination._loop_id;
1033 //                 with (user_event) {
1034 //                     ident = _id;
1035 //                     filter = EVFILT_USER;
1036 //                     flags = 0;
1037 //                     fflags = NOTE_TRIGGER;
1038 //                     data = 0;
1039 //                     udata = null;
1040 //                 }
1041 //                 auto rc = (() @trusted => kevent(remote_kqueue_fd, cast(kevent_t*)&user_event, 1, null, 0, null))();
1042 //             }
1043 //             version(linux) {
1044 //                 import core.sys.posix.unistd: write;
1045 //                 import core.stdc.string: strerror;
1046 //                 import core.stdc.errno: errno;
1047 //                 import std.string;
1048     
1049 //                 auto rc = (() @trusted => write(destination._loop_id, &_id, 8))();
1050 //                 if ( rc == -1 ) {
1051 //                     //errorf("event_fd write to %d returned error %s", destination, fromStringz(strerror(errno)));
1052 //                 }
1053 //             }
1054 //         }
1055 //     }
1056 
1057 //     override void eventHandler(int _loop_id, AppEvent e) {
1058 //         tracef("process user event handler on fd %d", _loop_id);
1059 //         version(linux) {
1060 //             import core.sys.posix.unistd: read;
1061 //             ulong v;
1062 //             auto rc = (() @trusted => read(_loop_id, &v, 8))();
1063 //         }
1064 //         _subscribers_lock.lock_nothrow();
1065 //         scope(exit) {
1066 //             _subscribers_lock.unlock_nothrow();
1067 //         }
1068 //         foreach(s; _subscribers) {
1069 //             if ( _loop_id != s._loop_id ) {
1070 //                 continue;
1071 //             }
1072 //             auto h = s._h;
1073 //             h(e);
1074 //         }
1075 //     }
1076 
1077 //     void signal() @trusted {
1078 //         _subscribers_lock.lock_nothrow();
1079 //         scope(exit) {
1080 //             _subscribers_lock.unlock_nothrow();
1081 //         }
1082 //         if ( _subscribers.empty ) {
1083 //             trace("send signal - no subscribers");
1084 //             return;
1085 //         }
1086 //         auto destination = _subscribers.front();
1087 
1088 //         version(OSX) {
1089 //             import core.sys.darwin.sys.event;
1090 //             kevent_t user_event;
1091 //             immutable remote_kqueue_fd = destination._loop_id;
1092 //             with (user_event) {
1093 //                 ident = _id;
1094 //                 filter = EVFILT_USER;
1095 //                 flags = 0;
1096 //                 fflags = NOTE_TRIGGER;
1097 //                 data = 0;
1098 //                 udata = null;
1099 //             }
1100 //             int rc = (() @trusted => kevent(remote_kqueue_fd, cast(kevent_t*)&user_event, 1, null, 0, null))();
1101 //             tracef("signal trigger rc to remote_kqueue_fd %d: %d", remote_kqueue_fd, rc);
1102 //             enforce(rc>=0, "Failed to trigger event");
1103 //         }
1104 //         version(linux) {
1105 //             import core.sys.posix.unistd: write;
1106 //             import core.stdc.string: strerror;
1107 //             import core.stdc.errno: errno;
1108 //             import std.string;
1109 
1110 //             auto rc = (() @trusted => write(destination._loop_id, &_id, 8))();
1111 //             debug tracef("event_fd %d write = %d", destination._loop_id, rc);
1112 //             if ( rc == -1 ) {
1113 //                 errorf("event_fd write to %d returned error %s", destination, fromStringz(strerror(errno)));
1114 //             }
1115 //         }
1116 //     }
1117 //     auto subscribe(hlEvLoop loop, HandlerDelegate handler) @safe {
1118 //         version(OSX) {
1119 //             immutable event_fd = getDefaultLoop().getKernelId();
1120 //             //import core.sys.posix.fcntl: open;
1121 //             //immutable event_fd = (() @trusted => open("/dev/null", 0))();
1122 //             SubscriptionInfo s = SubscriptionInfo(loop, event_fd, handler);
1123 //             loop.waitForUserEvent(_id, this);
1124 //         }
1125 //         version(linux) {
1126 //             import core.sys.linux.sys.eventfd;
1127 //             immutable event_fd = (() @trusted => eventfd(0,EFD_NONBLOCK))();
1128 //             SubscriptionInfo s = SubscriptionInfo(loop, event_fd, handler);
1129 //             loop.waitForUserEvent(event_fd, this);
1130 //         }
1131 //         synchronized(_subscribers_lock) {
1132 //             _subscribers.put(s);
1133 //         }
1134 //         tracef("subscribers length = %d", _subscribers.length());
1135 //         return s;
1136 //     }
1137 
1138 //     void unsubscribe(in SubscriptionInfo s) {
1139 //         s._loop.stopWaitForUserEvent(_id, this);
1140 //         synchronized(_subscribers_lock) {
1141 //             _subscribers.remove(s);
1142 //             version(linux) {
1143 //                 import core.sys.posix.unistd: close;
1144 //                 close(s._loop_id);
1145 //             }
1146 //         }
1147 //     }
1148 
1149 //     auto register(hlEvLoop loop, HandlerDelegate handler) {
1150 //         return subscribe(loop, handler);
1151 //     }
1152 
1153 //     void deregister(SubscriptionInfo s) {
1154 //         unsubscribe(s);
1155 //     }
1156 
1157 //     void close() @safe @nogc {
1158 //     }
1159 // }
1160 
1161 // unittest {
1162 //     //
1163 //     // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs
1164 //     // so 
1165 //     globalLogLevel = LogLevel.info;
1166 //     auto mode = globalLoopMode;
1167 //     scope(exit) {
1168 //         globalLoopMode = mode;
1169 //     }
1170 //     foreach(m; [Mode.NATIVE]) {
1171 //         globalLoopMode = m;
1172 //         infof("=== test shared notification channel signal in %s mode ===", m);
1173 //         auto snc = new SharedNotificationChannel();
1174 //         scope(exit) {
1175 //             snc.close();
1176 //         }
1177 //         int  test_value;
1178 //         void signal_poster() {
1179 //             hlSleep(100.msecs);
1180 //             tracef("send signal");
1181 //             snc.signal();
1182 //         }
1183 //         int signal_receiver() {
1184 //             int test = 0;
1185 //             HandlerDelegate h = (AppEvent e) {
1186 //                 tracef("shared notificatioin delivered");
1187 //                 test = 1;
1188 //             };
1189 //             hlEvLoop loop = getDefaultLoop();
1190 //             auto s = snc.register(loop, h);
1191 //             hlSleep(200.msecs);
1192 //             snc.deregister(s);
1193 //             return test;
1194 //         }
1195 //         auto tp = task({
1196 //             callInThread(&signal_poster);
1197 //         });
1198 //         auto tr = task({
1199 //             test_value = callInThread(&signal_receiver);
1200 //             getDefaultLoop().stop();
1201 //         });
1202 //         tp.call();
1203 //         tr.call();
1204 //         getDefaultLoop().run(3000.msecs);
1205 //         assert(test_value == 1, "expected 1, got %s".format(test_value));
1206 //     }
1207 // }
1208 
1209 // unittest {
1210 //     info("=== test shared notification channel broacast ===");
1211 //     //
1212 //     // we call f2 which start f1(sleeping for 500 msecs) and wait it for 100 msecs
1213 //     // so 
1214 //     globalLogLevel = LogLevel.info;
1215 //     auto snc = new SharedNotificationChannel();
1216 //     scope(exit) {
1217 //         snc.close();
1218 //     }
1219 //     int   test_value;
1220 //     shared Mutex lock = new shared Mutex;
1221 
1222 //     void signal_poster() {
1223 //         hlSleep(100.msecs);
1224 //         snc.broadcast();
1225 //         tracef("shared notificatioin broadcasted");
1226 //     }
1227 //     void signal_receiver1() {
1228 //         HandlerDelegate h = (AppEvent e) {
1229 //             synchronized(lock) {
1230 //                 test_value++;
1231 //             }
1232 //             tracef("shared notificatioin delivered 1 - %d", test_value);
1233 //         };
1234 //         //class nHandler : FileEventHandler {
1235 //         //    override void eventHandler(int fd, AppEvent e) {
1236 //         //    tracef("shared notificatioin delivered 1");
1237 //         //        synchronized(lock) {
1238 //         //            test_value++;
1239 //         //        }
1240 //         //    }
1241 //         //}
1242 //         //auto h = new nHandler();
1243 //         hlEvLoop loop = getDefaultLoop();
1244 //         auto s = snc.register(loop, h);
1245 //         hlSleep(200.msecs);
1246 //         snc.deregister(s);
1247 //     }
1248 //     void signal_receiver2() {
1249 //         HandlerDelegate h = (AppEvent e) {
1250 //             synchronized(lock) {
1251 //                 test_value++;
1252 //             }
1253 //             tracef("shared notificatioin delivered 2 - %d", test_value);
1254 //         };
1255 //         //class nHandler : FileEventHandler {
1256 //         //    override void eventHandler(int fd, AppEvent e) {
1257 //         //        tracef("shared notificatioin delivered 2");
1258 //         //        synchronized(lock) {
1259 //         //            test_value++;
1260 //         //        }
1261 //         //    }
1262 //         //}
1263 //         //auto h = new nHandler();
1264 //         hlEvLoop loop = getDefaultLoop();
1265 //         auto s = snc.register(loop, h);
1266 //         hlSleep(200.msecs);
1267 //         snc.deregister(s);
1268 //     }
1269 //     auto tp = task({
1270 //         callInThread(&signal_poster);
1271 //     });
1272 //     auto tr1 = task({
1273 //         callInThread(&signal_receiver1);
1274 //     });
1275 //     auto tr2 = task({
1276 //         callInThread(&signal_receiver2);
1277 //     });
1278 //     tp.call();
1279 //     tr1.call();
1280 //     tr2.call();
1281 //     getDefaultLoop().run(500.msecs);
1282 //     assert(test_value == 2);
1283 // }
1284 
//
// Split an array into N balanced chunks
// (as opposed to chunks of N members each).
//
private auto splitn(T)(T a, size_t slices) {
    // Params:
    //   a      = array to split
    //   slices = requested number of chunks
    // Returns:
    //   T[] whose concatenation equals `a`. Chunk sizes differ by at most one,
    //   the larger chunks coming first. An empty `a` yields an empty result.
    //   When `slices` exceeds `a.length` every element gets its own chunk, so
    //   fewer than `slices` chunks are returned.
    // Throws:
    //   Exception when `a` is non-empty and `slices` is zero (previously this
    //   was an integer division by zero).
    T[] r;
    if (a.length == 0) {
        return r;
    }
    enforce(slices > 0, "splitn: number of slices must be greater than zero");
    if (a.length % slices == 0) {
        // evenly divisible: all chunks have the same size
        return chunks(a, a.length / slices).array;
    }
    // size_t (not int) so the offset cannot be truncated on large arrays
    size_t n;
    while (n < a.length) {
        immutable rest = a.length - n;        // elements not yet distributed
        immutable left = slices - r.length;   // chunks still to be produced
        // ceil(rest / left): spread the remainder over the leading chunks
        immutable size = rest / left + (rest % left ? 1 : 0);
        r ~= a[n .. n + size];
        n += size;
    }
    return r;
}
unittest {
    // For every array length below 100 and every slice count below that length:
    // splitn must return exactly `slices` chunks, and gluing the chunks back
    // together must reproduce the original sequence.
    foreach (n; 1 .. 100) {
        auto source = iota(n).array;
        foreach (slices; 1 .. n) {
            auto parts = splitn(source, slices);
            assert(parts.length == slices);
            assert(equal(iota(n), parts.join));
        }
    }
}
1316 
// Map an array over M threads, each running N fibers.
// Not lazy. Returns void if f returns void.
1319 //            :
1320 //           /|\
1321 //          / | \
1322 //       ->/  |  \<- M threads
1323 //        /   |   \
1324 //       N    N    N
1325 //      /|\  /|\  /|\
1326 //      |||  |||  |||
1327 //      |||  |||->|||<- N fibers
1328 //      fff  fff  fff
1329 //      ...  ...  ...
//      ...  ...  ... <- r split over MxN fibers
1331 //      ...  ...  .. 
1332 //
auto mapMxN(F, R)(R r, F f, ulong m, ulong n) {
    // True when f yields no value; then nothing is collected or returned.
    enum Void = is(ReturnType!F == void);

    assert(m > 0 && n > 0 && r.length > 0);

    // no point in asking for more thread chunks than there are elements
    m = min(m, r.length);

    // Work done by one fiber: apply f to every element of its chunk.
    auto fiberWorker(R fiber_chunk) {
        static if (Void) {
            fiber_chunk.each!f;
        } else {
            return fiber_chunk.map!(f).array;
        }
    }

    // Work done by one thread: split its chunk into n pieces, run a task
    // over each piece, and gather the per-task results in order.
    auto threadWorker(R thread_chunk) {
        auto pieces = thread_chunk.splitn(n);
        auto jobs = pieces.map!(piece => task(&fiberWorker, piece)).array;
        // start everything first, only then wait, so the jobs overlap
        foreach (ref job; jobs) {
            job.start;
        }
        foreach (ref job; jobs) {
            job.wait;
        }
        static if (!Void) {
            auto partial = jobs.map!"a.value".array;
            return partial.join;
        }
    }

    // Top level: split r into m pieces and run threadWorker over each.
    auto slabs = r.splitn(m);
    auto workers = slabs.map!(slab => threaded(&threadWorker, slab)).array;
    foreach (ref worker; workers) {
        worker.start;
    }
    foreach (ref worker; workers) {
        worker.wait;
    }
    static if (!Void) {
        auto collected = workers.map!"a.value".array;
        return collected.join;
    }
}
1367 
// Map an array over M threads.
// Not lazy. Returns void if f returns void.
1370 //            :
1371 //           /|\
1372 //          / | \
1373 //       ->/  |  \<- M threads
1374 //        /   |   \
1375 //       f    f    f
1376 //       .    .    . 
//       .    .    .  <- r split over M threads
1378 //       .    .      
1379 //
auto mapM(R, F)(R r, F f, ulong m) if (isArray!R) {
    // True when f yields no value; then nothing is collected or returned.
    enum Void = is(ReturnType!F == void);

    assert(m > 0 && r.length > 0);

    // no point in asking for more chunks than there are elements
    m = min(m, r.length);

    // Work done by one thread: apply f to every element of its chunk.
    static if (!Void) {
        auto threadWorker(R chunk) {
            return chunk.map!f.array;
        }
    } else {
        void threadWorker(R chunk) {
            chunk.each!f;
        }
    }

    // Create and start one worker per chunk, keeping them in chunk order.
    auto slabs = r.splitn(m);
    auto workers = slabs.map!(slab => threaded(&threadWorker, slab).start).array;

    foreach (ref worker; workers) {
        worker.wait;
    }

    static if (!Void) {
        // concatenate per-worker results in the original element order
        auto collected = workers.map!"a.value".array;
        return collected.join;
    }
}
1405 
unittest {
    import std.range;
    import std.stdio;
    import core.atomic;

    // Exercises mapMxN and mapM with a void callback (side effect on a shared
    // counter), a scalar-returning callback and an array-returning callback.
    // The counter is shared between worker threads, so every access goes
    // through core.atomic rather than plain reads and writes.
    shared int cnt;

    void f0(int arg) {
        atomicOp!"+="(cnt,arg);
    }

    int f1(int i) {
        return i * i;
    }

    int[] f2(int i) {
        return [i,i+1];
    }

    App({
        // void function, updates the shared counter; 0 + 1 + ... + 19 == 190
        iota(20).array.mapMxN(&f0, 2, 3);
        assert(atomicLoad(cnt) == 190);
    });

    atomicStore(cnt, 0);
    App({
        // void function, updates the shared counter; 0 + 1 + ... + 19 == 190
        iota(20).array.mapM(&f0, 5);
        assert(atomicLoad(cnt) == 190);
    });

    App({
        // results must come back in input order
        auto r = iota(20).array.mapMxN(&f1, 2, 3);
        assert(equal(r, iota(20).map!"a*a"));
    });

    App({
        // results must come back in input order
        auto r = iota(20).array.mapM(&f1, 5);
        assert(equal(r, iota(20).map!"a*a"));
    });

    App({
        // array-valued results stay one array per input element
        auto r = iota(20).array.mapM(&f2, 5);
        assert(equal(r, iota(20).map!"[a, a+1]"));
    });
}