1 /// 2 module hio.redisd.client; 3 4 import std.typecons; 5 import std.stdio; 6 import std.algorithm; 7 import std.range; 8 import std.string; 9 import std.datetime; 10 11 import std.meta: allSatisfy; 12 import std.traits: isSomeString; 13 14 import std.experimental.logger; 15 16 import hio.socket; 17 import hio.resolver; 18 import hio.redisd.codec; 19 20 import hio.http.common: URL, parse_url; 21 22 private immutable bufferSize = 2*1024; 23 24 /// 25 class NotAuthenticated : Exception { 26 /// 27 this(string msg) { 28 super(msg); 29 } 30 } 31 /// client API 32 class Client { 33 34 private { 35 URL _url; 36 HioSocket _connection; 37 Decoder _input_stream; 38 Duration _timeout = 1.seconds; 39 } 40 /// Constructor 41 this(string url="redis://localhost:6379") { 42 _url = parse_url(url); 43 _connection = new HioSocket(); 44 _input_stream = new Decoder(); 45 if ( _url.userinfo ) { 46 auto v = execCommand("AUTH", _url.userinfo[1..$]); 47 if ( v.svar != "OK" ) { 48 throw new NotAuthenticated("Can't authenticate"); 49 } 50 } 51 } 52 auto strerror() 53 { 54 return _connection.strerror(); 55 } 56 auto errno() 57 { 58 return _connection.errno(); 59 } 60 61 void connect() 62 { 63 debug(hioredis) tracef("connecting to %s:%s", _url.host, _url.port); 64 auto r = hio_gethostbyname(_url.host, _url.port); 65 66 if (r.status == ARES_SUCCESS) 67 { 68 _connection.connect(r.addresses[0], _timeout); 69 } 70 else 71 { 72 throw new SocketException("can't resolve %s".format(_url.host)); 73 } 74 } 75 private void reconnect() { 76 if ( _connection ) 77 { 78 _connection.close(); 79 } 80 _connection = new HioSocket(); 81 connect(); 82 _input_stream = new Decoder; 83 if (_url.userinfo) { 84 auto auth = execCommand("AUTH", _url.userinfo[1..$]); 85 if (auth.svar != "OK") { 86 throw new NotAuthenticated("Can't authenticate"); 87 } 88 } 89 } 90 /// Build redis command from command name and args. 91 /// All args must be of type string. 92 RedisdValue makeCommand(A...)(A args) { 93 static assert(allSatisfy!(isSomeString, A), "all command parameters must be of type string"); 94 return redisdValue(tuple(args)); 95 } 96 /// Build and execute redis transaction from command array. 97 RedisdValue transaction(RedisdValue[] commands) { 98 RedisdValue[] results; 99 RedisdValue r = this.execCommand("MULTI"); 100 foreach (c; commands) { 101 exec(c); 102 } 103 r = this.execCommand("EXEC"); 104 return r; 105 } 106 /// build and execute redis pipeline from commands array. 107 RedisdValue[] pipeline(RedisdValue[] commands) { 108 immutable(ubyte)[] data = commands.map!encode.join(); 109 _connection.send(data); 110 RedisdValue[] response; 111 while (response.length < commands.length) { 112 debug(hioredis) tracef("response length=%d, commands.length=%d", response.length, commands.length); 113 auto r = _connection.recv(bufferSize); 114 if (r.timedout || r.error || r.input.length == 0) 115 { 116 break; 117 } 118 _input_stream.put(r.input); 119 while(true) { 120 auto v = _input_stream.get(); 121 if (v.type == ValueType.Incomplete) { 122 break; 123 } 124 response ~= v; 125 if (v.type == ValueType.Error 126 && cast(string) v.svar[4 .. 18] == "Protocol error") { 127 debug(hioredis) 128 trace("reopen connection"); 129 _connection.close(); 130 reconnect(); 131 return response; 132 } 133 } 134 } 135 return response; 136 } 137 138 private RedisdValue exec(RedisdValue command) { 139 RedisdValue response; 140 _connection.send(command.encode); 141 while (true) { 142 auto r = _connection.recv(bufferSize); 143 if ( r.error || r.timedout || r.input.length == 0) 144 { 145 break; 146 } 147 _input_stream.put(r.input); 148 response = _input_stream.get(); 149 if (response.type != ValueType.Incomplete) { 150 break; 151 } 152 } 153 if (response.type == ValueType.Error && response.svar[4 .. 18] == "Protocol error") { 154 _connection.close(); 155 debug(hioredis) 156 trace("reopen connection"); 157 reconnect(); 158 } 159 if (response.type == ValueType.Error && response.svar[0..6] == "NOAUTH" ) { 160 throw new NotAuthenticated("Auth required"); 161 } 162 return response; 163 } 164 /// build and execute single redis command. 165 RedisdValue execCommand(A...)(A args) { 166 immutable(ubyte)[][] data; 167 RedisdValue request = makeCommand(args); 168 RedisdValue response; 169 debug(hioredis) tracef("send request %s", request); 170 _connection.send(request.encode); 171 while(true) { 172 auto r = _connection.recv(bufferSize); 173 if (r.error || r.timedout || r.input.length == 0) 174 { 175 break; 176 } 177 auto b = r.input; 178 _input_stream.put(b); 179 response = _input_stream.get(); 180 if (response.type != ValueType.Incomplete) { 181 break; 182 } 183 } 184 if ( response.type == ValueType.Error && 185 cast(string)response.svar[4..18] == "Protocol error") { 186 _connection.close(); 187 debug(hioredis) trace("reopen connection"); 188 reconnect(); 189 } 190 if (response.type == ValueType.Error && response.svar[0 .. 6] == "NOAUTH") { 191 throw new NotAuthenticated("Auth required"); 192 } 193 debug(hioredis) tracef("got response %s", response); 194 return response; 195 } 196 /// Simple key/value set 197 RedisdValue set(K, V)(K k, V v) { 198 return execCommand("SET", k, v); 199 } 200 /// Simple key/value get 201 RedisdValue get(K)(K k) { 202 return execCommand("GET", k); 203 } 204 /// Consume reply 205 RedisdValue read() { 206 RedisdValue response; 207 response = _input_stream.get(); 208 while(response.type == ValueType.Incomplete) { 209 auto r = _connection.recv(bufferSize); 210 if ( r.error || r.timedout || r.input.length == 0) 211 { 212 break; 213 } 214 _input_stream.put(r.input); 215 response = _input_stream.get(); 216 if (response.type != ValueType.Incomplete) { 217 break; 218 } 219 } 220 if (response.type == ValueType.Error && cast(string) response.svar[4 .. 18] 221 == "Protocol error") { 222 _connection.close(); 223 debug(hioredis) 224 trace("reopen connection"); 225 reconnect(); 226 } 227 return response; 228 } 229 230 bool connected() 231 { 232 return _connection.connected; 233 } 234 235 void close() 236 { 237 _connection.close(); 238 _connection = null; 239 debug(hioredis) trace("connection closed"); 240 } 241 }