1 ///
2 module hio.redisd.client;
3 
4 import std.typecons;
5 import std.stdio;
6 import std.algorithm;
7 import std.range;
8 import std.string;
9 import std.datetime;
10 
11 import std.meta: allSatisfy;
12 import std.traits: isSomeString;
13 
14 import std.experimental.logger;
15 
16 import hio.socket;
17 import hio.resolver;
18 import hio.redisd.codec;
19 
20 import hio.http.common: URL, parse_url;
21 
22 private immutable bufferSize = 2*1024;
23 
24 ///
25 class NotAuthenticated : Exception {
26     ///
27     this(string msg) {
28         super(msg);
29     }
30 }
31 /// client API
32 class Client {
33 
34     private {
35         URL         _url;
36         HioSocket   _connection;
37         Decoder     _input_stream;
38         Duration    _timeout = 1.seconds;
39     }
40     /// Constructor
41     this(string url="redis://localhost:6379") {
42         _url = parse_url(url);
43         _connection = new HioSocket();
44         _input_stream = new Decoder();
45         if ( _url.userinfo ) {
46             auto v = execCommand("AUTH", _url.userinfo[1..$]);
47             if ( v.svar != "OK" ) {
48                 throw new NotAuthenticated("Can't authenticate");
49             }
50         }
51     }
52     auto strerror()
53     {
54         return _connection.strerror();
55     }
56     auto errno()
57     {
58         return _connection.errno();
59     }
60 
61     void connect()
62     {
63         debug(hioredis) tracef("connecting to %s:%s", _url.host, _url.port);
64         auto r = hio_gethostbyname(_url.host, _url.port);
65 
66         if (r.status == ARES_SUCCESS)
67         {
68             _connection.connect(r.addresses[0], _timeout);
69         }
70         else
71         {
72             throw new SocketException("can't resolve %s".format(_url.host));
73         }
74     }
75     private void reconnect() {
76         if ( _connection )
77         {
78             _connection.close();
79         }
80         _connection = new HioSocket();
81         connect();
82         _input_stream = new Decoder;
83         if (_url.userinfo) {
84             auto auth = execCommand("AUTH", _url.userinfo[1..$]);
85             if (auth.svar != "OK") {
86                 throw new NotAuthenticated("Can't authenticate");
87             }
88         }
89     }
90     /// Build redis command from command name and args.
91     /// All args must be of type string.
92     RedisdValue makeCommand(A...)(A args) {
93         static assert(allSatisfy!(isSomeString, A), "all command parameters must be of type string");
94         return redisdValue(tuple(args));
95     }
96     /// Build and execute redis transaction from command array.
97     RedisdValue transaction(RedisdValue[] commands) {
98         RedisdValue[] results;
99         RedisdValue r = this.execCommand("MULTI");
100         foreach (c; commands) {
101             exec(c);
102         }
103         r = this.execCommand("EXEC");
104         return r;
105     }
106     /// build and execute redis pipeline from commands array.
107     RedisdValue[] pipeline(RedisdValue[] commands) {
108         immutable(ubyte)[] data = commands.map!encode.join();
109         _connection.send(data);
110         RedisdValue[] response;
111         while (response.length < commands.length) {
112             debug(hioredis) tracef("response length=%d, commands.length=%d", response.length, commands.length);
113             auto r = _connection.recv(bufferSize);
114             if (r.timedout || r.error || r.input.length == 0)
115             {
116                 break;
117             }
118             _input_stream.put(r.input);
119             while(true) {
120                 auto v = _input_stream.get();
121                 if (v.type == ValueType.Incomplete) {
122                     break;
123                 }
124                 response ~= v;
125                 if (v.type == ValueType.Error
126                         && cast(string) v.svar[4 .. 18] == "Protocol error") {
127                     debug(hioredis)
128                         trace("reopen connection");
129                     _connection.close();
130                     reconnect();
131                     return response;
132                 }
133             }
134         }
135         return response;
136     }
137 
138     private RedisdValue exec(RedisdValue command) {
139         RedisdValue response;
140         _connection.send(command.encode);
141         while (true) {
142             auto r = _connection.recv(bufferSize);
143             if ( r.error || r.timedout || r.input.length == 0) 
144             {
145                 break;
146             }
147             _input_stream.put(r.input);
148             response = _input_stream.get();
149             if (response.type != ValueType.Incomplete) {
150                 break;
151             }
152         }
153         if (response.type == ValueType.Error && response.svar[4 .. 18] == "Protocol error") {
154             _connection.close();
155             debug(hioredis)
156                 trace("reopen connection");
157             reconnect();
158         }
159         if (response.type == ValueType.Error && response.svar[0..6] == "NOAUTH" ) {
160             throw new NotAuthenticated("Auth required");
161         }
162         return response;
163     }
164     /// build and execute single redis command.
165     RedisdValue execCommand(A...)(A args) {
166         immutable(ubyte)[][] data;
167         RedisdValue request = makeCommand(args);
168         RedisdValue response;
169         debug(hioredis) tracef("send request %s", request);
170         _connection.send(request.encode);
171         while(true) {
172             auto r = _connection.recv(bufferSize);
173             if (r.error || r.timedout || r.input.length == 0)
174             {
175                 break;
176             }
177             auto b = r.input;
178             _input_stream.put(b);
179             response = _input_stream.get();
180             if (response.type != ValueType.Incomplete) {
181                 break;
182             }
183         }
184         if ( response.type == ValueType.Error && 
185                 cast(string)response.svar[4..18] == "Protocol error") {
186             _connection.close();
187             debug(hioredis) trace("reopen connection");
188             reconnect();
189         }
190         if (response.type == ValueType.Error && response.svar[0 .. 6] == "NOAUTH") {
191             throw new NotAuthenticated("Auth required");
192         }
193         debug(hioredis) tracef("got response %s", response);
194         return response;
195     }
196     /// Simple key/value set
197     RedisdValue set(K, V)(K k, V v) {
198         return execCommand("SET", k, v);
199     }
200     /// Simple key/value get
201     RedisdValue get(K)(K k) {
202         return execCommand("GET", k);
203     }
204     /// Consume reply
205     RedisdValue read() {
206         RedisdValue response;
207         response = _input_stream.get();
208         while(response.type == ValueType.Incomplete) {
209             auto r = _connection.recv(bufferSize);
210             if ( r.error || r.timedout || r.input.length == 0)
211             {
212                 break;
213             }
214             _input_stream.put(r.input);
215             response = _input_stream.get();
216             if (response.type != ValueType.Incomplete) {
217                 break;
218             }
219         }
220         if (response.type == ValueType.Error && cast(string) response.svar[4 .. 18]
221                 == "Protocol error") {
222             _connection.close();
223             debug(hioredis)
224                 trace("reopen connection");
225             reconnect();
226         }
227         return response;
228     }
229 
230     bool connected()
231     {
232         return _connection.connected;
233     }
234 
235     void close()
236     {
237         _connection.close();
238         _connection = null;
239         debug(hioredis) trace("connection closed");
240     }
241 }