module hio.socket;

import std.typecons;
import std.string;
import std.conv;
import std.traits;
import std.datetime;
import std.exception;
import std.algorithm;

import std.algorithm.comparison: min;

import std.experimental.logger;

import core.memory: pureMalloc, GC;
import core.exception : onOutOfMemoryError;

import std.experimental.allocator;
import std.experimental.allocator.building_blocks;

//static import std.socket;

import std.socket;

import core.sys.posix.sys.socket;
import core.sys.posix.unistd;
import core.sys.posix.arpa.inet;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.sys.time : timeval;

import core.sys.posix.fcntl;

import core.stdc.string;
import core.stdc.errno;
import core.stdc.stdio: printf;

public import hio.events;
import hio.common;
import nbuff;

import hio;

//alias Socket = RefCounted!SocketImpl;

/// Callback signatures for accepted connections: by raw descriptor, or by wrapped socket.
alias AcceptFunction = void function(int fileno);
alias AcceptDelegate = void delegate(hlSocket);

// static ~this() {
//     trace("deinit");
// }

//hlSocket[int] fd2so;
//
//void loopCallback(int fd, AppEvent ev) @safe {
//    debug(hiosocket) tracef("loopCallback for %d", fd);
//    hlSocket s = fd2so[fd];
//    if ( s && s._fileno >= 0) {
//        debug(hiosocket) tracef("calling handler(%s) for %s", appeventToString(ev), s);
//        s._handler(ev);
//    } else {
//        infof("impossible event %s on fd: %d", appeventToString(ev), fd);
//    }
//}

/// Thrown by the socket classes in this module for socket-level failures
/// (open on an already opened socket, bind/listen errors, unsupported address family).
class SocketException : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
        super(msg, file, line);
    }
}

/// Thrown by HioSocket.connect when the stored socket errno is ECONNREFUSED.
class ConnectionRefused : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
        super(msg, file, line);
    }
}

/// Timeout exception type; only forwards message and source location to Exception.
class Timeout : Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe
{
        super(msg, file, line);
    }
}

/// Compile-time platform probe: true when built with `version(linux)`, false otherwise.
bool isLinux() pure nothrow @nogc @safe {
    version(linux) {
        return true;
    } else {
        return false;
    }
}

/// Minimal contract of an asynchronous, event-loop driven socket.
interface AsyncSocketLike
{
    bool open() @safe;
    void close() @safe;
    bool connected() @safe;
    void bind(Address addr);
    bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe;
    void accept(hlEvLoop loop, Duration timeout, void delegate(AsyncSocketLike) @safe f) @safe;
    int io(hlEvLoop, ref IORequest, Duration) @safe;
}

// callback based socket
class hlSocket : FileEventHandler, AsyncSocketLike {
    private {
        /// Life-cycle state; selects the branch taken in eventHandler/timeoutHandler.
        enum State {
            NEW = 0,
            IDLE,
            CONNECTING,
            ACCEPTING,
            IO,
            SHUTDOWN,
        }
        enum Flags
        {
            EXTERNALLY_MANAGED_FD = 1,  // socket fd created and managed outside of this class
        }
        ushort                  _flags;
        immutable ubyte         _af = AF_INET;
        immutable int           _sock_type = SOCK_STREAM;
        int                     _fileno = -1;       // -1 means "no descriptor"
        int                     _errno;             // last recorded socket error
        HandlerDelegate         _handler;
        AppEvent                _polling = AppEvent.NONE;
        size_t                  _buffer_size = 16*1024;
        hlEvLoop                _loop;
        immutable string        _file;              // creation site, used by toString/describe
        immutable int           _line;
        State                   _state = State.NEW;
        HandlerDelegate         _callback;          // connect completion callback
        // accept related fields
        void delegate(AsyncSocketLike) @safe _accept_callback;
        // io related fields
        IORequest               _iorq;
        IOResult                _result;
        Timer                   _connect_timer;     // shared by connect and accept timeouts
        Timer                   _io_timer;
        AppEvent                _pollingFor = AppEvent.NONE;
        MutableNbuffChunk       _input;
        size_t                  _input_length;      // bytes received so far into _input
        bool                    _connected;
        uint                    _accepts_in_a_row = 10; // max accept() calls per IN event
    }

    /// Human readable dump of the socket state (for diagnostics).
    override string describe() @safe
    {
        return "hlSocket: "
            ~"_state: %s; ".format(_state)
            ~"_file(_line): %s:%s; ".format(_file, _line)
            ~"_fileno: %s; ".format(_fileno)
            ~"_polling: %s; ".format(appeventToString(_polling))
            ~"_connected: %s; ".format(_connected)
            ~"_conn_timer: [%s]; ".format(_connect_timer)
            ~"_io_timer: [%s]; ".format(_io_timer)
            ~"_callback: %s; ".format(_callback)
            ;
    }

    /// Create a socket object without a descriptor; call open() to create the fd.
    this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe {
        debug(hiosocket) tracef("create socket");
        _af = af;
        _sock_type = sock_type;
        _file = f;
        _line = l;
    }

    /// Wrap an existing descriptor `s`. The fd is marked as externally managed
    /// (close() will not close it) and is switched to non-blocking mode.
    this(int s, ubyte af = AF_INET, int sock_type = 0, string f = __FILE__, int l = __LINE__) @safe
    in {assert(s>=0);}
    body {
        _af = af;
        _sock_type = sock_type;
        _fileno = s;
        _flags |= Flags.EXTERNALLY_MANAGED_FD;
        _file = f;
        _line = l;
        auto flags = (() @trusted => fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK)();
        (() @trusted => fcntl(_fileno, F_SETFL, flags))();
    }


    ~this() {
        // if ( _fileno != -1 ) {
        //     if ( _loop && _polling != AppEvent.NONE ) {
        //         _loop.stopPoll(_fileno, _polling);
        //         _loop.detach(_fileno);
        //     }
        //     //fd2so[_fileno] = null;
        //     .close(_fileno);
        //     _fileno = -1;
        // }
        // close();
        // Both timers must have been released (timeout fired, event handled or close() called).
        assert(_io_timer is null);
        assert(_connect_timer is null);
    }

    override string toString() const @safe {
        import std.format: format;
        return "socket: fileno: %d, (%s:%d)".format(_fileno, _file, _line);
    }
    public bool connected() const pure @safe nothrow {
        return _connected;
    }

    public auto fileno() const pure @safe nothrow {
        return _fileno;
    }

    public auto socket_errno() const pure @safe nothrow {
        return _errno;
    }

    // NOTE(review): this getter returns true when O_NONBLOCK is SET, i.e. it looks
    // inverted relative to its name. connect(string, Duration) uses it to save and
    // restore the mode, so changing it alters the mode the socket is left in after a
    // synchronous connect - confirm against callers before fixing.
    bool blocking() @property @safe
    {
        auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}();
        return cast(bool)(flags & O_NONBLOCK);
    }
    /// Switch the descriptor to blocking (true) or non-blocking (false) mode.
    void blocking(bool blocking) @property @safe {
        auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}();
        if ( blocking ) {
            (() @trusted => fcntl(_fileno, F_SETFL, flags & ~O_NONBLOCK))();
        } else {
            (() @trusted => fcntl(_fileno, F_SETFL, flags | O_NONBLOCK))();
        }
    }
    /// Timer callback for connect and accept timeouts.
    /// Stops polling, detaches the fd from the loop, records ETIMEDOUT and
    /// notifies the pending callback. Must only fire in CONNECTING or ACCEPTING state.
    void timeoutHandler(AppEvent e) @safe {
        debug
        {
            tracef("Timeout handler: %s", appeventToString(e));
        }
        final switch (_state) {
        case State.SHUTDOWN:
            assert(0);
        case State.NEW:
            assert(0);
        case State.IDLE:
            assert(0);
        case State.CONNECTING:
            debug(hiosocket) tracef("connection timed out");
            _connected = false;
            _errno = ETIMEDOUT;
            _polling = AppEvent.NONE;
            _loop.stopPoll(_fileno, AppEvent.OUT);
            _loop.detach(_fileno);
            _state = State.IDLE;
            _connect_timer = null;
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
            }
            _callback(e);
            return;
        case State.ACCEPTING:
            debug(hiosocket) tracef("accept timed out");
            _connected = false;
            _errno = ETIMEDOUT;
            _polling = AppEvent.NONE;
            _loop.stopPoll(_fileno, AppEvent.IN);
            _loop.detach(_fileno);
            _state = State.IDLE;
            // The timer has fired: drop the reference exactly like the CONNECTING
            // branch does, otherwise close() would stop an expired timer and the
            // destructor assertion on _connect_timer would fail.
            _connect_timer = null;
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
            }
            _accept_callback(null);
            return;
        case State.IO:
            assert(0);
        }
    }
    /// Event-loop entry point: dispatches the event according to the current state.
    override void eventHandler(int fd, AppEvent e) @safe {
        debug(hiosocket) tracef("event %s in state %s", appeventToString(e), _state);
        final switch ( _state ) {
        case State.SHUTDOWN:
            return;
            assert(0);
        case State.NEW:
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
                return;
            }
            assert(0);
        case State.IDLE:
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
                return;
            }
            assert(0);
        case State.CONNECTING:
            debug(hiosocket) tracef("connection event: %s", appeventToString(e));
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
                _connected = false;
                if ( _connect_timer )
                {
                    _loop.stopTimer(_connect_timer);
                    _connect_timer = null;
                }
                _polling = AppEvent.NONE;
                _loop.stopPoll(_fileno, AppEvent.OUT);
                _callback(e);
                return;
            }
            assert(e & (AppEvent.OUT|AppEvent.HUP|AppEvent.ERR), "We can handle only OUT event in connecting state, but got %s".format(e));
            if ( e & AppEvent.OUT )
            {
_connected = true;
            }
            if ( (e & (AppEvent.HUP|AppEvent.ERR)) ) {
                // fetch the pending socket error to report why the connect failed
                int err;
                uint err_s = err.sizeof;
                auto rc = (() @trusted => .getsockopt(_fileno, SOL_SOCKET, SO_ERROR, &err, &err_s))();
                if ( rc == 0 ) {
                    _errno = err;
                    debug(hiosocket) tracef("error connecting: %s", s_strerror(err));
                }
                _connected = false;
            }
            _polling = AppEvent.NONE;
            if ( e & AppEvent.SHUTDOWN)
            {
                _state = State.SHUTDOWN;
            }
            else
            {
                _state = State.IDLE;
            }
            if ( _connect_timer ) {
                _loop.stopTimer(_connect_timer);
                _connect_timer = null;
            }
            _loop.stopPoll(_fileno, AppEvent.OUT);
            _callback(e);
            return;
            //_handler(e);
        case State.ACCEPTING:
            assert(e == AppEvent.IN, "We can handle only IN event in accepting state");
            // accept up to _accepts_in_a_row connections per readiness event
            foreach(_; 0.._accepts_in_a_row) {
                if ( _fileno == -1 ) {
                    break; // socket can be closed in handler
                }
                sockaddr sa;
                uint sa_len = sa.sizeof;
                retry:
                int new_s = (() @trusted => .accept(_fileno, &sa, &sa_len))();
                if ( new_s == -1 ) {
                    auto err = errno();
                    if ( err == EINTR )
                    {
                        // restart accept
                        goto retry;
                    }
                    if ( err == EWOULDBLOCK || err == EAGAIN ) {
                        // POSIX.1-2001 and POSIX.1-2008 allow
                        // either error to be returned for this case, and do not require
                        // these constants to have the same value, so a portable
                        // application should check for both possibilities.
                        break;
                    }
                    throw new Exception(s_strerror(err));
                }
                debug(hiosocket) tracef("New socket fd: %d", new_s);
                immutable int flag = 1;
                auto rc = (() @trusted => .setsockopt(new_s, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof))();
                if ( rc != 0 ) {
                    // capture errno first: close() may overwrite it; then release
                    // the accepted descriptor, nobody else knows about it yet
                    immutable opt_errno = errno();
                    .close(new_s);
                    throw new Exception(s_strerror(opt_errno));
                }
                version(OSX) {
                    // SO_NOSIGPIPE must be set on the accepted connection,
                    // not on the listening socket
                    rc = (() @trusted => .setsockopt(new_s, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof))();
                    if ( rc != 0 ) {
                        immutable opt_errno = errno();
                        .close(new_s);
                        throw new Exception(s_strerror(opt_errno));
                    }
                }
                auto flags = (() @trusted => fcntl(new_s, F_GETFL, 0) | O_NONBLOCK)();
                (() @trusted => fcntl(new_s, F_SETFL, flags))();
                if ( _connect_timer ) {
                    _loop.stopTimer(_connect_timer);
                    _connect_timer = null;
                }
                hlSocket ns = new hlSocket(new_s, _af, _sock_type);
                // the fd was created here, so the new socket object owns it
                ns._flags &= ~Flags.EXTERNALLY_MANAGED_FD;
                ns._connected = true;
                if ( e & AppEvent.SHUTDOWN)
                {
                    _state = State.SHUTDOWN;
                }
                _accept_callback(ns);
                debug(hiosocket) tracef("accept_callback for fd: %d - done", new_s);
            }
            //_handler(e);
            return;
        case State.IO:
            io_handler(e);
            return;
        }
    }

    /// Create the OS socket for the configured family/type, enable TCP_NODELAY
    /// (and SO_NOSIGPIPE on OSX) and switch it to non-blocking mode.
    /// Returns: false (with _errno set) when socket() fails, true otherwise.
    /// Throws: SocketException if already opened, Exception if setsockopt fails.
    public bool open() @trusted {
        immutable flag = 1;
        if (_fileno != -1) {
            throw new SocketException("You can't open already opened socket: fileno(%d)".format(_fileno));
        }
        _fileno = socket(_af, _sock_type, 0);
        if ( _fileno < 0 )
        {
            _errno = errno();
            return false;
        }
        debug(hiosocket) tracef("new socket created, %s", this);
        _polling = AppEvent.NONE;
        //fd2so[_fileno] = this;
        auto rc = .setsockopt(_fileno, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof);
        if ( rc != 0 ) {
             throw new Exception(to!string(strerror(errno())));
        }
        version(OSX) {
            rc = .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof);
            if ( rc != 0 ) {
                 throw new Exception(to!string(strerror(errno())));
            }
        }
        auto flags = fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK;
        fcntl(_fileno,
F_SETFL, flags);
        return true;
    }

    /// Release everything attached to the socket: stop polling, detach from the
    /// loop, close the descriptor (unless it is externally managed), stop both
    /// timers and reset the pending io request/result. Safe to call repeatedly.
    public void close() @safe {
        //assert(_state == State.IDLE);
        if ( _fileno != -1 ) {
            debug(hiosocket) tracef("closing %d, polling: %x", _fileno, _polling);
            if ( _loop )
            {
                if ( _polling != AppEvent.NONE )
                {
                    debug(hiosocket) tracef("detach from polling for %s", appeventToString(_polling));
                    _loop.stopPoll(_fileno, _polling);
                    _polling = AppEvent.NONE;
                }
                _loop.detach(_fileno);
            }
            if ( (_flags & Flags.EXTERNALLY_MANAGED_FD) == 0 )
            {
                .close(_fileno);
            }
            _fileno = -1;
        }
        if ( _connect_timer )
        {
            debug(hiosocket) tracef("also stop connect timer: %s", _connect_timer);
            _loop.stopTimer(_connect_timer);
            _connect_timer = null;
        }
        if ( _io_timer )
        {
            debug(hiosocket) tracef("also stop io timer: %s", _io_timer);
            _loop.stopTimer(_io_timer);
            _io_timer = null;
        }
        _iorq = IORequest();
        _result = IOResult();
    }

    /// Bind the socket to `addr` with SO_REUSEADDR enabled.
    /// Only AF_INET sockets with an InternetAddress are supported.
    /// Throws: SocketException for an unsupported family/address or bind failure,
    ///         Exception when SO_REUSEADDR can't be set.
    public void bind(Address addr) @safe
    {
        switch (_af) {
            case AF_INET:
                {
                    import core.sys.posix.netinet.in_;
                    InternetAddress ia = cast(InternetAddress)addr;
                    if ( ia is null ) {
                        // the class cast yields null for any other Address kind;
                        // fail explicitly instead of dereferencing null below
                        throw new SocketException("unsupported address family");
                    }
                    static int flag = 1;
                    () @trusted {
                        debug(hiosocket) tracef("binding fileno %s to %s", _fileno, ia);
                        auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof);
                        debug(hiosocket) tracef("setsockopt for bind result: %d", rc);
                        if ( rc != 0 ) {
                            throw new Exception(to!string(strerror(errno())));
                        }
                        sockaddr_in sa;
                        sa.sin_family = AF_INET;
                        sa.sin_port = htons(ia.port);
                        sa.sin_addr.s_addr = htonl(ia.addr);
                        rc = .bind(_fileno, cast(sockaddr*)&sa, cast(uint)sa.sizeof);
                        debug(hiosocket) {
                            tracef("bind result: %d", rc);
                        }
                        if ( rc != 0 ) {
                            throw new SocketException(to!string(strerror(errno())));
                        }
                    }();
                }
                break;
            case AF_UNIX:
            default:
                throw new SocketException("unsupported address family");
        }

    }
    public
void bind(string addr) @trusted {
        debug(hiosocket) {
            tracef("binding to %s", addr);
        }
        switch (_af) {
            case AF_INET:
                {
                    import core.sys.posix.netinet.in_;
                    // addr must be "host:port"
                    auto internet_addr = str2inetaddr(addr);
                    sockaddr_in sin;
                    sin.sin_family = _af;
                    sin.sin_port = internet_addr[1];
                    sin.sin_addr = in_addr(internet_addr[0]);
                    int flag = 1;
                    auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof);
                    debug(hiosocket) tracef("setsockopt for bind result: %d", rc);
                    if ( rc != 0 ) {
                         throw new Exception(to!string(strerror(errno())));
                    }
                    rc = .bind(_fileno, cast(sockaddr*)&sin, cast(uint)sin.sizeof);
                    debug(hiosocket) {
                        tracef("bind result: %d", rc);
                    }
                    if ( rc != 0 ) {
                        throw new SocketException(to!string(strerror(errno())));
                    }
                }
                break;
            case AF_UNIX:
            default:
                throw new SocketException("unsupported address family");
        }
    }

    /// Put the socket into listening state.
    /// Throws: SocketException when listen() fails.
    public void listen(int backlog = 10) @trusted {
        int rc = .listen(_fileno, backlog);
        if ( rc != 0 ) {
            throw new SocketException(to!string(strerror(errno())));
        }
        debug(hiosocket) tracef("listen on %d ok", _fileno);
    }

    /// Ask `loop` to stop polling this descriptor for the currently polled events.
    public void stopPolling(L)(L loop) @safe {
        debug(hiosocket) tracef("Stop polling on %d", _fileno);
        loop.stopPoll(_fileno, _polling);
    }

    /// Read the current SO_SNDTIMEO / SO_RCVTIMEO values of the socket.
    /// Throws: Exception (via enforce) if either getsockopt call fails.
    private auto getSndRcvTimeouts() @safe {

        timeval sndtmo, rcvtmo;
        socklen_t tmolen = sndtmo.sizeof;
        auto rc = () @trusted {
            return .getsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, &tmolen);
        }();
        enforce(rc==0, "Failed to get sndtmo");
        debug(hiosocket) tracef("got getsockopt sndtimeo: %d: %s", rc, sndtmo);
        // the length is a value-result argument: reset it before the next call
        tmolen = rcvtmo.sizeof;
        rc = () @trusted {
            return .getsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, &tmolen);
        }();
        enforce(rc == 0, "Failed to get rcvtmo");
        debug(hiosocket) tracef("got getsockopt rcvtimeo: %d: %s", rc, rcvtmo);
        return Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo")(sndtmo, rcvtmo);
    }

    /// Restore previously saved send/receive timeouts. Failures are only logged
    /// (in debug builds), the call is best-effort.
    private void setSndRcvTimeouts(Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo") timeouts) @safe {

        timeval sndtmo = timeouts.sndtimeo, rcvtmo = timeouts.rcvtimeo;
        socklen_t tmolen = sndtmo.sizeof;

        auto rc = () @trusted {
            return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, tmolen);
        }();
        debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo);
        rc = () @trusted {
            return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, tmolen);
        }();
        debug(hiosocket) tracef("got setsockopt rcvtimeo: %d: %s", rc, rcvtmo);
    }

    /// Set both send and receive timeouts to `timeout`. Best-effort, like the
    /// tuple overload: the setsockopt results are only logged.
    private void setSndRcvTimeouts(Duration timeout) @safe {
        timeval ntmo;
        auto vals = timeout.split!("seconds", "usecs")();
        ntmo.tv_sec = cast(typeof(timeval.tv_sec)) vals.seconds;
        ntmo.tv_usec = cast(typeof(timeval.tv_usec)) vals.usecs;
        auto rc = () @trusted {
            return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &ntmo, ntmo.sizeof);
        }();
        debug(hiosocket) tracef("got setsockopt sndtimeo: %d: %s", rc, ntmo);
        rc = () @trusted {
            return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &ntmo, ntmo.sizeof);
        }();
        debug(hiosocket) tracef("got setsockopt rcvtimeo: %d: %s", rc, ntmo);
    }

    ///
    /// connect synchronously (no loop, no fibers)
    /// in blocked mode
    ///
    public bool connect(string addr, Duration timeout) @safe {
        switch (_af) {
            case AF_INET: {
                    import core.sys.posix.netinet.in_;
                    import core.sys.posix.sys.time: timeval;
                    // addr must be "host:port"
                    auto internet_addr = str2inetaddr(addr);

                    // save old timeout values and set new
                    auto old_timeouts = getSndRcvTimeouts();
                    setSndRcvTimeouts(timeout);

                    auto old_blocking = blocking();
                    blocking(true);

                    sockaddr_in sin;
                    sin.sin_family = _af;
                    sin.sin_port = internet_addr[1];
                    sin.sin_addr = in_addr(internet_addr[0]);
                    uint sa_len =
sin.sizeof;
                    auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
                    // capture errno right away: the calls below may overwrite it
                    auto connerrno = errno();

                    blocking(old_blocking);
                    // restore timeouts
                    setSndRcvTimeouts(old_timeouts);
                    if (rc == -1 ) {
                        debug(hiosocket) tracef("connect errno: %s %s", s_strerror(connerrno), sin);
                        // remember the failure reason so callers can inspect socket_errno
                        _errno = connerrno;
                        _connected = false;
                        _state = State.IDLE;
                        return false;
                    }
                    _state = State.IDLE;
                    _connected = true;
                    return true;
                }
            default:
                throw new SocketException("unsupported address family");
        }
    }
    ///
    /// Return true if connect delayed
    ///
    /// On immediate failure `f` is called with ERR|IMMED and false is returned;
    /// on immediate success `f` is called with OUT|IMMED. Otherwise the socket is
    /// polled for OUT and a connect timer is armed; `f` is called from the loop.
    ///
    public bool connect(string addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
        assert(timeout > 0.seconds);
        debug(hiosocket) tracef("enter connect to: %s", addr);
        switch (_af) {
            case AF_INET:
                {
                    import core.sys.posix.netinet.in_;
                    // addr must be "host:port"
                    auto internet_addr = str2inetaddr(addr);
                    sockaddr_in sin;
                    sin.sin_family = _af;
                    sin.sin_port = internet_addr[1];
                    sin.sin_addr = in_addr(internet_addr[0]);
                    uint sa_len = sin.sizeof;
                    auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
                    // read errno once, before logging can change it
                    immutable connerrno = (rc == -1) ? errno() : 0;
                    if ( rc == -1 && connerrno != EINPROGRESS ) {
                        debug(hiosocket) tracef("connect so %d to %s errno: %s", _fileno, addr, s_strerror(connerrno));
                        _connected = false;
                        _state = State.IDLE;
                        _errno = connerrno;
                        f(AppEvent.ERR|AppEvent.IMMED);
                        return false;
                    }
                    if ( rc == 0 )
                    {
                        debug(hiosocket) tracef("connected %d immediately to %s", _fileno, addr);
                        _connected = true;
                        _state = State.IDLE;
                        f(AppEvent.OUT|AppEvent.IMMED);
                        return true;
                    }
                    debug(hiosocket) tracef("connect %d to %s - wait for event", _fileno, addr);
                    _loop = loop;
                    _state = State.CONNECTING;
                    _callback = f;
                    _polling |= AppEvent.OUT;
                    _connect_timer = new Timer(timeout, &timeoutHandler);
                    _loop.startTimer(_connect_timer);
                    loop.startPoll(_fileno, AppEvent.OUT, this);
                }
                break;
            default:
                throw new SocketException("unsupported address family");
        }
        return true;
    }

    /// Same as the string overload, but takes a resolved Address.
    /// Any non-failing connect (including an immediate one) is completed through
    /// the OUT event delivered by the loop.
    public bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
        assert(timeout > 0.seconds);
        switch (_af) {
        case AF_INET: {
                debug(hiosocket) tracef("connecting to %s", addr);
                import core.sys.posix.netinet.in_;
                sockaddr_in *sin = cast(sockaddr_in*)(addr.name);
                uint sa_len = sockaddr.sizeof;
                auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)sin, sa_len))();
                // read errno once, before logging can change it
                immutable connerrno = (rc == -1) ? errno() : 0;
                if (rc == -1 && connerrno != EINPROGRESS) {
                    debug(hiosocket) tracef("connect to %s errno: %s", addr, s_strerror(connerrno));
                    _connected = false;
                    _errno = connerrno;
                    _state = State.IDLE;
                    f(AppEvent.ERR | AppEvent.IMMED);
                    return false;
                }
                _loop = loop;
                _state = State.CONNECTING;
                _callback = f;
                _polling |= AppEvent.OUT;
                loop.startPoll(_fileno, AppEvent.OUT, this);
            }
            break;
        default:
            throw new SocketException("unsupported address family");
        }
        _connect_timer = new Timer(timeout, &timeoutHandler);
        _loop.startTimer(_connect_timer);
        return true;
    }

    /// Start accepting connections: poll for IN and call `f` for every accepted
    /// socket, or with null when `timeout` expires (see timeoutHandler).
    override public void accept(hlEvLoop loop, Duration timeout, void delegate(AsyncSocketLike) @safe f) {
        _loop = loop;
        _accept_callback = f;
        _state = State.ACCEPTING;
        _polling |= AppEvent.IN;
        _connect_timer = new Timer(timeout, &timeoutHandler);
        _loop.startTimer(_connect_timer);
        loop.startPoll(_fileno, AppEvent.IN|AppEvent.EXT_EPOLLEXCLUSIVE, this);
    }

    /// Send up to 8 chunks of `b` with a single sendmsg() call.
    /// Returns what sendmsg() returns (bytes sent or -1).
    private auto w(ref Nbuff b) @trusted
    {
        import core.sys.posix.sys.uio: iovec, writev;
        iovec[8] iov;
        int n = b.toIoVec(&iov[0], 8);
        msghdr hdr;
        hdr.msg_iov = &iov[0];
        hdr.msg_iovlen = n;
        version(linux)
        {
            int flags = MSG_NOSIGNAL;
        }
        else
        {
            int flags;
        }
        long r = sendmsg(_fileno, &hdr, flags);
        // long r = .writev(_fileno, &iov[0], n);
        return
r;
    }

    /// Handler for events (and io timer expiry) while in State.IO.
    /// Collects received bytes into _input, drains _result.output, and calls
    /// _iorq.callback with the accumulated IOResult when the request completes,
    /// fails, times out or the loop shuts down.
    void io_handler(AppEvent ev) @safe {
        debug(hiosocket) tracef("event %s on fd %d", appeventToString(ev), _fileno);
        if ( ev & AppEvent.SHUTDOWN )
        {
            _state = State.SHUTDOWN;
            if ( _io_timer ) {
                _loop.stopTimer(_io_timer);
                _io_timer = null;
            }
            _result.error = true;
            _iorq.callback(_result);
            return;
        }
        if ( ev == AppEvent.TMO ) {
            debug(hiosocket) tracef("io timedout, %s, %s", _iorq.output, this);
            _loop.stopPoll(_fileno, _pollingFor);
            _polling = AppEvent.NONE;
            _state = State.IDLE;
            if ( !_input.isNull() )
            {
                // hand over what was received so far; _input_length (not the
                // buffer's own length) tracks the received byte count
                _result.input = NbuffChunk(_input, _input_length);
            }
            _io_timer = null;
            // return timeout flag
            _result.timedout = true;
            // make callback
            _iorq.callback(_result);
            return;
        }
        if ( ev & AppEvent.IN )
        {
            size_t _will_read = min(_buffer_size, _iorq.to_read);
            debug(hiosocket) tracef("on read: _input.length: %d, _will_read: %d, _input_size: %d", _input_length, _will_read, _input.size);
            assert(_input_length + _will_read <= _input.size);
            auto rc = (() @trusted => recv(_fileno, &_input.data[_input_length], _will_read, 0))();
            // capture errno before logging can overwrite it
            immutable recv_errno = (rc < 0) ? errno() : 0;
            debug(hiosocket) tracef("recv on fd %d returned %d", _fileno, rc);
            if ( rc < 0 && (recv_errno == EAGAIN || recv_errno == EWOULDBLOCK || recv_errno == EINTR) )
            {
                // nothing to read right now (or interrupted) - not a failure,
                // keep polling and wait for the next event, like the send path does
                return;
            }
            if ( rc < 0 )
            {
                _state = State.IDLE;
                _result.error = true;
                _polling &= _pollingFor ^ AppEvent.ALL;
                _loop.stopPoll(_fileno, _pollingFor);
                if ( _io_timer ) {
                    _loop.stopTimer(_io_timer);
                    _io_timer = null;
                }
                _result.input = NbuffChunk(_input, _input_length);
                _iorq.callback(_result);
                return;
            }
            if ( rc > 0 )
            {
                // b.length = rc;
                // debug(hiosocket) tracef("adding data %s", b);
                // _input ~= b;
                // b = null;
                _input_length += rc;
                _iorq.to_read -= rc;
                debug(hiosocket) tracef("after adding data have space for %s bytes more, allowPartialInput: %s", _iorq.to_read, _iorq.allowPartialInput);
                if ( _iorq.to_read == 0 || _iorq.allowPartialInput ) {
                    _loop.stopPoll(_fileno, _pollingFor);
                    _polling = AppEvent.NONE;
                    _state = State.IDLE;
                    if ( _io_timer ) {
                        _loop.stopTimer(_io_timer);
                        _io_timer = null;
                    }
                    _result.input = NbuffChunk(_input, _input_length);
                    _iorq.callback(_result);
                    return;
                }
                return;
            }
            if ( rc == 0 )
            {
                // socket closed
                _loop.stopPoll(_fileno, _pollingFor);
                if ( _io_timer ) {
                    _loop.stopTimer(_io_timer);
                    _io_timer = null;
                }
                _result.input = NbuffChunk(_input, _input_length);
                _polling = AppEvent.NONE;
                _state = State.IDLE;
                _iorq.callback(_result);
                return;
            }
        }
        if ( ev & AppEvent.OUT ) {
            //debug(hiosocket) tracef("sending %s", _iorq.output);
            assert(_result.output.length>0);
            long rc;
            // push out as much as the kernel accepts
            do
            {
                rc = w(_result.output);
                debug(hiosocket) tracef("sent %d bytes", rc);
                if ( rc > 0)
                {
                    _result.output.pop(rc);
                }
            }
            while(rc > 0 && _result.output.length > 0);
            assert(rc != 0);
            if ( rc < 0 && errno() == EAGAIN )
            {
                // can't send right now, just retry, wait for next event
                return;
            }
            if ( rc < 0 ) {
                // error sending
                _loop.stopPoll(_fileno, _pollingFor);
                if ( _io_timer ) {
                    _loop.stopTimer(_io_timer);
                    _io_timer = null;
                }
                if ( !_input.isNull() )
                {
                    _result.input = NbuffChunk(_input, _input_length);
                }
                _polling = AppEvent.NONE;
                _state = State.IDLE;
                _result.error = true;
                _iorq.callback(_result);
                debug(hiosocket) tracef("send completed with error");
                return;
            }
            if ( _result.output.length == 0 ) {
                _loop.stopPoll(_fileno, _pollingFor);
                if ( _io_timer ) {
                    _loop.stopTimer(_io_timer);
                    _io_timer = null;
                }
                if ( !_input.isNull() )
                {
                    _result.input = NbuffChunk(_input, _input_length);
                }
                _polling = AppEvent.NONE;
                _state = State.IDLE;
                _iorq.callback(_result);
                debug(hiosocket) tracef("send completed");
                return;
            }
        }
        else
        {
            debug(hiosocket) tracef("Unhandled event on %d", _fileno);
        }

    }
    ///
    /// Make blocking IO without event loop.
    /// Can be called from non-fiber context
    ///
    /// The socket send/receive timeouts are set to `timeout` for the duration
    /// of the call and restored on exit.
    ///
    /// return IOResult
    ///
    IOResult io(IORequest iorq, in Duration timeout) @safe {
        IOResult result;

        version (linux) {
            immutable uint flags = MSG_NOSIGNAL;
        } else {
            immutable uint flags = 0;
        }

        auto old_timeouts = getSndRcvTimeouts();
        setSndRcvTimeouts(timeout);

        scope(exit) {
            setSndRcvTimeouts(old_timeouts);
        }

        debug(hiosocket) tracef("Blocked io request %s", iorq);

        // handle requested output
        result.output = iorq.output;
        while(result.output.length > 0)
        {
            auto rc = w(result.output);
            if ( rc > 0)
            {
                result.output.pop(rc);
                continue;
            }
            assert(rc<0);
            result.error = true;
            return result;
        }
        // handle requested input
        size_t to_read = iorq.to_read;
        if ( to_read > 0 ) {
            //ubyte[] buffer = new ubyte[](to_read);
            auto buffer = Nbuff.get(to_read);
            size_t ptr, l;

            while(to_read>0) {
                auto rc = () @trusted {
                    return .recv(_fileno, cast(void*)&buffer.data[ptr], to_read, 0);
                }();
                //debug(hiosocket) tracef("got %d bytes to: %s", rc, buffer);
                if ( rc == 0 || (rc > 0 && iorq.allowPartialInput) ) {
                    // client closed connection
                    l += rc;
                    buffer.length = l;
                    result.input = NbuffChunk(buffer, l);
                    debug(hiosocket) tracef("Blocked io returned %s", result);
                    return result;
                }
                if ( rc < 0 ) {
                    buffer.length = l;
                    result.error = true;
                    result.input = NbuffChunk(buffer, l);
                    debug(hiosocket) tracef("Blocked io returned %s (%s)", result, s_strerror(errno()));
                    return result;
                }
                to_read -= rc;
                ptr += rc;
                l += rc;
            }
            buffer.length = l;
            result.input = NbuffChunk(buffer, l);
            debug(hiosocket) tracef("Blocked io returned %s", result);
            return result;
        }
debug(hiosocket) tracef("Blocked io returned %s", result);
        return result;
    }
    ///
    /// Make unblocked IO using loop
    ///
    /// Stores the request, arms the io timer when `timeout` is positive and starts
    /// polling for OUT (pending output) and/or IN (bytes to read). The result is
    /// delivered through `iorq.callback` from io_handler. Always returns 0.
    ///
    int io(hlEvLoop loop, ref IORequest iorq, in Duration timeout) @safe {

        _loop = loop;

        _iorq = iorq;
        _result = IOResult();
        _state = State.IO;

        AppEvent ev = AppEvent.NONE;
        if ( iorq.output.length ) {
            ev |= AppEvent.OUT;
            _result.output = _iorq.output;
        }
        if ( iorq.to_read > 0 ) {
            ev |= AppEvent.IN;
            _input = Nbuff.get(iorq.to_read);
            _input_length = 0;
        }
        _pollingFor = ev;
        assert(_pollingFor != AppEvent.NONE, "No read or write requested");

        // a timer left from a previous request is not needed without a timeout
        if (_io_timer && timeout<=0.seconds) {
            debug(hiosocket) tracef("closing prev timer: %s", _io_timer);
            _loop.stopTimer(_io_timer);
            _io_timer = null;
        }

        if ( timeout > 0.seconds ) {
            if ( _io_timer )
            {
                _io_timer.rearm(timeout);
            } else
            {
                _io_timer = new Timer(timeout, &io_handler);
            }
            _loop.startTimer(_io_timer);
        }
        _loop.startPoll(_fileno, _pollingFor, this);
        return 0;
    }
    /**
    * just send, no callbacks, no timeouts, nothing
    * returns what os-level send returns
    **/
    long send(immutable(ubyte)[] data) @trusted {
        version(linux)
        {
            int flags = MSG_NOSIGNAL;
        }
        else
        {
            int flags;
        }
        return .send(_fileno, data.ptr, data.length, flags);
    }
    /**************************************************************************
     * Send data from data buffer
     * input: data     - data to send
     *        timeout  - how long to wait until timedout
     *        callback - callback which accept IOResult
     * 1. try to send as much as possible. If complete data sent, then return
     *    IOresult with empty output and clean timeout and error fields.
     * 2. If we can't send complete buffer, then prepare io call and return
     *    nonempty result output.
     * So at return user have to check:
     * a) if result.error == true - send failed
     * b) if result.data.empty - data send completed
     * c) otherwise io call were issued, user will receive callback
     **************************************************************************/
    IOResult send(hlEvLoop loop, immutable(ubyte)[] data, Duration timeout, void delegate(ref IOResult) @safe callback) @safe {

        enforce!SocketException(data.length > 0, "You must have non-empty 'data' when calling 'send'");

        IOResult result;
        Nbuff o;
        NbuffChunk d = NbuffChunk(data);
        o.append(d);
        result.output = o;

        uint flags = 0;
        version(linux) {
            flags = MSG_NOSIGNAL;
        }
        auto rc = (() @trusted => .send(_fileno, &data[0], data.length, flags))();
        debug(hiosocket) tracef("fast .send so %d rc = %d", _fileno, rc);
        if ( rc < 0 ) {
            auto err = errno();
            if ( err != EWOULDBLOCK && err != EAGAIN ) {
                // case a.
                result.error = true;
                return result;
            }
            rc = 0; // like we didn't sent anything
        }
        //data = data[rc..$];
        debug(hiosocket) tracef(".send result %d", rc);
        result.output = result.output[rc..$];
        if ( result.output.empty ) {
            // case b. send completed
            debug(hiosocket) tracef("fast send to %d completed", _fileno);
            return result;
        }
        // case c. - we have to use event loop
        IORequest iorq;
        iorq.output = result.output;
        iorq.callback = callback;
        io(loop, iorq, timeout);
        result.output = iorq.output;
        return result;
    }
}


/// Parse a dotted-quad "a.b.c.d:port" string.
/// Returns: tuple(address, port), both already converted to network byte order.
/// Throws: Exception when ':' is missing or the host is not four dot-separated
///         parts; ConvException (from std.conv.to) when a part is not a valid number.
private auto str2inetaddr(string addr) @safe pure {
    auto pos = indexOf(addr, ':');
    if ( pos == -1 ) {
        // build the message explicitly: Exception's second argument is the
        // source *file*, so passing addr there never reached the message
        throw new Exception("incorrect addr " ~ addr ~ ", expect host:port");
    }
    auto host = addr[0..pos].split('.');
    auto port = addr[pos+1..$];
    // auto s = addr.split(":");
    // if ( s.length != 2 ) {
    //     throw new Exception("incorrect addr %s, expect host:port", addr);
    // }
    // host = s[0].split(".");
    if ( host.length != 4 ) {
        throw new Exception("addr must be in form a.b.c.d:p, got: " ~ addr);
    }
    uint   a = to!ubyte(host[0]) << 24 | to!ubyte(host[1]) << 16 | to!ubyte(host[2]) << 8 | to!ubyte(host[3]);
    ushort p = to!ushort(port);
    return tuple(core.sys.posix.arpa.inet.htonl(a), core.sys.posix.arpa.inet.htons(p));
}

@safe unittest {
    import core.sys.posix.arpa.inet;
    assert(str2inetaddr("0.0.0.0:1") == tuple(0, htons(1)));
    assert(str2inetaddr("1.0.0.0:0") == tuple(htonl(0x01000000),0 ));
    assert(str2inetaddr("255.255.255.255:0") == tuple(0xffffffff, 0));
}

@safe unittest {
    globalLogLevel = LogLevel.info;

    hlSocket s0 = new hlSocket();
    s0.open();
    hlSocket s1 = s0;
    s0.close();
    s1.close();
}

@safe unittest {
    globalLogLevel = LogLevel.info;

    hlSocket s = new hlSocket();
    s.open();
    s.close();
}

class HioSocket
{
    import core.thread;

    struct InputStream {
        private {
            size_t      _buffer_size = 16*1024;
            Duration    _timeout = 1.seconds;
            HioSocket   _socket;
            bool        _started;
            bool        _done;
            NbuffChunk  _data;
        }
        this(HioSocket s, Duration t = 10.seconds) @safe {
            _socket = s;
            _timeout = t;
_buffer_size = s._socket._buffer_size; 1155 } 1156 bool empty() { 1157 return _started && _done; 1158 } 1159 auto front() { 1160 if ( _done ) { 1161 _data = _data[0..0]; 1162 return _data; 1163 } 1164 if (!_started ) { 1165 _started = true; 1166 auto r = _socket.recv(_buffer_size, _timeout); 1167 if (r.timedout || r.error || r.input.length == 0) { 1168 _done = true; 1169 } else { 1170 _data = r.input; 1171 } 1172 } 1173 debug(hiosocket) tracef("InputStream front: %s", _data); 1174 return _data; 1175 } 1176 void popFront() { 1177 auto r = _socket.recv(_buffer_size, _timeout); 1178 if (r.timedout || r.error || r.input.length == 0) { 1179 _done = true; 1180 } 1181 else { 1182 _data = r.input; 1183 } 1184 } 1185 } 1186 private { 1187 hlSocket _socket; 1188 Fiber _fiber; 1189 } 1190 this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe { 1191 _socket = new hlSocket(af, sock_type, f, l); 1192 if ( _socket.open() == false ) 1193 { 1194 throw new SocketException("Can't open socket: %s".format(s_strerror(_socket._errno))); 1195 } 1196 } 1197 1198 this(int fileno, ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) { 1199 _socket = new hlSocket(fileno, af, sock_type, f, l); 1200 } 1201 1202 this(hlSocket so) 1203 { 1204 _socket = so; 1205 } 1206 1207 ~this() { 1208 // if ( _socket ) { 1209 // _socket.close(); 1210 // } 1211 } 1212 override string toString() { 1213 return _socket.toString(); 1214 } 1215 void bufferSize(int s) 1216 { 1217 _socket._buffer_size = s; 1218 } 1219 auto fileno() 1220 { 1221 return _socket.fileno; 1222 } 1223 1224 void bind(string addr) { 1225 _socket.bind(addr); 1226 } 1227 void handler(AppEvent e) @safe { 1228 debug 1229 { 1230 tracef("HioSocket handler enter %s", e); 1231 } 1232 assert(_fiber !is null); 1233 (()@trusted{_fiber.call();})(); 1234 } 1235 void connect(Address addr, Duration timeout) @trusted { 1236 if ( _socket._state == hlSocket.State.SHUTDOWN ) 1237 { 
1238 throw new LoopShutdownException("shutdown before connect"); 1239 } 1240 auto loop = getDefaultLoop(); 1241 _fiber = Fiber.getThis(); 1242 bool connected; 1243 void callback(AppEvent e) { 1244 if ( e & AppEvent.OUT ) 1245 { 1246 connected = true; 1247 } 1248 if (!(e & AppEvent.IMMED)) { 1249 if ( e & AppEvent.TMO ) 1250 { 1251 // timedout; 1252 _socket._errno = ETIMEDOUT; 1253 } 1254 (() @trusted { _fiber.call(); })(); 1255 } 1256 } 1257 1258 if (_socket.connect(addr, loop, &callback, timeout) && !connected) 1259 { 1260 Fiber.yield(); 1261 } 1262 if ( _socket._state == hlSocket.State.SHUTDOWN ) 1263 { 1264 throw new LoopShutdownException("shutdown while connect"); 1265 } 1266 if ( _socket._errno == ECONNREFUSED ) { 1267 throw new ConnectionRefused("Unable to connect socket: connection refused on %s".format(addr)); 1268 } 1269 if ( ! _socket.connected ) 1270 { 1271 throw new SocketException("failed to connect to %s: %s".format(addr, s_strerror(_socket._errno))); 1272 } 1273 } 1274 /// 1275 void connect(string addr, Duration timeout) @trusted { 1276 assert(_socket); 1277 if ( _socket._state == hlSocket.State.SHUTDOWN ) 1278 { 1279 throw new LoopShutdownException("shutdown before connect for %s".format(_socket)); 1280 } 1281 auto loop = getDefaultLoop(); 1282 _fiber = Fiber.getThis(); 1283 if ( _fiber is null ) { 1284 // we are not in context of any task, connect synchronously 1285 // 1. set blocking mode, socket timeout 1286 // 2. call connect, throw if faied 1287 // 3. set unblocking mode 1288 // 4. 
return 1289 _socket.blocking = true; 1290 _socket.connect(addr, timeout); 1291 _socket.blocking = false; 1292 return; 1293 } 1294 bool connected; 1295 void callback(AppEvent e) { 1296 debug(hiosocket) tracef("connect so %d - got event %s", _socket._fileno, appeventToString(e)); 1297 if ( e & AppEvent.OUT ) 1298 { 1299 connected = true; 1300 } 1301 if ( !(e & AppEvent.IMMED) ) { 1302 // we called yield 1303 if ( e & AppEvent.TMO ) 1304 { 1305 // timedout; 1306 _socket._errno = ETIMEDOUT; 1307 } 1308 (() @trusted { _fiber.call(); })(); 1309 } 1310 } 1311 if ( _socket.connect(addr, loop, &callback, timeout) && !connected) 1312 { 1313 debug(hiosocket) tracef("connect so %d - wait for event", _socket._fileno); 1314 Fiber.yield(); 1315 } 1316 assert(_socket); 1317 if ( _socket._state == hlSocket.State.SHUTDOWN ) 1318 { 1319 throw new LoopShutdownException("shutdown while connect for %s".format(_socket)); 1320 } 1321 if ( _socket._errno == ECONNREFUSED ) { 1322 throw new ConnectionRefused("Unable to connect socket: connection refused on " ~ addr); 1323 } 1324 } 1325 /// 1326 bool connected() const @safe { 1327 return _socket.connected; 1328 } 1329 auto strerror() 1330 { 1331 return s_strerror(errno()); 1332 } 1333 /// 1334 auto errno() @safe { 1335 return _socket.socket_errno(); 1336 } 1337 /// 1338 auto listen(int backlog = 10) @safe { 1339 return _socket.listen(backlog); 1340 } 1341 /// 1342 void close() @safe { 1343 if ( _socket ) { 1344 assert(_socket._state == hlSocket.State.IDLE 1345 || _socket._state == hlSocket.State.SHUTDOWN 1346 || _socket._state == hlSocket.State.NEW, 1347 "You can call close() on socket only when it is in IDLE state, not in %s(%s)".format(_socket._state, _socket)); 1348 _socket.close(); 1349 _socket = null; 1350 } 1351 } 1352 /// 1353 private HioSocket _accept_socket; 1354 void accept_callback(AsyncSocketLike so) scope @trusted { 1355 debug(hiosocket) tracef("Got %s on accept", so); 1356 if ( so is null ) { 1357 _accept_socket = null; 1358 
_fiber.call(); 1359 return; 1360 } 1361 debug(hiosocket) tracef("got accept callback for socket %d", fileno); 1362 if ( _socket._polling & AppEvent.IN ) { 1363 getDefaultLoop.stopPoll(_socket.fileno, AppEvent.IN); 1364 _socket._polling &= ~AppEvent.IN; 1365 } 1366 _socket._state = hlSocket.State.IDLE; 1367 _accept_socket = new HioSocket(cast(hlSocket)so); 1368 _fiber.call(); 1369 } 1370 auto accept(Duration timeout = Duration.max) { 1371 HioSocket s; 1372 1373 auto loop = getDefaultLoop(); 1374 _fiber = Fiber.getThis(); 1375 1376 _socket._accepts_in_a_row = 10; 1377 _socket.accept(loop, timeout, &accept_callback); 1378 Fiber.yield(); 1379 if ( _socket._state == hlSocket.State.SHUTDOWN ) 1380 { 1381 throw new LoopShutdownException("shutdown while accepting"); 1382 } 1383 return _accept_socket; 1384 } 1385 /// 1386 IOResult recv(size_t n, Duration timeout = 10.seconds, Flag!"allowPartialInput" allowPartialInput = Yes.allowPartialInput) @trusted { 1387 assert(_socket); 1388 IORequest ioreq; 1389 IOResult iores; 1390 import core.stdc.stdio; 1391 1392 ioreq.allowPartialInput = cast(bool)allowPartialInput; 1393 1394 _fiber = Fiber.getThis(); 1395 if ( _fiber is null) { 1396 // read not in context of any fiber. Blocked read. 
1397 _socket.blocking = true; 1398 ioreq.to_read = n; 1399 iores = _socket.io(ioreq, timeout); 1400 _socket.blocking = false; 1401 return iores; 1402 } 1403 void callback(ref IOResult ior) @trusted { 1404 //debug(hiosocket) tracef("got ior on recv: %s", ior); 1405 iores = ior; 1406 _fiber.call(); 1407 } 1408 1409 ioreq.to_read = n; 1410 ioreq.callback = &callback; 1411 _socket.io(getDefaultLoop(), ioreq, timeout); 1412 //debug(hiosocket) infof("recv yielding on %s", _socket); 1413 Fiber.yield(); 1414 return iores; 1415 } 1416 /// 1417 size_t send(ref Nbuff data, Duration timeout = 1.seconds) @trusted { 1418 _fiber = Fiber.getThis(); 1419 IOResult ioresult; 1420 IORequest iorq; 1421 1422 if ( _fiber is null ) { 1423 IORequest ioreq; 1424 _socket.blocking = true; 1425 ioreq.output = data; 1426 ioresult = _socket.io(ioreq, timeout); 1427 _socket.blocking = false; 1428 if ( ioresult.error ) { 1429 return -1; 1430 } 1431 return 0; 1432 } 1433 1434 void callback(ref IOResult ior) @trusted { 1435 ioresult = ior; 1436 _fiber.call(); 1437 } 1438 iorq.callback = &callback; 1439 iorq.output = data; 1440 1441 // ioresult = _socket.send(getDefaultLoop(), data.data.data, timeout, &callback); 1442 // if ( ioresult.error ) { 1443 // return -1; 1444 // } 1445 // if ( ioresult.output.empty ) { 1446 // return data.length; 1447 // } 1448 _socket.io(getDefaultLoop(), iorq, timeout); 1449 Fiber.yield(); 1450 if (ioresult.error) { 1451 return -1; 1452 } 1453 return data.length - ioresult.output.length; 1454 } 1455 /// 1456 size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) @trusted { 1457 assert(_socket); 1458 _fiber = Fiber.getThis(); 1459 IOResult ioresult; 1460 debug(hiosocket) tracef("enter send to so %d", _socket._fileno); 1461 1462 if ( _fiber is null ) { 1463 IORequest ioreq; 1464 _socket.blocking = true; 1465 ioreq.output = Nbuff(data); 1466 ioresult = _socket.io(ioreq, timeout); 1467 _socket.blocking = false; 1468 if ( ioresult.error ) { 1469 return -1; 1470 } 
1471 return 0; 1472 } 1473 1474 void callback(ref IOResult ior) @trusted { 1475 ioresult = ior; 1476 _fiber.call(); 1477 } 1478 ioresult = _socket.send(getDefaultLoop(), data, timeout, &callback); 1479 if ( ioresult.error ) { 1480 return -1; 1481 } 1482 if ( ioresult.output.empty ) { 1483 return data.length; 1484 } 1485 Fiber.yield(); 1486 assert(_socket); 1487 if (ioresult.error) { 1488 return -1; 1489 } 1490 return data.length - ioresult.output.length; 1491 } 1492 /// 1493 InputStream inputStream(Duration t=10.seconds) @safe { 1494 return InputStream(this, t); 1495 } 1496 } 1497 struct LineReader 1498 { 1499 private 1500 { 1501 enum NL = "\n".representation; 1502 HioSocket _socket; 1503 Nbuff _buff; 1504 size_t _last_position = 0; 1505 bool _done; 1506 ushort _buffer_size; 1507 } 1508 /// constructor 1509 this(HioSocket s, ushort bs = 16*1024) 1510 { 1511 _socket = s; 1512 _buffer_size = bs; 1513 } 1514 /// if input stream closed or errored 1515 bool done() 1516 { 1517 return _done; 1518 } 1519 /// what we have in the buffer after fetching last line 1520 auto rest() 1521 { 1522 return _buff; 1523 } 1524 /// read next line 1525 Nbuff readLine(Duration timeout = 10.seconds) 1526 { 1527 while(!_done) 1528 { 1529 //debug(hiosocket) tracef("count until NL starting from %d", _last_position); 1530 long p = _buff.countUntil(NL, _last_position); 1531 if (p>=0) 1532 { 1533 Nbuff line = _buff[0 .. 
p]; 1534 _last_position = 0; 1535 _buff.pop(p+1); 1536 debug(hiosocket) tracef("got line %s", line.data); 1537 return line; 1538 } 1539 _last_position = _buff.length; 1540 auto r = _socket.recv(_buffer_size, timeout); 1541 if (r.timedout || r.error || r.input.length == 0) { 1542 debug(hiosocket) tracef("got terminal result %s", r); 1543 _done = true; 1544 return _buff; 1545 } else { 1546 debug(hiosocket) tracef("append %d bytes", r.input.length); 1547 _buff.append(r.input, 0, r.input.length); 1548 } 1549 } 1550 return Nbuff(); 1551 } 1552 } 1553 1554 unittest { 1555 import core.thread; 1556 import hio.scheduler; 1557 void server(ushort port) { 1558 auto s = new HioSocket(); 1559 scope(exit) 1560 { 1561 s.close(); 1562 } 1563 s.bind("127.0.0.1:%s".format(port)); 1564 s.listen(); 1565 auto c = s.accept(2.seconds); 1566 if ( c is null ) { 1567 s.close(); 1568 throw new Exception("Accept failed"); 1569 } 1570 auto io = c.recv(64, 1.seconds); 1571 assert(io.input == "hello".representation); 1572 c.send("world".representation); 1573 c.close(); 1574 s.close(); 1575 } 1576 void client(ushort port) { 1577 auto s = new HioSocket(); 1578 scope(exit) { 1579 s.close(); 1580 } 1581 s.connect("127.0.0.1:%d".format(port), 1.seconds); 1582 if ( s.connected ) { 1583 auto rq = s.send("hello".representation, 1.seconds); 1584 auto rs = s.recv(64, 1.seconds); 1585 assert(rs.input == "world".representation); 1586 } else { 1587 throw new Exception("Can't connect to server"); 1588 } 1589 } 1590 void hlClient(ushort port) 1591 { 1592 auto s = new hlSocket(); 1593 s.open(); 1594 scope(exit) 1595 { 1596 s.close(); 1597 } 1598 auto ok = s.connect("127.0.0.1:%d".format(port), 1.seconds); 1599 IORequest iorq; 1600 iorq.output = Nbuff("hello"); 1601 iorq.to_read = 8; 1602 s.blocking = true; 1603 IOResult iors = s.io(iorq, 1.seconds); 1604 assert(iors.input == "world".representation); 1605 infof("hlClient done"); 1606 } 1607 globalLogLevel = LogLevel.info; 1608 info("Test hlSocket"); 1609 auto t = 
new Thread({ 1610 scope(exit) 1611 { 1612 uninitializeLoops(); 1613 } 1614 try{ 1615 App(&server, cast(ushort)12345); 1616 } catch (Exception e) { 1617 infof("Got %s in server", e); 1618 } 1619 }).start; 1620 Thread.sleep(500.msecs); 1621 hlClient(12345); 1622 t.join; 1623 globalLogLevel = LogLevel.info; 1624 1625 info("Test HioSockets 0"); 1626 1627 // all ok case 1628 t = new Thread({ 1629 scope(exit) 1630 { 1631 uninitializeLoops(); 1632 } 1633 try{ 1634 App(&server, cast(ushort)12345); 1635 } catch (Exception e) { 1636 infof("Got %s in server", e); 1637 } 1638 }).start; 1639 Thread.sleep(500.msecs); 1640 client(12345); 1641 t.join; 1642 1643 info("Test HioSockets 1"); 1644 // all fail case - everything should throw 1645 t = new Thread({ 1646 scope(exit) 1647 { 1648 uninitializeLoops(); 1649 } 1650 App(&server, cast(ushort) 12345); 1651 }).start; 1652 assertThrown!Exception(client(12346)); 1653 assertThrown!Exception(t.join); 1654 1655 info("Test HioSockets 2"); 1656 // the same but client in App 1657 // all ok case 1658 globalLogLevel = LogLevel.info; 1659 t = new Thread({ 1660 scope(exit) 1661 { 1662 uninitializeLoops(); 1663 } 1664 App(&server, cast(ushort) 12345); 1665 }).start; 1666 Thread.sleep(500.msecs); 1667 App(&client, cast(ushort)12345); 1668 t.join; 1669 1670 info("Test HioSockets 3"); 1671 // all fail case - everything should throw 1672 t = new Thread({ 1673 scope(exit) 1674 { 1675 uninitializeLoops(); 1676 } 1677 App(&server, cast(ushort) 12345); 1678 }).start; 1679 assertThrown!Exception(App(&client, cast(ushort)12346)); 1680 assertThrown!Exception(t.join); 1681 1682 info("Test lineReader"); 1683 globalLogLevel = LogLevel.info; 1684 t = new Thread({ 1685 scope(exit) 1686 { 1687 uninitializeLoops(); 1688 } 1689 App({ 1690 auto s = new HioSocket(); 1691 s.bind("127.0.0.1:12345"); 1692 s.listen(); 1693 auto c = s.accept(2.seconds); 1694 if ( c is null ) { 1695 s.close(); 1696 return; 1697 } 1698 foreach(l; ["\n", "a\n", "bb\n"]) 1699 { 1700 
c.send(l.representation, 1.seconds); 1701 } 1702 hlSleep(100.msecs); 1703 c.send("ccc\nrest".representation, 100.msecs); 1704 c.recv(64, 1.seconds); 1705 c.close(); 1706 s.close(); 1707 }); 1708 }).start; 1709 Thread.sleep(100.msecs); 1710 App({ 1711 auto s = new HioSocket(); 1712 s.connect("127.0.0.1:12345", 10.seconds); 1713 auto reader = LineReader(s); 1714 auto e = reader.readLine(); 1715 auto a = reader.readLine(); 1716 auto b = reader.readLine(); 1717 auto c = reader.readLine(); 1718 auto rest = reader.rest(); 1719 s.send("die".representation, 1.seconds); 1720 assert(e.length == 0); 1721 assert(a.data == "a".representation); 1722 assert(b.data == "bb".representation); 1723 assert(c.data == "ccc".representation); 1724 assert(rest.data == "rest".representation); 1725 s.close(); 1726 }); 1727 t.join; 1728 }