1 module hio.http.client;
2 
3 import std.experimental.logger;
4 import std.stdio;
5 import std.socket;
6 import std.string;
7 import std.array;
8 import std.datetime;
9 import std.algorithm;
10 import std.exception;
11 import std.typecons: Tuple;
12 import std.uni: toLower;
13 import std.zlib: UnCompress, HeaderFormat;
14 
15 import hio.socket;
16 import hio.resolver;
17 import hio.loop;
18 import hio.zlib;
19 import hio.tls;
20 
21 public import hio.http.common;
22 import hio.http.http_parser;
23 
24 import ikod.containers.hashmap: HashMap, hash_function;
25 import ikod.containers.unrolledlist;
26 
27 import nbuff: Nbuff, NbuffChunk, SmartPtr;
28 
29 AsyncSocketLike socketFabric(URL url) @safe
30 {
31     if ( url.schemaCode == SchCode.HTTP )
32     {
33         auto s = new hlSocket();
34         s.open();
35         return s;
36     }
37     if ( url.schemaCode == SchCode.HTTPS )
38     {
39         debug(hiohttp) trace("use ssl socket");
40         auto s = new AsyncSSLSocket();
41         s.open();
42         s.set_host(url.host);
43         return s;
44     }
45     assert(0);
46 }
47 
48 package struct ConnectionTriple
49 {
50     string  schema;
51     string  host;
52     ushort  port;
53     bool opEquals(R)(const R other) const
54     {
55         return schema == other.schema && port == other.port && host == other.host;
56     }
57     hash_t toHash() const @safe pure nothrow
58     {
59         return hash_function(schema) + hash_function(host) + hash_function(port);
60     }
61 }
62 
63 // handle all logics like: proxy, redirects, connection pool,...
64 class AsyncHTTPClient
65 {
66     enum State
67     {
68         INIT,
69         RESOLVING,
70         CONNECTING,
71         HANDLING,
72         ERROR
73     }
74     private
75     {
76         State                                       _state = State.INIT;
77         HashMap!(ConnectionTriple, AsyncSocketLike) _conn_pool;
78         void delegate(AsyncHTTPResult) @safe        _callback;
79         AsyncSocketLike function(URL) @safe         _conn_factory = &socketFabric;
80         AsyncHTTP                                   _handler;
81         Duration                                    _connect_timeout = 30.seconds;
82         Duration                                    _send_timeout = 30.seconds;
83         Duration                                    _receive_timeout = 30.seconds;
84         int                                         _buffer_size = 16*1024;
85         int                                         _max_redirects = 10;
86         bool                                        _keep_alive = true;
87         int                                         _verbosity = 0;
88         //
89         hlEvLoop                                    _loop;
90         // lifetime - request
91         Request                                     _request;       // request headers, body, etc
92         URL                                         _original_url;  // save at begin
93         URL                                         _current_url;   // current url - follow redirects
94         AsyncSocketLike                             _connection;
95         InternetAddress[]                           _endpoints;
96         int                                         _redirects_counter;
97     }
98     ~this()
99     {
100         close();
101     }
102 
103     void close() @safe
104     {
105         if ( _connection )
106         {
107             _connection.close();
108             _connection = null;
109         }
110         // close everyting in pool
111         foreach (AsyncSocketLike s; _conn_pool.byValue())
112         {
113             s.close();
114         }
115         _conn_pool.clear();
116     }
117 
118     private bool use_proxy() @safe
119     {
120         return false;
121     }
122 
123     bool redirectAllowed() pure @safe @nogc
124     {
125         if ( _redirects_counter < _max_redirects )
126         {
127             return true;
128         }
129         return false;
130     }
131 
132     private void prepareRedirect(string location) @safe
133     {
134         _redirects_counter++;
135         // return conn to pool
136         _conn_pool[ConnectionTriple(_current_url.schema, _current_url.host, _current_url.port)] = _connection;
137 
138         debug(hiohttp) tracef("new location: %s", location);
139         _current_url = parse_url(location);
140         debug(hiohttp) tracef("new url: %s", _current_url);
141     }
142     private void _http_handler_callback(AsyncHTTPResult r) @safe
143     {
144         if ( r.status_code == 302)
145         {
146             if ( !redirectAllowed() )
147             {
148                 debug(hiohttp) tracef("Can't follow redirect");
149                 r.error = AsyncHTTPErrors.MaxRedirectsReached;
150                 r.status_code = -1;
151                 _callback(r);
152                 return;
153             }
154             string location = r.getHeader("location").toString;
155             prepareRedirect(location);
156             _execute();
157             return;
158         }
159         // reset state
160         _state = State.INIT;
161         // return connection to pool
162         _conn_pool.put(ConnectionTriple(_current_url.schema, _current_url.host, _current_url.port), _connection);
163         _connection = null;
164         _callback(r);
165     }
166     private void _http_handler_call(AsyncSocketLike c) @safe
167     {
168         assert(_handler._state == AsyncHTTP.State.INIT);
169         _handler._connection = _connection;
170         _handler._callback = &_http_handler_callback;
171         _handler.execute();
172     }
173 
174     private void _connect_callback(AppEvent e) @safe
175     {
176         if ( _connection is null )
177         {
178             debug(hiohttp) tracef("connect_callback on closed connection, you probably closed it already?");
179             return;
180         }
181         debug(hiohttp) tracef("connect_callback %s", e);
182         if ( _connection.connected )
183         {
184             _http_handler_call(_connection);
185         }
186         else
187         {
188             // retry with next address if possible
189             if ( _endpoints.length > 0)
190             {
191                 _connection.close();
192                 _connection = _conn_factory(_current_url);
193                 _connect_call();
194             }
195             else
196             {
197                 // no more endpoints to connect
198                 AsyncHTTPResult r;
199                 r.error = AsyncHTTPErrors.ConnFailed;
200                 _state = State.INIT;
201                 _callback(r);
202             }
203         }
204     }
205     private void _connect_call() @safe
206     {
207         // connect to first endpoint
208         auto endpoint = _endpoints[0];
209         _endpoints = _endpoints[1..$];
210         _connection.connect(endpoint, _loop, &_connect_callback, _connect_timeout);
211     }
212 
213     private void _resolver_callback(ResolverResult4 r) @safe
214     {
215         auto status = r.status;
216         debug(hiohttp) tracef("resolve calback");
217         if ( status != ARES_SUCCESS)
218         {
219             AsyncHTTPResult result;
220             result.error = AsyncHTTPErrors.ResolveFailed;
221             _state = State.INIT;
222             _callback(result);
223             return;
224         }
225         _state = State.CONNECTING;
226         _endpoints = r.addresses.map!(a => cast(InternetAddress)a).array;
227         _connect_call();
228     }
229     private void _resolver_call(URL url) @safe
230     {
231         debug(hiohttp) tracef("resolve %s", url);
232         auto r = hio_gethostbyname(url.host, &_resolver_callback, url.port);
233         if ( !r.isEmpty )
234         {
235             _resolver_callback(r);
236         }
237     }
238 
239     /// build request line.
240     /// Take proxy into account. 
241     private NbuffChunk _build_request_line() @trusted
242     {
243         if (!use_proxy)
244         {
245             ulong ml = _request.method.length;
246             auto pqs = _current_url.path_off >= 0 ? _current_url.url[_current_url.path_off..$] : "/";
247             ulong pl = pqs.length;
248             auto q = Nbuff.get(ml + 1 + pl + 10);
249             copy(_request.method, q.data[0 .. ml]);
250             copy(" ".representation, q.data[ml .. ml+1]);
251             copy(pqs.representation, q.data[ml+1..ml+1+pl]);
252             copy(" HTTP/1.1\n".representation, q.data[ml+1+pl..ml + 1 + pl + 10]);
253             debug(hiohttp) tracef("rq_line");
254             return NbuffChunk(q, ml + 1 + pl + 10);
255         }
256         else
257         {
258             assert(0);
259         }
260     }
261     /// build all headers based on reuest url, headers, etc.
262     private Nbuff _build_request_header() @safe
263     {
264         Nbuff message_header;
265         auto request_line = _build_request_line();
266         message_header.append(request_line);
267         if (_verbosity >= 1) writef("-> %s", request_line.toString);
268 
269         if ( !_request.user_headers_flags.AcceptEncoding )
270         {
271             message_header.append("Accept-Encoding: gzip,deflate\n");
272             if (_verbosity >= 1) writefln("-> %s: %s", "Accept-Encoding", "gzip,deflate");
273         }
274         if ( !_request.user_headers_flags.Host )
275         {
276             message_header.append("Host: ");
277             if ( _current_url.port == standard_port(_current_url.schema) )
278             {
279                 message_header.append(_current_url.host);
280                 if (_verbosity >= 1) writefln("-> Host: %s", _current_url.host);
281             }
282             else
283             {
284                 message_header.append(_current_url.host ~ ":%d".format(_current_url.port));
285                 if (_verbosity >= 1) writefln("-> Host: %s", _current_url.host ~ ":%d".format(_current_url.port));
286             }
287             message_header.append("\n");
288         }
289         if (!_request.user_headers_flags.UserAgent)
290         {
291             message_header.append("UserAgent: hio\n");
292             if (_verbosity >= 1) writefln("-> %s: %s", "UserAgent", "hio");
293         }
294         if (!_request.user_headers_flags.Connection && !_keep_alive )
295         {
296             message_header.append("Connection: Close\n");
297             if (_verbosity >= 1) writefln("-> Connection: Close");
298         }
299         foreach (ref h; _request.user_headers)
300         {
301             message_header.append(h.FieldName);
302             message_header.append(": ");
303             message_header.append(h.FieldValue);
304             message_header.append("\n");
305             if (_verbosity >= 1)
306             {
307                 writefln("-> %s: %s", h.FieldName, h.FieldValue);
308             }
309         }
310         message_header.append("\n");
311         return message_header;
312     }
313 
314     private void _execute() @safe
315     {
316         import std.stdio;
317         debug(hiohttp) writefln("pool: %s", _conn_pool);
318         debug(hiohttp) writefln("url: %s", _current_url);
319         auto f = _conn_pool.fetch(ConnectionTriple(_current_url.schema, _current_url.host, _current_url.port));
320         debug(hiohttp) writefln("f: %s", f);
321         if (f.ok)
322         {
323             debug(hiohttp) tracef("Use connection from pool");
324             _connection = f.value;
325         }
326         else
327         {
328             debug(hiohttp) tracef("Create new connection");
329             _state = State.CONNECTING;
330             _connection = _conn_factory(_current_url);
331         }
332 
333         assert(_connection !is null);
334 
335         if ( _connection.connected )
336         {
337             debug(hiohttp) tracef("Connected, start http");
338             _state = State.HANDLING;
339             _http_handler_call(_connection);
340             return;
341         }
342         else
343         {
344             // resolve, connect and then call handler
345             // reset resolving
346             _endpoints.length = 0;
347             _state = State.HANDLING;
348             _resolver_call(_current_url);
349             return;
350         }
351         assert(0);
352     }
353     public void verbosity(int v) @safe @property @nogc nothrow
354     {
355         _verbosity = v;
356     }
357     public void connect_timeout(Duration v) @safe @property @nogc nothrow
358     {
359         _connect_timeout = v;
360     }
361     public void addHeader(Header h) @safe
362     {
363         _request.addHeader(h);
364     }
365 
366     void execute(Method method, URL url, void delegate(AsyncHTTPResult) @safe callback) @safe
367     {
368         enforce(_state == State.INIT, "You can't reenter this");
369         enforce(_connection is null, "You can't reenter this");
370         _loop = getDefaultLoop();
371         _original_url = url;
372         _current_url = url;
373         _request.method = method;
374         _redirects_counter = 0;
375 
376         _callback = callback;
377         _handler._client = this;
378         _handler._verbosity = _verbosity;
379         _execute();
380     }
381 }
382 
383 unittest
384 {
385     import std.experimental.logger;
386     globalLogLevel = LogLevel.info;
387     import std.stdio;
388     AsyncHTTPClient c = new AsyncHTTPClient();
389     c._request.method = Method("GET");
390     c._current_url = parse_url("http://example.com:8080/path");
391     auto q = c._build_request_line();
392     
393     assert(q == "GET /path HTTP/1.1\n".representation);
394 
395     c.addHeader(Header("X-header", "x-value"));
396     c.addHeader(Header("Content-Length", "100"));
397 
398     Nbuff header = c._build_request_header();
399     //writeln(cast(string)header.data.data);
400 }
401 extern(C)
402 {
403     private int on_header_field(http_parser* parser, const char* at, size_t length)
404     {
405         AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
406         assert(c._state == AsyncHTTP.State.HEADERS_RECEIVE);
407         assert(c._current_buffer_length > 0);
408         long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
409         debug(hiohttp) tracef("on header field in state %s", c._expected_part);
410         debug(hiohttp) tracef("on header field: %d", c._offset + position);
411         if (length>0)
412             debug(hiohttp) tracef("on header field: %s",
413                 cast(string)c._response_headers_buffer[c._offset + position..c._offset + position+length].data.data);
414         final switch(c._expected_part)
415         {
416             case AsyncHTTP.HeaderPart.Value:
417                 // finalize value and start next Header
418                 debug(hiohttp) tracef("got <%s: %s>",
419                     cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end],
420                     cast(string)c._response_headers_buffer.data.data[c._value_beg..c._value_end]);
421                 c.processHeader();
422                 goto case;
423             case AsyncHTTP.HeaderPart.None:
424                 c._expected_part = AsyncHTTP.HeaderPart.Field;
425                 c._field_beg = c._offset + position;
426                 c._field_end = c._field_beg + length;
427                 break;
428             case AsyncHTTP.HeaderPart.Field:
429                 // continue receiving current header
430                 c._field_end += length;
431                 break;
432         }
433         return 0;
434     }
435     private int on_header_value(http_parser* parser, const char* at, size_t length)
436     {
437         AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
438         assert(c._state == AsyncHTTP.State.HEADERS_RECEIVE);
439         debug(hiohttp) tracef("on header value in state %s", c._expected_part);
440         long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
441         if (length>0)
442             debug(hiohttp) tracef("on header value: %s",
443                 cast(string)c._response_headers_buffer[c._offset + position..c._offset + position+length].data.data);
444         // async_http_connection c = cast(async_http_connection)parser.data;
445         // c._state = State.IDLE;
446         final switch(c._expected_part)
447         {
448             case AsyncHTTP.HeaderPart.None:
449                 assert(0);
450             case AsyncHTTP.HeaderPart.Value:
451                 c._value_end += length;
452                 break;
453             case AsyncHTTP.HeaderPart.Field:
454                 // finalize header and start next value
455                 debug(hiohttp) tracef("got Field: %s", cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end]);
456                 c._expected_part = AsyncHTTP.HeaderPart.Value;
457                 c._value_beg = c._offset + position;
458                 c._value_end = c._value_beg + length;
459                 break;
460         }
461         return 0;
462     }
463     private int on_headers_complete(http_parser* parser)
464     {
465         debug(hiohttp) tracef("headers complete");
466         AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
467         debug(hiohttp) tracef("got <%s: %s>",
468             cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end],
469             cast(string)c._response_headers_buffer.data.data[c._value_beg..c._value_end]);
470         c.processHeader();
471         c._state = c.onHeadersComplete();
472         debug(hiohttp) tracef("next state: %s", c._state);
473         return 0;
474     }
475     private int on_body(http_parser* parser, const char* at, size_t length)
476     {
477         debug(hiohttp) tracef("on body");
478         AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
479         long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
480         assert(c._state == AsyncHTTP.State.BODY_RECEIVE, "Expected state BODY_RECEIVE, got %s".format(c._state));
481         c.onBody(position, length);
482         return 0;
483     }
484     private int on_message_complete(http_parser* parser)
485     {
486         debug(hiohttp) tracef("message complete");
487         // async_http_connection c = cast(async_http_connection)parser.data;
488         // c._state = State.IDLE;
489         AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
490         c.onBodyComplete();
491         return 0;
492     }
493 }
494 
495 // handle strait forward http request-response
496 struct AsyncHTTP
497 {
498     private
499     {
500         enum State
501         {
502             INIT,
503             SENDING_REQUEST,
504             HEADERS_RECEIVE,
505             HEADERS_COMPLETED,
506             BODY_RECEIVE,
507             DONE,
508             ERROR
509         }
510         enum HeaderPart
511         {
512             None = 0,
513             Field,
514             Value
515         }
516         enum ContentEncoding
517         {
518             NONE,
519             GZIP,
520             DEFLATE
521         }
522         State                   _state;
523         AsyncSocketLike         _connection;
524         void delegate(AsyncHTTPResult) @safe
525                                 _callback;
526         http_parser             _parser;
527         http_parser_settings    _parser_settings = {
528                 on_header_field:     &on_header_field,
529                 on_header_value:     &on_header_value,
530                 on_headers_complete: &on_headers_complete,
531                 on_body:             &on_body,
532                 on_message_complete: &on_message_complete,
533         };
534         int                     _verbosity;
535         AsyncHTTPClient         _client;
536         AsyncHTTPResult         _result;
537         HeaderPart              _expected_part;
538         size_t                  _offset;                    // offset of each socket input
539         size_t                  _field_beg, _field_end;     // beg and end of headers field and value
540         size_t                  _value_beg, _value_end;     // beg and end of headers field and value
541         NbuffChunk              _current_input;
542         immutable(ubyte)*       _current_buffer_ptr;
543         size_t                  _current_buffer_length;
544         Nbuff                   _response_body;
545         MessageHeaders          _response_headers;
546         Nbuff                   _response_headers_buffer;
547         ContentEncoding         _content_encoding;
548 
549         ZLib                    _zlib;
550     }
551     ~this()
552     {
553         // we can destroy it only when it is idle
554         assert(_state == State.INIT, "You can't destroy in %s".format(_state));
555     }
556     private void reset() @safe
557     {
558         _response_body.clear;
559         _response_headers.clear;
560         _state = State.INIT;
561         if (_content_encoding == ContentEncoding.GZIP || _content_encoding==ContentEncoding.DEFLATE)
562         {
563             _zlib.zFlush();
564         }
565         _content_encoding = ContentEncoding.NONE;
566     }
567 
568     private void processHeader()
569     {
570         import std.stdio;
571         // header field: _response_headers_buffer[_field_beg.._field_end]
572         // header value: _response_headers_buffer[_value_beg.._value_end]
573         NbuffChunk field = _response_headers_buffer.data(_field_beg, _field_end);
574         NbuffChunk value = _response_headers_buffer.data(_value_beg, _value_end);
575         if ( _verbosity >= 1 ) {
576             writefln("<- %s: %s", cast(string)field.data, cast(string)value.data);
577         }
578         if (field.toString.toLower == "content-encoding" )
579         {
580             string v = value.toString.toLower;
581             switch (v)
582             {
583                 case "gzip":
584                     debug(hiohttp) tracef("decode from gzip");
585                     _content_encoding = ContentEncoding.GZIP;
586                     _zlib.zInit(_client._buffer_size);
587                     break;
588                 case "deflate":
589                     debug(hiohttp) tracef("decode from deflate");
590                     _content_encoding = ContentEncoding.DEFLATE;
591                     _zlib.zInit(_client._buffer_size);
592                     break;
593                 default:
594                     break;
595             }
596 
597         }
598         else
599         {
600             _response_headers.pushBack(MessageHeader(field, value));
601         }
602     }
603 
604     private void _receiving_response(ref IOResult res) @safe
605     {
606         assert(_state == State.HEADERS_RECEIVE || _state == State.BODY_RECEIVE);
607 
608         if (res.timedout)
609         {
610             onTimeout();
611             return;
612         }
613         if ( res.error )
614         {
615             onError();
616             return;
617         }
618 
619         _current_input = res.input;
620 
621         size_t r;
622         switch(_state)
623         {
624             case State.HEADERS_RECEIVE:
625                 _response_headers_buffer.append(_current_input);
626                 () @trusted {
627                     // ptr used in http_parser_execute callback
628                     _current_buffer_ptr = _current_input.data.ptr;
629                     _current_buffer_length = _current_input.data.length;
630                     r = http_parser_execute(&_parser, &_parser_settings, cast(char*)_current_buffer_ptr, _current_buffer_length);
631                 }();
632                 assert(r == _current_buffer_length);
633                 _offset += _current_buffer_length;
634                 _current_buffer_length = 0;
635                 break;
636             case State.BODY_RECEIVE:
637                 () @trusted {
638                     // ptr used in http_parser_execute callback
639                     _current_buffer_ptr = _current_input.data.ptr;
640                     _current_buffer_length = _current_input.data.length;
641                     r = http_parser_execute(&_parser, &_parser_settings, cast(char*)_current_buffer_ptr, _current_buffer_length);
642                 }();
643                 assert(r == _current_buffer_length, "r=%d on body receive, instead of %d".format(r, _current_buffer_length));
644                 break;
645             case State.DONE:
646                 break;
647             default:
648                 assert(0);
649         }
650         // we processed data, what we have to do in new state
651         switch(_state)
652         {
653             case State.BODY_RECEIVE, State.HEADERS_RECEIVE:
654                 IORequest iorq;
655                 iorq.to_read = _client._buffer_size;
656                 iorq.callback = &_receiving_response;
657                 _parser.data = &_client._handler;
658                 _connection.io(_client._loop, iorq, _client._receive_timeout);
659                 return;
660             default:
661                 return;
662         }
663     }
664 
665     private void _send_request_done(ref IOResult res) @safe
666     {
667         //debug(hiohttp) tracef("result: %s", res);
668         if ( res.error || res.timedout )
669         {
670             _state = State.ERROR;
671             _callback(_result);
672             return;
673         }
674         IORequest iorq;
675         iorq.to_read = _client._buffer_size;
676         iorq.callback = &_receiving_response;
677         _parser.data = &_client._handler;
678         _state = State.HEADERS_RECEIVE;
679         _connection.io(_client._loop, iorq, _client._receive_timeout);
680     }
681     private void _send_request() @safe
682     {
683         assert(_state == State.SENDING_REQUEST);
684         IORequest iorq;
685         iorq.output = _client._build_request_header();
686         iorq.callback = &_send_request_done;
687         _connection.io(_client._loop, iorq, _client._send_timeout);
688     }
689     private State onHeadersComplete()
690     {
691         debug(hiohttp) tracef("Checking headers");
692         _result.status_code = _parser.status_code;
693         _response_body.clear;
694         _result.response_headers = _response_headers;
695         return State.BODY_RECEIVE;
696     }
697     private void onBody(long off, size_t len)
698     {
699         import std.stdio;
700         NbuffChunk b = _current_input[off..off+len];
701         if ( _content_encoding == ContentEncoding.GZIP || _content_encoding == ContentEncoding.DEFLATE )
702         {
703             int processed = 0;
704             while(processed < b.length)
705             {
706                 auto r = _zlib.zInflate(b[processed..$]);
707                 processed += r.consumed;
708                 if (r.result.length)
709                 {
710                     _response_body.append(r.result);
711                 }
712                 if ( r.status != 0 )
713                 {
714                     break;
715                 }
716             }
717         }
718         else
719         {
720             _response_body.append(b);
721         }
722     }
723     private void onBodyComplete()
724     {
725         debug(hiohttp) tracef("Body complete");
726         _result.response_body = _response_body;
727         AsyncHTTPResult result = _result;
728         auto cb = _callback;
729         reset();
730         cb(result);
731         return;
732     }
733     private void onTimeout() @safe
734     {
735         _result.status_code = -1;
736         _result.error = AsyncHTTPErrors.Timeout;
737         AsyncHTTPResult result = _result;
738         auto cb = _callback;
739         reset();
740         cb(result);
741         return;
742     }
743     private void onError() @safe
744     {
745         _result.status_code = -1;
746         _result.error = AsyncHTTPErrors.DataError;
747         AsyncHTTPResult result = _result;
748         auto cb = _callback;
749         reset();
750         cb(result);
751         return;
752     }
753     void execute() @safe
754     {
755         assert(_connection.connected);
756         assert(_state == State.INIT);
757         debug(hiohttp) tracef("handling %s", _client._current_url);
758         _state = State.SENDING_REQUEST;
759         http_parser_init(&_parser, http_parser_type.HTTP_RESPONSE);
760         _send_request();
761     }
762 }
763 
764 class HTTPClient
765 {
766     import core.thread: Fiber;
767     private
768     {
769         AsyncHTTPClient _async_client;
770     }
771     this()
772     {
773         _async_client = new AsyncHTTPClient();
774     }
775     AsyncHTTPResult execute(Method method, URL url)  @safe
776     {
777         AsyncHTTPResult result;
778         Fiber f = Fiber.getThis();
779         assert(f !is null, "You can call this only inside from task/fiber");
780         void callback(AsyncHTTPResult r) @trusted
781         {
782             result = r;
783             f.call();
784         }
785         _async_client.execute(method, url, &callback);
786         () @trusted {
787             Fiber.yield();
788         }();
789         return result;
790     }
791     void close()
792     {
793         if ( _async_client !is null )
794         {
795             _async_client.close();
796         }
797     }
798 }
799 
800 
801 struct AsyncHTTPResult
802 {
803     int             status_code = -1;
804     AsyncHTTPErrors error = AsyncHTTPErrors.None;
805     Nbuff           response_body;
806     MessageHeaders  response_headers;
807     private
808     {
809         HashMap!(string, NbuffChunk) cached_headers;
810     }
811     NbuffChunk getHeader(string h) @safe
812     {
813         auto hl = h.toLower;
814         auto f = cached_headers.fetch(hl);
815         if ( f.ok )
816         {
817             return f.value;
818         }
819         // iterate over non-cached headers, store in cache and return value if found
820         while(!response_headers.empty)
821         {
822             auto hdr = response_headers.front;
823             response_headers.popFront;
824             string lowered_field = hdr.field.toLower();
825             cached_headers.put(lowered_field, hdr.value);
826             if ( lowered_field == hl )
827             {
828                 return hdr.value;
829             }
830         }
831         return NbuffChunk();
832     }
833 }
834 
835 unittest
836 {
837     AsyncHTTPResult r;
838     r.response_headers.pushBack(MessageHeader(NbuffChunk("Abc"), NbuffChunk("abc")));
839     r.response_headers.pushBack(MessageHeader(NbuffChunk("Def"), NbuffChunk("bbb")));
840     auto abc = r.getHeader("abc");
841     assert(abc.data == "abc");
842     abc = r.getHeader("abc");
843     assert(abc.data == "abc");
844     abc = r.getHeader("def");
845     assert(abc.data == "bbb");
846     abc = r.getHeader("none");
847     assert(abc.empty);
848 }
849 
850 unittest
851 {
852     import std.stdio;
853     import hio.scheduler;
854     import std.experimental.logger;
855     globalLogLevel = LogLevel.info;
856     App({
857         info("Test httpclient in task");
858         HTTPClient c = new HTTPClient();
859         auto r = c.execute(Method("GET"), parse_url("https://httpbin.org/get"));
860         writefln("result: %s", r);
861         r = c.execute(Method("GET"), parse_url("https://httpbin.org/get"));
862         writefln("result: %s", r);
863         c.close();
864     });
865     uninitializeLoops();
866 }
867 
868 unittest
869 {
870     import std.stdio;
871     import hio.scheduler;
872     import std.datetime;
873     import std.experimental.logger;
874     globalLogLevel = LogLevel.info;
875     info("Test http.client");
876     App({
877         info("test stream");
878         AsyncHTTPClient client = new AsyncHTTPClient();
879         void callback(AsyncHTTPResult result) @safe
880         {
881             //infof("Client called back with result: %s", result);
882             () @trusted {
883                 if ( result.status_code < 0 )
884                 {
885                     //writefln("<<<error: %s>>>", result.error.msg);
886                     //writefln("<<<erro %s>>>", result.error.msg);
887                     return;
888                 }
889                 //writefln("<<<body %s>>>", cast(string)result.response_body.data.data);
890                 //writefln("<<<erro %s>>>", result.error.msg);
891             }();
892             client.close();
893             getDefaultLoop.stop();
894         }
895         URL url = parse_url("http://httpbin.org/stream/100");
896         client.verbosity = 1;
897         client.execute(Method("GET"), url, &callback);
898         getDefaultLoop.run();
899     });
900     globalLogLevel = LogLevel.info;
901     App({
902         info("test conn reuse");
903         int requests = 3;
904         AsyncHTTPClient client = new AsyncHTTPClient();
905         URL url = parse_url("http://httpbin.org/get");
906         void callback(AsyncHTTPResult result) @safe
907         {
908             if ( result.status_code != 200 )
909             {
910                 debug(hiohttp) tracef("status code = %d", result.status_code);
911             }
912             debug writefln("<%s>", cast(string)result.response_body.data.data);
913             if (--requests == 0)
914             {
915                 client.close();
916                 getDefaultLoop.stop();
917                 return;
918             }
919             client.addHeader(Header("X-Request-No", "%d".format(requests)));
920             client.execute(Method("GET"), url, &callback);
921         }
922         client.verbosity = 1;
923         client.addHeader(Header("X-Request-No", "%d".format(requests)));
924         client.execute(Method("GET"), url, &callback);
925         hlSleep(25.seconds);
926     });
927     // test redirects - disabled while httpbin.org give 404 on this url
928     // App({
929     //     info("test absolute redirects");
930     //     AsyncHTTPClient client = new AsyncHTTPClient();
931     //     void callback(AsyncHTTPResult result) @safe
932     //     {
933     //         assert(result.status_code == 200);
934     //         debug writefln("<%s>", cast(string)result.response_body.data.data);
935     //         client.close();
936     //         getDefaultLoop.stop();
937     //     }
938     //     URL url = parse_url("http://httpbin.org/absolute-redirect/3");
939     //     client.verbosity = 1;
940     //     client.execute(Method("GET"), url, &callback);
941     //     hlSleep(25.seconds);
942     // });
943 
944     // App({
945     //     info("test absolute redirects with limited redirects number");
946     //     AsyncHTTPClient client = new AsyncHTTPClient();
947     //     void callback(AsyncHTTPResult result) @safe
948     //     {
949     //         assert(result.status_code == -1);
950     //         assert(result.error == AsyncHTTPErrors.MaxRedirectsReached);
951     //         debug writefln("<%s>", cast(string)result.response_body.data.data);
952     //         client.close();
953     //         getDefaultLoop.stop();
954     //     }
955     //     URL url = parse_url("http://httpbin.org/absolute-redirect/3");
956     //     client.verbosity = 1;
957     //     client._max_redirects = 1;
958     //     client.execute(Method("GET"), url, &callback);
959     //     hlSleep(25.seconds);
960     // });
961     globalLogLevel = LogLevel.info;
962     App({
963         AsyncHTTPClient client = new AsyncHTTPClient();
964         void callback(AsyncHTTPResult result) @safe
965         {
966             () @trusted {
967                 assert(result.status_code == -1);
968                 assert(result.error == AsyncHTTPErrors.ConnFailed);
969             }();
970             client.close();
971             getDefaultLoop.stop();
972         }
973         URL url = parse_url("http://2.2.2.2/");
974         client.connect_timeout = 1.seconds;
975         client.execute(Method("GET"), url, &callback);
976         hlSleep(10.seconds);
977     });
978 
979     globalLogLevel = LogLevel.info;
980     App({
981         AsyncHTTPClient client = new AsyncHTTPClient();
982         void callback(AsyncHTTPResult result) @safe
983         {
984             () @trusted {
985                 assert(result.status_code == 200);
986                 writefln("<%s>", cast(string)result.response_body.data.data);
987             }();
988             client.close();
989             getDefaultLoop.stop();
990         }
991         URL url = parse_url("http://httpbin.org/gzip");
992         client.connect_timeout = 1.seconds;
993         client.verbosity = 1;
994         client.addHeader(Header("Accept-Encoding","gzip"));
995         client.execute(Method("GET"), url, &callback);
996         hlSleep(10.seconds);
997     });
998     globalLogLevel = LogLevel.info;
999     App({
1000         AsyncHTTPClient client = new AsyncHTTPClient();
1001         void callback(AsyncHTTPResult result) @safe
1002         {
1003             () @trusted {
1004                 assert(result.status_code == 200);
1005                 writefln("<%s>", cast(string)result.response_body.data.data);
1006             }();
1007             client.close();
1008             getDefaultLoop.stop();
1009         }
1010         URL url = parse_url("http://httpbin.org/deflate");
1011         client.connect_timeout = 1.seconds;
1012         client.verbosity = 0;
1013         client.addHeader(Header("Accept-Encoding","deflate"));
1014         client.execute(Method("GET"), url, &callback);
1015         hlSleep(10.seconds);
1016     });
1017     globalLogLevel = LogLevel.info;
1018     App({
1019         AsyncHTTPClient client = new AsyncHTTPClient();
1020         void callback(AsyncHTTPResult result) @safe
1021         {
1022             () @trusted {
1023                 assert(result.status_code == 200);
1024                 writefln("<%s>", cast(string)result.response_body.data.data);
1025             }();
1026             client.close();
1027             getDefaultLoop.stop();
1028         }
1029         URL url = parse_url("https://httpbin.org/gzip");
1030         client.connect_timeout = 1.seconds;
1031         client.verbosity = 0;
1032         client.addHeader(Header("Accept-Encoding","gzip"));
1033         client.execute(Method("GET"), url, &callback);
1034         hlSleep(10.seconds);
1035     });
1036     uninitializeLoops();
1037 }