1 /** Network transport management implementation for JSON-RPC data.
2
3 You attach a transport to your RPCClient and a listener to your RPCServers,
4 but you do not need to use the APIs directly.
5
6 Example:
7 ---
8 interface IMyFuncs { void f(); }
9 class MyFuncs : IMyFuncs { void f() { return; }
10
11 // TCP sockets are the default - you don't have to name them explicitly...
12 auto server = new RPCServer!(MyFuncs, TCPListener!MyFuncs)
13 ("127.0.0.1", 54321);
14 auto client = new RPCClient!(IMyFuncs, TCPTransport)
15 ("127.0.0.1", 54321);
16
17 client.f();
18 ---
19
20 Authors:
21 Ryan Frame
22
23 Copyright:
24 Copyright 2018 Ryan Frame
25
26 License:
27 MIT
28 */
29 module jsonrpc.transport; @safe:
30
31 import std.socket;
32 import std.traits : ReturnType;
33 import jsonrpc.exception;
34
35 version(Have_tested) import tested : test = name;
36 else private struct test { string name; }
37
38 private enum SocketBufSize = 4096;
39
40 /** Check whether the specified object is a valid transport for RPCClients. */
41 enum bool isTransport(T) =
42 is(T == struct) &&
43 is(ReturnType!((T t) => t.send([])) == size_t) &&
44 is(ReturnType!((T t) => t.receiveJSONObjectOrArray) == char[]) &&
45 is(typeof((T t) => t.close)) &&
46 is(ReturnType!((T t) => t.isAlive) == bool);
47
48 /** Check whether the specified object is a valid listener for RPCServers. */
49 enum bool isListener(T) =
50 is(T == struct);
51
52
53 /** Receive a JSON object or array. Mixin template for transport implementations.
54
55 If your transport provides a `receiveData` function defined as
56 $(D_INLINECODE char[] receiveData(); ) the receiveJSONObjectOrArray will
57 call it and return the first complete JSON object or array from the char
58 stream. Any trailing data is thrown away.
59 */
60 mixin template ReceiveJSON() {
61 /** Receive a single JSON object or array from the socket stream.
62
63 Any trailing data is thrown away.
64 */
65 char[] receiveJSONObjectOrArray() {
66 auto data = receiveData();
67
68 char startBrace;
69 char endBrace;
70 if (data[0] == '{') {
71 startBrace = '{';
72 endBrace = '}';
73 } else if (data[0] == '[') {
74 startBrace = '[';
75 endBrace = ']';
76 } else {
77 raise!(InvalidDataReceived, data)
78 ("Expected to receive a JSON object or array.");
79 }
80
81 // Count the braces we receive. If we don't have a full object/array,
82 // receive until we do.
83 int braceCount = 0;
84 size_t loc = 0;
85 while(true) {
86 for (; loc < data.length; ++loc) {
87 if (data[loc] == startBrace) ++braceCount;
88 else if (data[loc] == endBrace) --braceCount;
89 }
90
91 // If we receive an incomplete object, get more data and repeat as
92 // needed.
93 if (braceCount > 0) {
94 data ~= receiveData();
95 } else return data;
96 }
97 }
98 }
99
100
101 /** Manage TCP transport connection details and tasks. */
102 struct TCPTransport {
103 static assert(isTransport!TCPTransport);
104
105 package:
106
107 /** Instantiate a TCPTransport object.
108
109 Params:
110 host = The hostname to connect to.
111 port = The port number of the host to connect to.
112 */
113 this(string host, ushort port) in {
114 assert(host.length > 0);
115 } body {
116 this(new TcpSocket(getAddress(host, port)[0]));
117 }
118
119 /** Send the provided data and return the number of bytes sent.
120
121 If the return value is not equal to the length of the input in bytes,
122 there was a transmission error.
123
124 Params:
125 data = The string data to send.
126 */
127 size_t send(const char[] data) {
128 ptrdiff_t bytesSent = 0;
129 while (bytesSent < data.length) {
130 auto sent = _socket.send(data[bytesSent..$]);
131 if (sent == Socket.ERROR || sent == 0) break;
132 bytesSent += sent;
133 }
134 return bytesSent;
135 }
136
137 mixin ReceiveJSON;
138
139 /** Close the transport's underlying socket. */
140 void close() {
141 _socket.shutdown(SocketShutdown.BOTH);
142 _socket.close();
143 }
144
145 /** Query the transport to see if it's still active. */
146 nothrow
147 bool isAlive() {
148 scope(failure) return false;
149 return _socket.isAlive();
150 }
151
152 private:
153
154 /** Receive incoming data. */
155 char[] receiveData() {
156 char[SocketBufSize] buf;
157 ptrdiff_t receivedBytes = 0;
158
159 receivedBytes = _socket.receive(buf);
160 if (receivedBytes <= 0) return [];
161 return buf[0..receivedBytes].dup;
162 }
163
164 Socket _socket;
165
166 /** This constructor is for unit testing. */
167 package this(Socket socket) {
168 _socket = socket;
169 _socket.blocking = true;
170 }
171 }
172
173 /** Listen for incoming connections and pass clients to a handler function.
174
175 Template_Parameters:
176 API = The class containing the methods for the server to execute.
177 */
178 struct TCPListener(API) {
179 static assert(isListener!(TCPListener!(API)));
180
181 package:
182
183 /** Instantiate a TCPListener object.
184
185 Params:
186 host = The hostname to connect to.
187 port = The port number of the host to connect to.
188 */
189 this(string host, ushort port) in {
190 assert(host.length > 0);
191 } body {
192 _socket = new TcpSocket();
193 _socket.blocking = true;
194 _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
195 _socket.bind(getAddress(host, port)[0]);
196 }
197
198 /** Listen for client requests.
199
200 `listen` will call the specified handler function in a new thread to
201 handle each client it accepts.
202
203 Template_Parameters:
204 handler = The handler function to call when a client connects.
205
206 Params:
207 api = An instantiated class with the methods to
208 execute.
209 maxQueuedConnections = The maximum number of connections to backlog
210 before refusing connections.
211 */
212 void listen(alias handler)(API api, int maxQueuedConnections = 10) {
213 _socket.listen(maxQueuedConnections);
214 if (! _socket.isAlive) {
215 raise!(ConnectionException)("Listening socket not active.");
216 }
217
218 while (true) {
219 import std.parallelism : task;
220 auto conn = _socket.accept();
221 task!handler(TCPTransport(conn), api).executeInNewThread();
222 }
223 }
224
225 private:
226
227 Socket _socket;
228 }
229
230 version(unittest) @system:
231
232 @test("receiveJSONObjectOrArray can receive a JSON object")
233 unittest {
234 interface I {}
235 auto sock = new FakeSocket();
236 auto transport = TCPTransport(sock);
237 enum val = cast(char[])`{"id":23,"method":"func","params":[1,2,3]}`;
238
239 sock._receiveReturnValue = val;
240 auto ret = transport.receiveJSONObjectOrArray();
241 assert(ret == val);
242 }
243
244 @test("receiveJSONObjectOrArray can receive a JSON array")
245 unittest {
246 interface I {}
247 auto sock = new FakeSocket();
248 auto transport = TCPTransport(sock);
249 enum val = cast(char[])
250 `[{"id":23,"method":"func","params":[1,2,3]},
251 {"id":24,"method":"func","params":[1,2,3]},
252 {"id":25,"method":"func","params":[1,2,3]},
253 {"method":"func","params":[1,2,3]},
254 {"id":26,"method":"func","params":[1,2,3]}]`;
255
256 sock._receiveReturnValue = val;
257 auto ret = transport.receiveJSONObjectOrArray();
258 assert(ret == val);
259 }
260
261 @test("receiveJSONObjectOrArray throws an exception if not given an array or object")
262 unittest {
263 import std.exception : assertThrown;
264 interface I {}
265 auto sock = new FakeSocket();
266 auto transport = TCPTransport(sock);
267 enum val = cast(char[])`"id":23,"method":"func","params":[1,2,3]}`;
268
269 sock._receiveReturnValue = val;
270 assertThrown!InvalidDataReceived(transport.receiveJSONObjectOrArray());
271 }
272
273 @test("receiveJSONObjectOrArray receives a full object when its length exceeds SocketBufSize")
274 unittest {
275 import std.array : array;
276 import std.range : repeat, takeExactly;
277 auto sock = new FakeSocket();
278 auto transport = TCPTransport(sock);
279
280 // This gives us a length of SocketBufSize+8.
281 enum key = 'a'.repeat().takeExactly(SocketBufSize/2).array;
282 enum val = 'b'.repeat().takeExactly(SocketBufSize/2).array;
283 auto sockReturn = cast(char[]) (`{"` ~ key ~ `": "` ~ val ~ `"}`);
284
285 sock._receiveReturnValue = sockReturn;
286 auto ret = transport.receiveJSONObjectOrArray();
287 assert(cast(string) ret == sockReturn);
288 }
289
290 version(unittest) {
291 class FakeSocket : Socket {
292 private bool _blocking;
293 private bool _isAlive;
294
295 private char[] _receiveReturnValue =
296 cast(char[])`{"id":3,"result":[1,2,3]}`;
297
298 private char[] _lastDataSent;
299
300 @property receiveReturnValue(inout char[] s) {
301 _receiveReturnValue = cast(char[])s;
302 }
303
304 @property lastDataSent() { return _lastDataSent; }
305
306 @property char[] receiveReturnValue() { return _receiveReturnValue; }
307
308 override void bind(Address addr) { _isAlive = true; }
309
310 override const nothrow @nogc @property @trusted bool blocking() {
311 return _blocking;
312 }
313
314 override @property @trusted void blocking(bool byes) {
315 _blocking = byes;
316 }
317
318 override @trusted void setOption(SocketOptionLevel level,
319 SocketOption option, void[] value) {}
320
321 override const @property @trusted bool isAlive() { return _isAlive; }
322
323 override @trusted void listen(int backlog) { _isAlive = true; }
324
325 alias receive = Socket.receive;
326 override @trusted ptrdiff_t receive(void[] buf) {
327 if (buf.length == 0) return 0;
328 auto ret = fillBuffer(cast(char*)buf.ptr, buf.length);
329 _receiveReturnValue = _receiveReturnValue[ret..$];
330 return ret;
331 }
332
333 @test("FakeSocket.receive allows injecting 'received' characters.")
334 unittest {
335 auto s = new FakeSocket;
336 char[] buf = new char[](SocketBufSize);
337 s.receiveReturnValue = `{"id":3,"result":[1,2,3]}`;
338
339 auto len = s.receive(buf);
340 assert(buf[0..len] == `{"id":3,"result":[1,2,3]}`,
341 "Incorrect data received: " ~ buf);
342 }
343
344 alias send = Socket.send;
345 override @trusted ptrdiff_t send(const(void)[] buf) {
346 _lastDataSent = cast(char[])buf;
347 return buf.length;
348 }
349
350 private @trusted ptrdiff_t fillBuffer(char* ptr, size_t length) {
351 import std.algorithm.comparison : min;
352 char[] p = ptr[0..length];
353 ptrdiff_t cnt;
354 for (cnt = 0; cnt < min(length, receiveReturnValue.length); ++cnt) {
355 ptr[cnt] = receiveReturnValue[cnt];
356 }
357 return cnt;
358 }
359 }
360 }