1 module hio.socket;
2 
3 import std.typecons;
4 import std.string;
5 import std.conv;
6 import std.traits;
7 import std.datetime;
8 import std.exception;
9 import std.algorithm;
10 
11 import std.algorithm.comparison: min;
12 
13 import std.experimental.logger;
14 
15 import core.memory: pureMalloc, GC;
16 import core.exception : onOutOfMemoryError;
17 
18 import std.experimental.allocator;
19 import std.experimental.allocator.building_blocks;
20 
21 //static import std.socket;
22 
23 import std.socket;
24 
25 import core.sys.posix.sys.socket;
26 import core.sys.posix.unistd;
27 import core.sys.posix.arpa.inet;
28 import core.sys.posix.netinet.tcp;
29 import core.sys.posix.netinet.in_;
30 import core.sys.posix.sys.time : timeval;
31 
32 import core.sys.posix.fcntl;
33 
34 import core.stdc.string;
35 import core.stdc.errno;
36 import core.stdc.stdio: printf;
37 
38 public import hio.events;
39 import hio.common;
40 import nbuff;
41 
42 import hio;
43 
44 //alias Socket = RefCounted!SocketImpl;
45 
46 alias AcceptFunction = void function(int fileno);
47 alias AcceptDelegate = void delegate(hlSocket);
48 
49 // static ~this() {
50 //     trace("deinit");
51 // }
52 
53 //hlSocket[int] fd2so;
54 //
55 //void loopCallback(int fd, AppEvent ev) @safe {
56 //    debug(hiosocket) tracef("loopCallback for %d", fd);
57 //    hlSocket s = fd2so[fd];
58 //    if ( s && s._fileno >= 0) {
59 //        debug(hiosocket) tracef("calling handler(%s) for %s", appeventToString(ev), s);
60 //        s._handler(ev);
61 //    } else {
62 //        infof("impossible event %s on fd: %d", appeventToString(ev), fd);
63 //    }
64 //}
65 
66 class SocketException : Exception {
67     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
68         super(msg, file, line);
69     }
70 }
71 
72 class ConnectionRefused : Exception {
73     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
74         super(msg, file, line);
75     }
76 }
77 
78 class Timeout : Exception {
79     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
80         super(msg, file, line);
81     }
82 }
83 
84 bool isLinux() pure nothrow @nogc @safe {
85     version(linux) {
86         return true;
87     } else {
88         return false;
89     }
90 }
91 
92 interface AsyncSocketLike
93 {
94     bool open() @safe;
95     void close() @safe;
96     bool connected() @safe;
97     void bind(Address addr);
98     bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe;
99     void accept(hlEvLoop loop, Duration timeout, void delegate(AsyncSocketLike) @safe f) @safe;
100     int  io(hlEvLoop, ref IORequest, Duration) @safe;
101 }
102 
103 // callback based socket
104 class hlSocket : FileEventHandler, AsyncSocketLike {
105     private {
106         enum State {
107             NEW = 0,
108             IDLE,
109             CONNECTING,
110             ACCEPTING,
111             IO,
112             SHUTDOWN,
113         }
114         enum Flags
115         {
116             EXTERNALLY_MANAGED_FD = 1, // socket fd created and managed outside of this class
117         }
118         ushort               _flags;
119         immutable ubyte      _af = AF_INET;
120         immutable int        _sock_type = SOCK_STREAM;
121         int                  _fileno = -1;
122         int                  _errno;
123         HandlerDelegate      _handler;
124         AppEvent             _polling = AppEvent.NONE;
125         size_t               _buffer_size = 16*1024;
126         hlEvLoop             _loop;
127         immutable string     _file;
128         immutable int        _line;
129         State                _state = State.NEW;
130         HandlerDelegate      _callback;
131         // accept related fields
132         void delegate(AsyncSocketLike) @safe _accept_callback;
133         // io related fields
134         IORequest           _iorq;
135         IOResult            _result;
136         Timer                _connect_timer;
137         Timer                _io_timer;
138         AppEvent             _pollingFor = AppEvent.NONE;
139         MutableNbuffChunk    _input;
140         size_t               _input_length;
141         bool                 _connected;
142         uint                 _accepts_in_a_row = 10;
143     }
144     override string describe() @safe
145     {
146         return "hlSocket: "
147            ~"_state: %s; ".format(_state)
148            ~"_file(_line): %s:%s; ".format(_file, _line)
149            ~"_fileno: %s; ".format(_fileno)
150            ~"_polling: %s; ".format(appeventToString(_polling))
151            ~"_connected: %s; ".format(_connected)
152            ~"_conn_timer: [%s]; ".format(_connect_timer)
153            ~"_io_timer: [%s]; ".format(_io_timer)
154            ~"_callback: %s; ".format(_callback)
155            ;
156     }
157     this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l =  __LINE__) @safe {
158         debug(hiosocket) tracef("create socket");
159         _af = af;
160         _sock_type = sock_type;
161         _file = f;
162         _line = l;
163     }
164 
165     this(int s, ubyte af = AF_INET, int sock_type = 0, string f = __FILE__, int l =  __LINE__) @safe
166     in {assert(s>=0);}
167     body {
168         _af = af;
169         _sock_type = sock_type;
170         _fileno = s;
171         _flags |= Flags.EXTERNALLY_MANAGED_FD;
172         _file = f;
173         _line = l;
174         auto flags = (() @trusted => fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK)();
175         (() @trusted => fcntl(_fileno, F_SETFL, flags))();
176     }
177 
178 
179     ~this() {
180         // if ( _fileno != -1 ) {
181         //     if ( _loop && _polling != AppEvent.NONE ) {
182         //         _loop.stopPoll(_fileno, _polling);
183         //         _loop.detach(_fileno);
184         //     }
185         //     //fd2so[_fileno] = null;
186         //     .close(_fileno);
187         //     _fileno = -1;
188         // }
189         // close();
190         assert(_io_timer is null);
191         assert(_connect_timer is null);
192     }
193     
194     override string toString() const @safe {
195         import std.format: format;
196         return "socket: fileno: %d, (%s:%d)".format(_fileno, _file, _line);
197     }
198     public bool connected() const pure @safe nothrow {
199         return _connected;
200     }
201 
202     public auto fileno() const pure @safe nothrow {
203         return _fileno;
204     }
205 
206     public auto socket_errno() const pure @safe nothrow {
207         return _errno;
208     }
209 
210     bool blocking() @property @safe
211     {
212         auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}();
213         return cast(bool)(flags & O_NONBLOCK);
214     }
215     void blocking(bool blocking) @property @safe {
216         auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}();
217         if ( blocking ) {
218             (() @trusted => fcntl(_fileno, F_SETFL, flags & ~O_NONBLOCK))();
219         } else {
220             (() @trusted => fcntl(_fileno, F_SETFL, flags | O_NONBLOCK))();
221         }
222     }
223     void timeoutHandler(AppEvent e) @safe {
224         debug
225         {
226             tracef("Timeout handler: %s", appeventToString(e));
227         }
228         final switch (_state) {
229         case State.SHUTDOWN:
230             assert(0);
231         case State.NEW:
232             assert(0);
233         case State.IDLE:
234             assert(0);
235         case State.CONNECTING:
236             debug(hiosocket) tracef("connection timed out");
237             _connected = false;
238             _errno = ETIMEDOUT;
239             _polling = AppEvent.NONE;
240             _loop.stopPoll(_fileno, AppEvent.OUT);
241             _loop.detach(_fileno);
242             _state = State.IDLE;
243             _connect_timer = null;
244             if ( e & AppEvent.SHUTDOWN)
245             {
246                 _state = State.SHUTDOWN;
247             }
248             _callback(e);
249             return;
250         case State.ACCEPTING:
251             debug(hiosocket) tracef("accept timed out");
252             _connected = false;
253             _errno = ETIMEDOUT;
254             _polling = AppEvent.NONE;
255             _loop.stopPoll(_fileno, AppEvent.IN);
256             _loop.detach(_fileno);
257             _state = State.IDLE;
258             if ( e & AppEvent.SHUTDOWN)
259             {
260                 _state = State.SHUTDOWN;
261             }
262             _accept_callback(null);
263             return;
264         case State.IO:
265             assert(0);
266         }
267     }
268     override void eventHandler(int fd, AppEvent e) @safe {
269         debug(hiosocket) tracef("event %s in state %s", appeventToString(e), _state);
270         final switch ( _state ) {
271         case State.SHUTDOWN:
272             return;
273             assert(0);
274         case State.NEW:
275             if ( e & AppEvent.SHUTDOWN)
276             {
277                 _state = State.SHUTDOWN;
278                 return;
279             }
280             assert(0);
281         case State.IDLE:
282             if ( e & AppEvent.SHUTDOWN)
283             {
284                 _state = State.SHUTDOWN;
285                 return;
286             }
287             assert(0);
288         case State.CONNECTING:
289             debug(hiosocket) tracef("connection event: %s", appeventToString(e));
290             if ( e & AppEvent.SHUTDOWN)
291             {
292                 _state = State.SHUTDOWN;
293                 _connected = false;
294                 if ( _connect_timer )
295                 {
296                     _loop.stopTimer(_connect_timer);
297                     _connect_timer = null;
298                 }
299                 _polling = AppEvent.NONE;
300                 _loop.stopPoll(_fileno, AppEvent.OUT);
301                 _callback(e);
302                 return;
303             }
304             assert(e & (AppEvent.OUT|AppEvent.HUP|AppEvent.ERR), "We can handle only OUT event in connecting state, but got %s".format(e));
305             if ( e & AppEvent.OUT )
306             {
307                 _connected = true;
308             }
309             if ( (e & (AppEvent.HUP|AppEvent.ERR)) ) {
310                 int err;
311                 uint err_s = err.sizeof;
312                 auto rc = (() @trusted => .getsockopt(_fileno, SOL_SOCKET, SO_ERROR, &err, &err_s))();
313                 if ( rc == 0 ) {
314                     _errno = err;
315                     debug(hiosocket) tracef("error connecting: %s", s_strerror(err));
316                 }
317                 _connected = false;
318             }
319             _polling = AppEvent.NONE;
320             if ( e & AppEvent.SHUTDOWN)
321             {
322                 _state = State.SHUTDOWN;
323             }
324             else
325             {
326                 _state = State.IDLE;
327             }
328             if ( _connect_timer ) {
329                 _loop.stopTimer(_connect_timer);
330                 _connect_timer = null;
331             }
332             _loop.stopPoll(_fileno, AppEvent.OUT);
333             _callback(e);
334             return;
335             //_handler(e);
336         case State.ACCEPTING:
337             assert(e == AppEvent.IN, "We can handle only IN event in accepting state");
338             foreach(_; 0.._accepts_in_a_row) {
339                 if ( _fileno == -1 ) {
340                     break; // socket can be closed in handler
341                 }
342                 sockaddr sa;
343                 uint sa_len = sa.sizeof;
344               retry:
345                 int new_s = (() @trusted => .accept(_fileno, &sa, &sa_len))();
346                 if ( new_s == -1 ) {
347                     auto err = errno();
348                     if ( err == EINTR )
349                     {
350                         // restart accept
351                         goto retry;
352                     }
353                     if ( err == EWOULDBLOCK || err == EAGAIN ) {
354                         // POSIX.1-2001 and POSIX.1-2008 allow
355                         // either error to be returned for this case, and do not require
356                         // these constants to have the same value, so a portable
357                         // application should check for both possibilities.
358                         break;
359                     }
360                     throw new Exception(s_strerror(err));
361                 }
362                 debug(hiosocket) tracef("New socket fd: %d", new_s);
363                 immutable int flag = 1;
364                 auto rc = (() @trusted => .setsockopt(new_s, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof))();
365                 if ( rc != 0 ) {
366                      throw new Exception(s_strerror(errno()));
367                 }
368                 version(OSX) {
369                     rc = (() @trusted => .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof))();
370                     if ( rc != 0 ) {
371                          throw new Exception(s_strerror(errno()));
372                     }
373                 }
374                 auto flags = (() @trusted => fcntl(new_s, F_GETFL, 0) | O_NONBLOCK)();
375                 (() @trusted => fcntl(new_s, F_SETFL, flags))();
376                 if ( _connect_timer ) {
377                     _loop.stopTimer(_connect_timer);
378                     _connect_timer = null;
379                 }
380                 hlSocket ns = new hlSocket(new_s, _af, _sock_type);
381                 ns._flags &= ~Flags.EXTERNALLY_MANAGED_FD;
382                 ns._connected = true;
383                 if ( e & AppEvent.SHUTDOWN)
384                 {
385                     _state = State.SHUTDOWN;
386                 }
387                 _accept_callback(ns);
388                 debug(hiosocket) tracef("accept_callback for fd: %d - done", new_s);
389             }
390             //_handler(e);
391             return;
392         case State.IO:
393             io_handler(e);
394             return;
395         }
396     }
397 
398     public bool open() @trusted {
399         immutable flag = 1;
400         if (_fileno != -1) {
401             throw new SocketException("You can't open already opened socket: fileno(%d)".format(_fileno));
402         }
403         _fileno = socket(_af, _sock_type, 0);
404         if ( _fileno < 0 )
405         {
406             _errno = errno();
407             return false;
408         }
409         debug(hiosocket) tracef("new socket created, %s", this);
410         _polling = AppEvent.NONE;
411         //fd2so[_fileno] = this;
412         auto rc = .setsockopt(_fileno, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof);
413         if ( rc != 0 ) {
414              throw new Exception(to!string(strerror(errno())));
415         }
416         version(OSX) {
417             rc = .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof);
418             if ( rc != 0 ) {
419                  throw new Exception(to!string(strerror(errno())));
420             }
421         }
422         auto flags = fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK;
423         fcntl(_fileno, F_SETFL, flags);
424         return true;
425     }
426 
427     public void close() @safe {
428         //assert(_state == State.IDLE);
429         if ( _fileno != -1 ) {
430             debug(hiosocket) tracef("closing %d, polling: %x", _fileno, _polling);
431             if ( _loop  )
432             {
433                 if ( _polling != AppEvent.NONE )
434                 {
435                     debug(hiosocket) tracef("detach from polling for %s", appeventToString(_polling));
436                     _loop.stopPoll(_fileno, _polling);
437                     _polling = AppEvent.NONE;
438                 }
439                 _loop.detach(_fileno);
440             }
441             if ( (_flags & Flags.EXTERNALLY_MANAGED_FD) == 0 )
442             {
443                 .close(_fileno);
444             }
445             _fileno = -1;
446         }
447         if ( _connect_timer )
448         {
449             debug(hiosocket) tracef("also stop connect timer: %s", _connect_timer);
450             _loop.stopTimer(_connect_timer);
451             _connect_timer = null;
452         }
453         if ( _io_timer )
454         {
455             debug(hiosocket) tracef("also stop io timer: %s", _io_timer);
456             _loop.stopTimer(_io_timer);
457             _io_timer = null;
458         }
459         _iorq = IORequest();
460         _result = IOResult();
461     }
462     public void bind(Address addr) @safe
463     {
464          switch (_af) {
465              case AF_INET:
466                 {
467                     import core.sys.posix.netinet.in_;
468                     InternetAddress ia = cast(InternetAddress)addr;
469                     static int flag = 1;
470                     int rc;
471                     () @trusted {
472                         debug(hiosocket) tracef("binding fileno %s to %s", _fileno, ia);
473                         auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof);
474                         debug(hiosocket) tracef("setsockopt for bind result: %d", rc);
475                         if ( rc != 0 ) {
476                             throw new Exception(to!string(strerror(errno())));
477                         }
478                         sockaddr_in sa;
479                         sa.sin_family = AF_INET;
480                         sa.sin_port = htons(ia.port);
481                         sa.sin_addr.s_addr = htonl(ia.addr);
482                         rc = .bind(_fileno, cast(sockaddr*)&sa, cast(uint)sa.sizeof);
483                         debug(hiosocket) {
484                             tracef("bind result: %d", rc);
485                         }
486                         if ( rc != 0 ) {
487                             throw new SocketException(to!string(strerror(errno())));
488                         }
489                     }();
490                 }
491                 break;
492              case AF_UNIX:
493              default:
494                  throw new SocketException("unsupported address family");
495          }
496 
497     }
498     public void bind(string addr) @trusted {
499          debug(hiosocket) {
500              tracef("binding to %s", addr);
501          }
502          switch (_af) {
503              case AF_INET:
504                 {
505                     import core.sys.posix.netinet.in_;
506                     // addr must be "host:port"
507                     auto internet_addr = str2inetaddr(addr);
508                     sockaddr_in sin;
509                     sin.sin_family = _af;
510                     sin.sin_port = internet_addr[1];
511                     sin.sin_addr = in_addr(internet_addr[0]);
512                     int flag = 1;
513                     auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof);
514                     debug(hiosocket) tracef("setsockopt for bind result: %d", rc);
515                     if ( rc != 0 ) {
516                     throw new Exception(to!string(strerror(errno())));
517                     }
518                     rc = .bind(_fileno, cast(sockaddr*)&sin, cast(uint)sin.sizeof);
519                     debug(hiosocket) {
520                         tracef("bind result: %d", rc);
521                     }
522                     if ( rc != 0 ) {
523                         throw new SocketException(to!string(strerror(errno())));
524                     }
525                 }
526                 break;
527              case AF_UNIX:
528              default:
529                  throw new SocketException("unsupported address family");
530          }
531     }
532 
533     public void listen(int backlog = 10) @trusted {
534         int rc = .listen(_fileno, backlog);
535         if ( rc != 0 ) {
536             throw new SocketException(to!string(strerror(errno())));
537         }
538         debug(hiosocket) tracef("listen on %d ok", _fileno);
539     }
540 
541     public void stopPolling(L)(L loop) @safe {
542         debug(hiosocket) tracef("Stop polling on %d", _fileno);
543         loop.stopPoll(_fileno, _polling);
544     }
545 
546     private auto getSndRcvTimeouts() @safe {
547 
548         timeval sndtmo, rcvtmo;
549         socklen_t tmolen = sndtmo.sizeof;
550         auto rc = () @trusted {
551             return .getsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, &tmolen);
552         }();
553         enforce(rc==0, "Failed to get sndtmo");
554         debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo);
555         rc = () @trusted {
556             return .getsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, &tmolen);
557         }();
558         enforce(rc == 0, "Failed to get rcvtmo");
559         debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo);
560         return Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo")(sndtmo, rcvtmo);
561     }
562 
563     private void setSndRcvTimeouts(Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo") timeouts) @safe {
564 
565         timeval sndtmo = timeouts.sndtimeo, rcvtmo = timeouts.rcvtimeo;
566         socklen_t tmolen = sndtmo.sizeof;
567 
568         auto rc = () @trusted {
569             return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, tmolen);
570         }();
571         debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo);
572         rc = () @trusted {
573             return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, tmolen);
574         }();
575         debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo);
576     }
577 
578     private void setSndRcvTimeouts(Duration timeout) @safe {
579         timeval     ntmo;
580         socklen_t   stmo;
581         auto vals = timeout.split!("seconds", "usecs")();
582         ntmo.tv_sec = cast(typeof(timeval.tv_sec)) vals.seconds;
583         ntmo.tv_usec = cast(typeof(timeval.tv_usec)) vals.usecs;
584         auto rc = () @trusted {
585             return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &ntmo, ntmo.sizeof);
586         }();
587         debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, ntmo);
588         rc = () @trusted {
589             return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &ntmo, ntmo.sizeof);
590         }();
591         debug(hiosocket) tracef("got setsockopt rcvtimeo: %d: %s", rc, ntmo);
592     }
593 
594     ///
595     /// connect synchronously (no loop, no fibers)
596     /// in blocked mode
597     ///
598     public bool connect(string addr, Duration timeout) @safe {
599         switch (_af) {
600         case AF_INET: {
601                 import core.sys.posix.netinet.in_;
602                 import core.sys.posix.sys.time: timeval;
603                 // addr must be "host:port"
604                 auto internet_addr = str2inetaddr(addr);
605 
606                 // save old timeout values and set new
607                 auto old_timeouts = getSndRcvTimeouts();
608                 setSndRcvTimeouts(timeout);
609 
610                 auto old_blocking = blocking();
611                 blocking(true);
612 
613                 sockaddr_in sin;
614                 sin.sin_family = _af;
615                 sin.sin_port = internet_addr[1];
616                 sin.sin_addr = in_addr(internet_addr[0]);
617                 uint sa_len = sin.sizeof;
618                 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
619                 auto connerrno = errno();
620 
621                 blocking(old_blocking);
622                 // restore timeouts
623                 setSndRcvTimeouts(old_timeouts);
624                 if (rc == -1 ) {
625                     debug(hiosocket) tracef("connect errno: %s %s", s_strerror(connerrno), sin);
626                     _connected = false;
627                     _state = State.IDLE;
628                     return false;
629                 }
630                 _state = State.IDLE;
631                 _connected = true;
632                 return true;
633             }
634         default:
635             throw new SocketException("unsupported address family");
636         }
637     }
638     ///
639     /// Return true if connect delayed
640     ///
641     public bool connect(string addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
642         assert(timeout > 0.seconds);
643         debug(hiosocket) tracef("enter connect to: %s", addr);
644         switch (_af) {
645             case AF_INET:
646                 {
647                     import core.sys.posix.netinet.in_;
648                     // addr must be "host:port"
649                     auto internet_addr = str2inetaddr(addr);
650                     sockaddr_in sin;
651                     sin.sin_family = _af;
652                     sin.sin_port = internet_addr[1];
653                     sin.sin_addr = in_addr(internet_addr[0]);
654                     uint sa_len = sin.sizeof;
655                     auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
656                     if ( rc == -1 && errno() != EINPROGRESS ) {
657                         debug(hiosocket) tracef("connect so %d to %s errno: %s", _fileno, addr, s_strerror(errno()));
658                         _connected = false;
659                         _state = State.IDLE;
660                         _errno = errno();
661                         f(AppEvent.ERR|AppEvent.IMMED);
662                         return false;
663                     }
664                     if ( rc == 0 )
665                     {
666                         debug(hiosocket) tracef("connected %d immediately to %s", _fileno, addr);
667                         _connected = true;
668                         _state = State.IDLE;
669                         f(AppEvent.OUT|AppEvent.IMMED);
670                         return true;
671                     }
672                     debug(hiosocket) tracef("connect %d to %s - wait for event", _fileno, addr);
673                     _loop = loop;
674                     _state = State.CONNECTING;
675                     _callback = f;
676                     _polling |= AppEvent.OUT;
677                     _connect_timer = new Timer(timeout, &timeoutHandler);
678                     _loop.startTimer(_connect_timer);
679                     loop.startPoll(_fileno, AppEvent.OUT, this);
680                 }
681                 break;
682             default:
683                 throw new SocketException("unsupported address family");
684         }
685         return true;
686     }
687 
688     public bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
689         assert(timeout > 0.seconds);
690         switch (_af) {
691         case AF_INET: {
692                 debug(hiosocket) tracef("connecting to %s", addr);
693                 import core.sys.posix.netinet.in_;
694                 sockaddr_in *sin = cast(sockaddr_in*)(addr.name);
695                 uint sa_len = sockaddr.sizeof;
696                 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)sin, sa_len))();
697                 if (rc == -1 && errno() != EINPROGRESS) {
698                     debug(hiosocket) tracef("connect to %s errno: %s", addr, s_strerror(errno()));
699                     _connected = false;
700                     _errno = errno();
701                     _state = State.IDLE;
702                     f(AppEvent.ERR | AppEvent.IMMED);
703                     return false;
704                 }
705                 _loop = loop;
706                 _state = State.CONNECTING;
707                 _callback = f;
708                 _polling |= AppEvent.OUT;
709                 loop.startPoll(_fileno, AppEvent.OUT, this);
710             }
711             break;
712         default:
713             throw new SocketException("unsupported address family");
714         }
715         _connect_timer = new Timer(timeout, &timeoutHandler);
716         _loop.startTimer(_connect_timer);
717         return true;
718     }
719 
720     override public void accept(hlEvLoop loop, Duration timeout, void delegate(AsyncSocketLike) @safe f) {
721         _loop = loop;
722         _accept_callback = f;
723         _state = State.ACCEPTING;
724         _polling |= AppEvent.IN;
725         _connect_timer = new Timer(timeout, &timeoutHandler);
726         _loop.startTimer(_connect_timer);
727         loop.startPoll(_fileno, AppEvent.IN|AppEvent.EXT_EPOLLEXCLUSIVE, this);
728     }
729 
730     private auto w(ref Nbuff b) @trusted
731     {
732         import core.sys.posix.sys.uio: iovec, writev;
733         iovec[8] iov;
734         int n = b.toIoVec(&iov[0], 8);
735         msghdr hdr;
736         hdr.msg_iov = &iov[0];
737         hdr.msg_iovlen = n;
738         version(linux)
739         {
740             int flags = MSG_NOSIGNAL;
741         }
742         else
743         {
744             int flags;
745         }
746         long r = sendmsg(_fileno, &hdr, flags);
747         // long r = .writev(_fileno, &iov[0], n);
748         return r;
749     }
750 
751     void io_handler(AppEvent ev) @safe {
752         debug(hiosocket) tracef("event %s on fd %d", appeventToString(ev), _fileno);
753         if ( ev & AppEvent.SHUTDOWN )
754         {
755             _state = State.SHUTDOWN;
756             if ( _io_timer ) {
757                 _loop.stopTimer(_io_timer);
758                 _io_timer = null;
759             }
760             _result.error = true;
761             _iorq.callback(_result);
762             return;
763         }
764         if ( ev == AppEvent.TMO ) {
765             debug(hiosocket) tracef("io timedout, %s, %s", _iorq.output, this);
766             _loop.stopPoll(_fileno, _pollingFor);
767             _polling = AppEvent.NONE;
768             _state = State.IDLE;
769             if ( !_input.isNull() )
770             {
771                 _result.input = NbuffChunk(_input, _input.length);
772             }
773             _io_timer = null;
774             // return timeout flag
775             _result.timedout = true;
776             // make callback
777             _iorq.callback(_result);
778             return;
779         }
780         if ( ev & AppEvent.IN )
781         {
782             size_t _will_read = min(_buffer_size, _iorq.to_read);
783             debug(hiosocket) tracef("on read: _input.length: %d, _will_read: %d, _input_size: %d", _input_length, _will_read, _input.size);
784             assert(_input_length + _will_read <= _input.size);
785             auto rc = (() @trusted => recv(_fileno, &_input.data[_input_length], _will_read, 0))();
786             debug(hiosocket) tracef("recv on fd %d returned %d", _fileno, rc);
787             if ( rc < 0 )
788             {
789                 _state = State.IDLE;
790                 _result.error = true;
791                 _polling &= _pollingFor ^ AppEvent.ALL;
792                 _loop.stopPoll(_fileno, _pollingFor);
793                 if ( _io_timer ) {
794                     _loop.stopTimer(_io_timer);
795                     _io_timer = null;
796                 }
797                 _result.input = NbuffChunk(_input, _input.length);
798                 _iorq.callback(_result);
799                 return;
800             }
801             if ( rc > 0 )
802             {
803                 // b.length = rc;
804                 // debug(hiosocket) tracef("adding data %s", b);
805                 // _input ~= b;
806                 // b = null;
807                 _input_length += rc;
808                 _iorq.to_read -= rc;
809                 debug(hiosocket) tracef("after adding data have space for %s bytes more, allowPartialInput: %s", _iorq.to_read, _iorq.allowPartialInput);
810                 if ( _iorq.to_read == 0 || _iorq.allowPartialInput ) {
811                     _loop.stopPoll(_fileno, _pollingFor);
812                     _polling = AppEvent.NONE;
813                     _state = State.IDLE;
814                     if ( _io_timer ) {
815                         _loop.stopTimer(_io_timer);
816                         _io_timer = null;
817                     }
818                     _result.input = NbuffChunk(_input, _input_length);
819                     _iorq.callback(_result);
820                     return;
821                 }
822                 return;
823             }
824             if ( rc == 0 )
825             {
826                 // socket closed
827                 _loop.stopPoll(_fileno, _pollingFor);
828                 if ( _io_timer ) {
829                     _loop.stopTimer(_io_timer);
830                     _io_timer = null;
831                 }
832                 _result.input = NbuffChunk(_input, _input_length);
833                 _polling = AppEvent.NONE;
834                 _state = State.IDLE;
835                 _iorq.callback(_result);
836                 return;
837             }
838         }
839         if ( ev & AppEvent.OUT ) {
840             //debug(hiosocket) tracef("sending %s", _iorq.output);
841             assert(_result.output.length>0);
842             long rc;
843             do
844             {
845                 rc = w(_result.output);
846                 debug(hiosocket) tracef("sent %d bytes", rc);
847                 if ( rc > 0)
848                 {
849                     _result.output.pop(rc);
850                 }
851             }
852             while(rc > 0 && _result.output.length > 0);
853             assert(rc != 0);
854             if ( rc < 0 && errno() == EAGAIN )
855             {
856                 // can't send right now, just retry, wait for next event
857                 return;
858             }
859             if ( rc < 0 ) {
860                 // error sending
861                 _loop.stopPoll(_fileno, _pollingFor);
862                 if ( _io_timer ) {
863                     _loop.stopTimer(_io_timer);
864                     _io_timer = null;
865                 }
866                 if ( !_input.isNull() )
867                 {
868                     _result.input = NbuffChunk(_input, _input_length);
869                 }
870                 _polling = AppEvent.NONE;
871                 _state = State.IDLE;
872                 _result.error = true;
873                 _iorq.callback(_result);
874                 debug(hiosocket) tracef("send completed with error");
875                 return;
876             }
877             if ( _result.output.length == 0 ) {
878                 _loop.stopPoll(_fileno, _pollingFor);
879                 if ( _io_timer ) {
880                     _loop.stopTimer(_io_timer);
881                     _io_timer = null;
882                 }
883                 if ( !_input.isNull() )
884                 {
885                     _result.input = NbuffChunk(_input, _input_length);
886                 }
887                 _polling = AppEvent.NONE;
888                 _state = State.IDLE;
889                 _iorq.callback(_result);
890                 debug(hiosocket) tracef("send completed");
891                 return;
892             }
893         }
894         else
895         {
896             debug(hiosocket) tracef("Unhandled event on %d", _fileno);
897         }
898     
899     }
900     ///
901     /// Make blocking IO without evelnt loop.
902     /// Can be called from non-fiber context
903     ///
904     /// return IOResult
905     ///
906     IOResult io(IORequest iorq, in Duration timeout) @safe {
907         IOResult result;
908 
909         version (linux) {
910             immutable uint flags = MSG_NOSIGNAL;
911         } else {
912             immutable uint flags = 0;
913         }
914 
915         auto old_timeouts = getSndRcvTimeouts();
916         setSndRcvTimeouts(timeout);
917 
918         scope(exit) {
919             setSndRcvTimeouts(old_timeouts);
920         }
921 
922         debug(hiosocket) tracef("Blocked io request %s", iorq);
923 
924         // handle requested output
925         result.output = iorq.output;
926         while(result.output.length > 0)
927         {
928             auto rc = w(result.output);
929             if ( rc > 0)
930             {
931                 result.output.pop(rc);
932                 continue;
933             }
934             assert(rc<0);
935             result.error = true;
936             return result;
937         }
938         // handle requested input
939         size_t to_read = iorq.to_read;
940         if ( to_read > 0 ) {
941             //ubyte[] buffer = new ubyte[](to_read);
942             auto   buffer = Nbuff.get(to_read);
943             size_t ptr, l;
944 
945             while(to_read>0) {
946                 auto rc = () @trusted {
947                     return .recv(_fileno, cast(void*)&buffer.data[ptr], to_read, 0);
948                 }();
949                 //debug(hiosocket) tracef("got %d bytes to: %s", rc, buffer);
950                 if ( rc == 0 || (rc > 0 && iorq.allowPartialInput) ) {
951                     // client closed connection
952                     l += rc;
953                     buffer.length = l;
954                     result.input = NbuffChunk(buffer, l);
955                     debug(hiosocket) tracef("Blocked io returned %s", result);
956                     return result;
957                 }
958                 if ( rc < 0 ) {
959                     buffer.length = l;
960                     result.error = true;
961                     result.input = NbuffChunk(buffer, l);
962                     debug(hiosocket) tracef("Blocked io returned %s (%s)", result, s_strerror(errno()));
963                     return result;
964                 }
965                 to_read -= rc;
966                 ptr += rc;
967                 l += rc;
968             }
969             buffer.length = l;
970             result.input = NbuffChunk(buffer, l);
971             debug(hiosocket) tracef("Blocked io returned %s", result);
972             return result;
973         }
974         debug(hiosocket) tracef("Blocked io returned %s", result);
975         return result;
976     }
977     ///
978     /// Make unblocked IO using loop
979     ///
980     int io(hlEvLoop loop, ref IORequest iorq, in Duration timeout) @safe {
981 
982         _loop = loop;
983 
984         _iorq = iorq;
985         _result = IOResult();
986         _state = State.IO;
987 
988         AppEvent ev = AppEvent.NONE;
989         if ( iorq.output.length ) {
990             ev |= AppEvent.OUT;
991             _result.output = _iorq.output;
992         }
993         if ( iorq.to_read > 0 ) {
994             ev |= AppEvent.IN;
995             _input = Nbuff.get(iorq.to_read);
996             _input_length = 0;
997         }
998         _pollingFor = ev;
999         assert(_pollingFor != AppEvent.NONE, "No read or write requested");
1000 
1001         if (_io_timer && timeout<=0.seconds) {
1002             debug(hiosocket) tracef("closing prev timer: %s", _io_timer);
1003             _loop.stopTimer(_io_timer);
1004             _io_timer = null;
1005         }
1006 
1007         if ( timeout > 0.seconds ) {
1008             if ( _io_timer )
1009             {
1010                 _io_timer.rearm(timeout);
1011             } else
1012             {
1013                 _io_timer = new Timer(timeout, &io_handler);
1014             }
1015             _loop.startTimer(_io_timer);
1016         }
1017         _loop.startPoll(_fileno, _pollingFor, this);
1018         return 0;
1019     }
1020     /**
1021     * just send, no callbacks, no timeouts, nothing
1022     * returns what os-level send returns
1023     **/
1024     long send(immutable(ubyte)[] data) @trusted {
1025         version(linux)
1026         {
1027             int flags = MSG_NOSIGNAL;
1028         }
1029         else
1030         {
1031             int flags;
1032         }
1033         return .send(_fileno, data.ptr, data.length, flags);
1034     }
1035     /**************************************************************************
1036      * Send data from data buffer
1037      * input: data     - data to send
1038      *        timeout  - how long to wait until timedout
1039      *        callback - callback which accept IOResult
1040      * 1. try to send as much as possible. If complete data sent, then return
1041      *    IOresult with empty output and clean timeout and error fileds.
1042      * 2. If we can't send complete buffer, then prepare io call and return 
1043      *    nonempty result output.
1044      * So at return user have to check:
1045      * a) if result.error == true - send failed
1046      * b) if result.data.empty - data send completed
1047      * c) otherwise io call were issued, user will receive callback
1048      **************************************************************************/
1049     IOResult send(hlEvLoop loop, immutable(ubyte)[] data, Duration timeout, void delegate(ref IOResult) @safe callback) @safe {
1050 
1051         enforce!SocketException(data.length > 0, "You must have non-empty 'data' when calling 'send'");
1052 
1053         IOResult result;
1054         Nbuff o;
1055         NbuffChunk d = NbuffChunk(data);
1056         o.append(d);
1057         result.output = o;
1058 
1059         uint flags = 0;
1060         version(linux) {
1061             flags = MSG_NOSIGNAL;
1062         }
1063         auto rc = (() @trusted => .send(_fileno, &data[0], data.length, flags))();
1064 	debug(hiosocket) tracef("fast .send so %d rc = %d", _fileno, rc);
1065         if ( rc < 0 ) {
1066             auto err = errno();
1067             if ( err != EWOULDBLOCK && err != EAGAIN ) {
1068                 // case a.
1069                 result.error = true;
1070                 return result;
1071             }
1072             rc = 0; // like we didn't sent anything
1073         }
1074         //data = data[rc..$];
1075         debug(hiosocket) tracef(".send result %d", rc);
1076         result.output = result.output[rc..$];
1077         if ( result.output.empty ) {
1078             // case b. send comleted
1079             debug(hiosocket) tracef("fast send to %d completed", _fileno);
1080             return result;
1081         }
1082         // case c. - we have to use event loop
1083         IORequest iorq;
1084         iorq.output = result.output;
1085         iorq.callback = callback;
1086         io(loop, iorq, timeout);
1087         result.output = iorq.output;
1088         return result;
1089     }
1090 }
1091 
1092 
1093 private auto str2inetaddr(string addr) @safe pure {
1094     auto pos = indexOf(addr, ':');
1095     if ( pos == -1 ) {
1096         throw new Exception("incorrect addr %s, expect host:port", addr);
1097     }
1098     auto host = addr[0..pos].split('.');
1099     auto port = addr[pos+1..$];
1100     // auto s = addr.split(":");
1101     // if ( s.length != 2 ) {
1102     //     throw new Exception("incorrect addr %s, expect host:port", addr);
1103     // }
1104     // host = s[0].split(".");
1105     if ( host.length != 4 ) {
1106         throw new Exception("addr must be in form a.b.c.d:p, got: " ~ addr);
1107     }
1108     uint   a = to!ubyte(host[0]) << 24 | to!ubyte(host[1]) << 16 | to!ubyte(host[2]) << 8 | to!ubyte(host[3]);
1109     ushort p = to!ushort(port);
1110     return tuple(core.sys.posix.arpa.inet.htonl(a), core.sys.posix.arpa.inet.htons(p));
1111 }
1112 
1113 @safe unittest {
1114     import core.sys.posix.arpa.inet;
1115     assert(str2inetaddr("0.0.0.0:1") == tuple(0, htons(1)));
1116     assert(str2inetaddr("1.0.0.0:0") == tuple(htonl(0x01000000),0 ));
1117     assert(str2inetaddr("255.255.255.255:0") == tuple(0xffffffff, 0));
1118 }
1119 
1120 @safe unittest {
1121     globalLogLevel = LogLevel.info;
1122 
1123     hlSocket s0 = new hlSocket();
1124     s0.open();
1125     hlSocket s1 = s0;
1126     s0.close();
1127     s1.close();
1128 }
1129 
1130 @safe unittest {
1131     globalLogLevel = LogLevel.info;
1132 
1133     hlSocket s = new hlSocket();
1134     s.open();
1135     s.close();
1136 }
1137 
1138 class HioSocket
1139 {
1140     import core.thread;
1141 
1142     struct InputStream {
1143         private {
1144             size_t      _buffer_size = 16*1024;
1145             Duration    _timeout = 1.seconds;
1146             HioSocket   _socket;
1147             bool        _started;
1148             bool        _done;
1149             NbuffChunk  _data;
1150         }
1151         this(HioSocket s, Duration t = 10.seconds) @safe {
1152             _socket = s;
1153             _timeout = t;
1154             _buffer_size = s._socket._buffer_size;
1155         }
1156         bool empty() {
1157             return _started && _done;
1158         }
1159         auto front() {
1160             if ( _done ) {
1161                 _data = _data[0..0];
1162                 return _data;
1163             }
1164             if (!_started ) {
1165                 _started = true;
1166                 auto r = _socket.recv(_buffer_size, _timeout);
1167                 if (r.timedout || r.error || r.input.length == 0) {
1168                     _done = true;
1169                 } else {
1170                     _data = r.input;
1171                 }
1172             }
1173             debug(hiosocket) tracef("InputStream front: %s", _data);
1174             return _data;
1175         }
1176         void popFront() {
1177             auto r = _socket.recv(_buffer_size, _timeout);
1178             if (r.timedout || r.error || r.input.length == 0) {
1179                 _done = true;
1180             }
1181             else {
1182                 _data = r.input;
1183             }
1184         }
1185     }
1186     private {
1187         hlSocket _socket;
1188         Fiber    _fiber;
1189     }
1190     this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe {
1191         _socket = new hlSocket(af, sock_type, f, l);
1192         if ( _socket.open() == false )
1193         {
1194             throw new SocketException("Can't open socket: %s".format(s_strerror(_socket._errno)));
1195         }
1196     }
1197 
1198     this(int fileno, ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) {
1199         _socket = new hlSocket(fileno, af, sock_type, f, l);
1200     }
1201 
1202     this(hlSocket so)
1203     {
1204         _socket = so;
1205     }
1206 
1207     ~this() {
1208         // if ( _socket ) {
1209         //     _socket.close();
1210         // }
1211     }
1212     override string toString() {
1213         return _socket.toString();
1214     }
1215     void bufferSize(int s)
1216     {
1217         _socket._buffer_size = s;
1218     }
1219     auto fileno()
1220     {
1221         return _socket.fileno;
1222     }
1223 
1224     void bind(string addr) {
1225         _socket.bind(addr);
1226     }
1227     void handler(AppEvent e) @safe {
1228         debug
1229         {
1230             tracef("HioSocket handler enter %s", e);
1231         }
1232         assert(_fiber !is null);
1233         (()@trusted{_fiber.call();})();
1234     }
1235     void connect(Address addr, Duration timeout) @trusted {
1236         if ( _socket._state == hlSocket.State.SHUTDOWN )
1237         {
1238             throw new LoopShutdownException("shutdown before connect");
1239         }
1240         auto loop = getDefaultLoop();
1241         _fiber = Fiber.getThis();
1242         bool connected;
1243         void callback(AppEvent e) {
1244             if ( e & AppEvent.OUT )
1245             {
1246                 connected = true;
1247             }
1248             if (!(e & AppEvent.IMMED)) {
1249                 if ( e & AppEvent.TMO )
1250                 {
1251                     // timedout;
1252                     _socket._errno = ETIMEDOUT;
1253                 }
1254                 (() @trusted { _fiber.call(); })();
1255             }
1256         }
1257 
1258         if (_socket.connect(addr, loop, &callback, timeout) && !connected)
1259         {
1260             Fiber.yield();
1261         }
1262         if ( _socket._state == hlSocket.State.SHUTDOWN )
1263         {
1264             throw new LoopShutdownException("shutdown while connect");
1265         }
1266         if ( _socket._errno == ECONNREFUSED ) {
1267             throw new ConnectionRefused("Unable to connect socket: connection refused on %s".format(addr));
1268         }
1269         if ( ! _socket.connected )
1270         {
1271             throw new SocketException("failed to connect to %s: %s".format(addr, s_strerror(_socket._errno)));
1272         }
1273     }
1274     ///
1275     void connect(string addr, Duration timeout) @trusted {
1276         assert(_socket);
1277         if ( _socket._state == hlSocket.State.SHUTDOWN )
1278         {
1279             throw new LoopShutdownException("shutdown before connect for %s".format(_socket));
1280         }
1281         auto loop = getDefaultLoop();
1282         _fiber = Fiber.getThis();
1283         if ( _fiber is null ) {
1284             // we are not in context of any task, connect synchronously
1285             // 1. set blocking mode, socket timeout
1286             // 2. call connect, throw if faied
1287             // 3. set unblocking mode
1288             // 4. return
1289             _socket.blocking = true;
1290             _socket.connect(addr, timeout);
1291             _socket.blocking = false;
1292             return;
1293         }
1294         bool connected;
1295         void callback(AppEvent e) {
1296             debug(hiosocket) tracef("connect so %d - got event %s", _socket._fileno, appeventToString(e));
1297             if ( e & AppEvent.OUT )
1298             {
1299                 connected = true;
1300             }
1301             if ( !(e & AppEvent.IMMED) ) {
1302                 // we called yield
1303                 if ( e & AppEvent.TMO )
1304                 {
1305                     // timedout;
1306                     _socket._errno = ETIMEDOUT;
1307                 }
1308                 (() @trusted { _fiber.call(); })();
1309             }
1310         }
1311         if ( _socket.connect(addr, loop, &callback, timeout) && !connected)
1312         {
1313             debug(hiosocket) tracef("connect so %d - wait for event", _socket._fileno);
1314             Fiber.yield();
1315         }
1316         assert(_socket);
1317         if ( _socket._state == hlSocket.State.SHUTDOWN )
1318         {
1319             throw new LoopShutdownException("shutdown while connect for %s".format(_socket));
1320         }
1321         if ( _socket._errno == ECONNREFUSED ) {
1322             throw new ConnectionRefused("Unable to connect socket: connection refused on " ~ addr);
1323         }
1324     }
1325     ///
1326     bool connected() const @safe {
1327         return _socket.connected;
1328     }
1329     auto strerror()
1330     {
1331         return s_strerror(errno());
1332     }
1333     ///
1334     auto errno() @safe {
1335         return _socket.socket_errno();
1336     }
1337     ///
1338     auto listen(int backlog = 10) @safe {
1339         return _socket.listen(backlog);
1340     }
1341     ///
1342     void close() @safe {
1343         if ( _socket ) {
1344             assert(_socket._state == hlSocket.State.IDLE
1345                 || _socket._state == hlSocket.State.SHUTDOWN
1346                 || _socket._state == hlSocket.State.NEW,
1347                 "You can call close() on socket only when it is in IDLE state, not in %s(%s)".format(_socket._state, _socket));
1348             _socket.close();
1349             _socket = null;
1350         }
1351     }
1352     ///
1353     private HioSocket _accept_socket;
1354     void accept_callback(AsyncSocketLike so) scope @trusted {
1355         debug(hiosocket) tracef("Got %s on accept", so);
1356         if ( so is null ) {
1357             _accept_socket = null;
1358             _fiber.call();
1359             return;
1360         }
1361         debug(hiosocket) tracef("got accept callback for socket %d", fileno);
1362         if ( _socket._polling & AppEvent.IN ) {
1363             getDefaultLoop.stopPoll(_socket.fileno, AppEvent.IN);
1364             _socket._polling &= ~AppEvent.IN;
1365         }
1366         _socket._state = hlSocket.State.IDLE;
1367         _accept_socket = new HioSocket(cast(hlSocket)so);
1368         _fiber.call();
1369     }
1370     auto accept(Duration timeout = Duration.max) {
1371         HioSocket s;
1372 
1373         auto loop = getDefaultLoop();
1374         _fiber = Fiber.getThis();
1375 
1376         _socket._accepts_in_a_row = 10;
1377         _socket.accept(loop, timeout, &accept_callback);
1378         Fiber.yield();
1379         if ( _socket._state == hlSocket.State.SHUTDOWN )
1380         {
1381             throw new LoopShutdownException("shutdown while accepting");
1382         }
1383         return _accept_socket;
1384     }
1385     ///
1386     IOResult recv(size_t n, Duration timeout = 10.seconds, Flag!"allowPartialInput" allowPartialInput = Yes.allowPartialInput) @trusted {
1387         assert(_socket);
1388         IORequest ioreq;
1389         IOResult  iores;
1390         import core.stdc.stdio;
1391 
1392         ioreq.allowPartialInput = cast(bool)allowPartialInput;
1393 
1394         _fiber = Fiber.getThis();
1395         if ( _fiber is null) {
1396             // read not in context of any fiber. Blocked read.
1397             _socket.blocking = true;
1398             ioreq.to_read = n;
1399             iores = _socket.io(ioreq, timeout);
1400             _socket.blocking = false;
1401             return iores;
1402         }
1403         void callback(ref IOResult ior) @trusted {
1404             //debug(hiosocket) tracef("got ior on recv: %s", ior);
1405             iores = ior;
1406             _fiber.call();
1407         }
1408 
1409         ioreq.to_read = n;
1410         ioreq.callback = &callback;
1411         _socket.io(getDefaultLoop(), ioreq, timeout);
1412         //debug(hiosocket) infof("recv yielding on %s", _socket);
1413         Fiber.yield();
1414         return iores;
1415     }
1416     ///
1417     size_t send(ref Nbuff data, Duration timeout = 1.seconds) @trusted {
1418         _fiber = Fiber.getThis();
1419         IOResult  ioresult;
1420         IORequest iorq;
1421 
1422         if ( _fiber is null ) {
1423             IORequest ioreq;
1424             _socket.blocking = true;
1425             ioreq.output = data;
1426             ioresult = _socket.io(ioreq, timeout);
1427             _socket.blocking = false;
1428             if ( ioresult.error ) {
1429                 return -1;
1430             }
1431             return 0;
1432         }
1433 
1434         void callback(ref IOResult ior) @trusted {
1435             ioresult = ior;
1436             _fiber.call();
1437         }
1438         iorq.callback = &callback;
1439         iorq.output = data;
1440 
1441         // ioresult = _socket.send(getDefaultLoop(), data.data.data, timeout, &callback);
1442         // if ( ioresult.error ) {
1443         //     return -1;
1444         // }
1445         // if ( ioresult.output.empty ) {
1446         //     return data.length;
1447         // }
1448         _socket.io(getDefaultLoop(), iorq, timeout);
1449         Fiber.yield();
1450         if (ioresult.error) {
1451             return -1;
1452         }
1453         return data.length - ioresult.output.length;
1454     }
1455     ///
1456     size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) @trusted {
1457         assert(_socket);
1458         _fiber = Fiber.getThis();
1459         IOResult ioresult;
1460         debug(hiosocket) tracef("enter send to so %d", _socket._fileno);
1461 
1462         if ( _fiber is null ) {
1463             IORequest ioreq;
1464             _socket.blocking = true;
1465             ioreq.output = Nbuff(data);
1466             ioresult = _socket.io(ioreq, timeout);
1467             _socket.blocking = false;
1468             if ( ioresult.error ) {
1469                 return -1;
1470             }
1471             return 0;
1472         }
1473 
1474         void callback(ref IOResult ior) @trusted {
1475             ioresult = ior;
1476             _fiber.call();
1477         }
1478         ioresult = _socket.send(getDefaultLoop(), data, timeout, &callback);
1479         if ( ioresult.error ) {
1480             return -1;
1481         }
1482         if ( ioresult.output.empty ) {
1483             return data.length;
1484         }
1485         Fiber.yield();
1486         assert(_socket);
1487         if (ioresult.error) {
1488             return -1;
1489         }
1490         return data.length - ioresult.output.length;
1491     }
1492     ///
1493     InputStream inputStream(Duration t=10.seconds) @safe {
1494         return InputStream(this, t);
1495     }
1496 }
1497 struct LineReader
1498 {
1499     private
1500     {
1501         enum    NL = "\n".representation;
1502         HioSocket   _socket;
1503         Nbuff       _buff;
1504         size_t      _last_position = 0;
1505         bool        _done;
1506         ushort      _buffer_size;
1507     }
1508     /// constructor
1509     this(HioSocket s, ushort bs = 16*1024)
1510     {
1511         _socket = s;
1512         _buffer_size = bs;
1513     }
1514     /// if input stream closed or errored
1515     bool done()
1516     {
1517         return _done;
1518     }
1519     /// what we have in the buffer after fetching last line
1520     auto rest()
1521     {
1522         return _buff;
1523     }
1524     /// read next line
1525     Nbuff readLine(Duration timeout = 10.seconds)
1526     {
1527         while(!_done)
1528         {
1529             //debug(hiosocket) tracef("count until NL starting from %d", _last_position);
1530             long p = _buff.countUntil(NL, _last_position);
1531             if (p>=0)
1532             {
1533                 Nbuff line = _buff[0 .. p];
1534                 _last_position = 0;
1535                 _buff.pop(p+1);
1536                 debug(hiosocket) tracef("got line %s", line.data);
1537                 return line;
1538             }
1539             _last_position = _buff.length;
1540             auto r = _socket.recv(_buffer_size, timeout);
1541             if (r.timedout || r.error || r.input.length == 0) {
1542                 debug(hiosocket) tracef("got terminal result %s", r);
1543                 _done = true;
1544                 return _buff;
1545             } else {
1546                 debug(hiosocket) tracef("append %d bytes", r.input.length);
1547                 _buff.append(r.input, 0, r.input.length);
1548             }
1549         }
1550         return Nbuff();
1551     }
1552 }
1553 
1554 unittest {
1555     import core.thread;
1556     import hio.scheduler;
1557     void server(ushort port) {
1558         auto s = new HioSocket();
1559         scope(exit)
1560         {
1561             s.close();
1562         }
1563         s.bind("127.0.0.1:%s".format(port));
1564         s.listen();
1565         auto c = s.accept(2.seconds);
1566         if ( c is null ) {
1567             s.close();
1568             throw new Exception("Accept failed");
1569         }
1570         auto io = c.recv(64, 1.seconds);
1571         assert(io.input == "hello".representation);
1572         c.send("world".representation);
1573         c.close();
1574         s.close();
1575     }
1576     void client(ushort port) {
1577         auto s = new HioSocket();
1578         scope(exit) {
1579             s.close();
1580         }
1581         s.connect("127.0.0.1:%d".format(port), 1.seconds);
1582         if ( s.connected ) {
1583             auto rq = s.send("hello".representation, 1.seconds);
1584             auto rs = s.recv(64, 1.seconds);
1585             assert(rs.input == "world".representation);
1586         } else {
1587             throw new Exception("Can't connect to server");
1588         }
1589     }
1590     void hlClient(ushort port)
1591     {
1592         auto s = new hlSocket();
1593         s.open();
1594         scope(exit)
1595         {
1596             s.close();
1597         }
1598         auto ok = s.connect("127.0.0.1:%d".format(port), 1.seconds);
1599         IORequest iorq;
1600         iorq.output = Nbuff("hello");
1601         iorq.to_read = 8;
1602         s.blocking = true;
1603         IOResult iors = s.io(iorq, 1.seconds);
1604         assert(iors.input == "world".representation);
1605         infof("hlClient done");
1606     }
1607     globalLogLevel = LogLevel.info;
1608     info("Test hlSocket");
1609     auto t = new Thread({
1610         scope(exit)
1611         {
1612             uninitializeLoops();
1613         }
1614         try{
1615             App(&server, cast(ushort)12345);
1616         } catch (Exception e) {
1617             infof("Got %s in server", e);
1618         }
1619     }).start;
1620     Thread.sleep(500.msecs);
1621     hlClient(12345);
1622     t.join;
1623     globalLogLevel = LogLevel.info;
1624 
1625     info("Test HioSockets 0");
1626 
1627     // all ok case
1628     t = new Thread({
1629         scope(exit)
1630         {
1631             uninitializeLoops();
1632         }
1633         try{
1634             App(&server, cast(ushort)12345);
1635         } catch (Exception e) {
1636             infof("Got %s in server", e);
1637         }
1638     }).start;
1639     Thread.sleep(500.msecs);
1640     client(12345);
1641     t.join;
1642 
1643     info("Test HioSockets 1");
1644     // all fail case - everything should throw
1645     t = new Thread({
1646         scope(exit)
1647         {
1648             uninitializeLoops();
1649         }
1650         App(&server, cast(ushort) 12345);
1651     }).start;
1652     assertThrown!Exception(client(12346));
1653     assertThrown!Exception(t.join);
1654 
1655     info("Test HioSockets 2");
1656     // the same but client in App
1657     // all ok case
1658     globalLogLevel = LogLevel.info;
1659     t = new Thread({
1660         scope(exit)
1661         {
1662             uninitializeLoops();
1663         }
1664         App(&server, cast(ushort) 12345);
1665     }).start;
1666     Thread.sleep(500.msecs);
1667     App(&client, cast(ushort)12345);
1668     t.join;
1669 
1670     info("Test HioSockets 3");
1671     // all fail case - everything should throw
1672     t = new Thread({
1673         scope(exit)
1674         {
1675             uninitializeLoops();
1676         }
1677         App(&server, cast(ushort) 12345);
1678     }).start;
1679     assertThrown!Exception(App(&client, cast(ushort)12346));
1680     assertThrown!Exception(t.join);
1681 
1682     info("Test lineReader");
1683     globalLogLevel = LogLevel.info;
1684     t = new Thread({
1685         scope(exit)
1686         {
1687             uninitializeLoops();
1688         }
1689         App({
1690             auto s = new HioSocket();
1691             s.bind("127.0.0.1:12345");
1692             s.listen();
1693             auto c = s.accept(2.seconds);
1694             if ( c is null ) {
1695                 s.close();
1696                 return;
1697             }
1698             foreach(l; ["\n", "a\n", "bb\n"])
1699             {
1700                 c.send(l.representation, 1.seconds);
1701             }
1702             hlSleep(100.msecs);
1703             c.send("ccc\nrest".representation, 100.msecs);
1704             c.recv(64, 1.seconds);
1705             c.close();
1706             s.close();
1707         });
1708     }).start;
1709     Thread.sleep(100.msecs);
1710     App({
1711         auto s = new HioSocket();
1712         s.connect("127.0.0.1:12345", 10.seconds);
1713         auto reader = LineReader(s);
1714         auto e = reader.readLine();
1715         auto a = reader.readLine();
1716         auto b = reader.readLine();
1717         auto c = reader.readLine();
1718         auto rest = reader.rest();
1719         s.send("die".representation, 1.seconds);
1720         assert(e.length == 0);
1721         assert(a.data == "a".representation);
1722         assert(b.data == "bb".representation);
1723         assert(c.data == "ccc".representation);
1724         assert(rest.data == "rest".representation);
1725         s.close();
1726     });
1727     t.join;
1728 }