From 52c32c2e74eb3bc7442019a6623c54c7987971eb Mon Sep 17 00:00:00 2001 From: Linloir <3145078758@qq.com> Date: Fri, 14 Oct 2022 13:35:55 +0800 Subject: [PATCH] Bug Fix and API adjustment - Check token validity - Prevent unlogged user to fetch contact - Fix missing 'tolist' conversion in contact fetch method - (IMPORTANT) Fix async bug in TCP controller (reconstruct the handler) --- bin/tcp_server.dart | 12 +++-- lib/database.dart | 51 ++++++++++++++++---- lib/requesthandler.dart | 8 ++-- lib/tcpcontroller/controller.dart | 80 +++++++++++++++++++------------ pubspec.lock | 2 +- pubspec.yaml | 1 + 6 files changed, 106 insertions(+), 48 deletions(-) diff --git a/bin/tcp_server.dart b/bin/tcp_server.dart index 7ee616b..5a75e4e 100644 --- a/bin/tcp_server.dart +++ b/bin/tcp_server.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-06 15:44:16 - * @LastEditTime : 2022-10-12 18:23:31 + * @LastEditTime : 2022-10-14 10:26:00 * @Description : */ @@ -30,9 +30,9 @@ void main(List arguments) async { listenSocket.listen( (socket) { var controller = TCPController(socket: socket); - controller.inStream.listen((request) async { + controller.requestStreamBroadcast.listen((request) async { print('[L] ${request.toJSON}'); - if(request.tokenID == null) { + if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) { if(controllerMap[controller] == null) { controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))(); } @@ -46,7 +46,6 @@ void main(List arguments) async { ); controller.outStream.add(tokenResponse); } - //TODO: check if token id is not in tokenid list tokenMap[request.tokenID!] = tokenMap[request.tokenID!] ?? controller; switch(request.requestType) { case RequestType.checkState: { @@ -151,6 +150,11 @@ void main(List arguments) async { controller.outStream.add(response); break; } + case RequestType.addContact: { + var response = await onAddContact(request, socket); + controller.outStream.add(response); + break; + } case RequestType.fetchContact: { var response = await onFetchContact(request, socket); controller.outStream.add(response); diff --git a/lib/database.dart b/lib/database.dart index 2d52a6a..9a82628 100644 --- a/lib/database.dart +++ b/lib/database.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-06 16:15:01 - * @LastEditTime : 2022-10-13 20:36:34 + * @LastEditTime : 2022-10-14 12:13:23 * @Description : */ @@ -83,6 +83,24 @@ class DataBaseHelper { ); } + Future isTokenValid({ + required int? tokenid, + }) async { + if(tokenid == null) { + return false; + } + + var tokenQueryResult = await _database.query( + 'tokens', + where: 'tokenid = ?', + whereArgs: [ + tokenid + ] + ); + + return tokenQueryResult.isNotEmpty; + } + //Creates new token Future createToken() async { //Insert new row @@ -628,13 +646,19 @@ class DataBaseHelper { } //Find current binded userID - var currentUserID = (await _database.query( + var currentUserIDQueryResult = (await _database.query( 'bindings', where: 'tokenid = ?', whereArgs: [ tokenID ] - ))[0]['userid'] as int; + )); + + if(currentUserIDQueryResult.isEmpty) { + throw Exception('User not logged in'); + } + + var currentUserID = currentUserIDQueryResult[0]['userid'] as int; //Fetch all contacts var contactsQueryResult = await _database.query( @@ -665,14 +689,19 @@ class DataBaseHelper { throw Exception('Invalid device token'); } - //Find current binded userID - var currentUserID = (await _database.query( + var currentUserIDQueryResult = (await _database.query( 'bindings', where: 'tokenid = ?', whereArgs: [ tokenID ] - ))[0]['userid'] as int; + )); + + if(currentUserIDQueryResult.isEmpty) { + throw Exception('User not logged in'); + } + + var currentUserID = currentUserIDQueryResult[0]['userid'] as int; //Fetch pending contacts var contactsQueryResult = await _database.query( @@ -707,13 +736,19 @@ class DataBaseHelper { } //Find current binded userID - var currentUserID = (await _database.query( + var currentUserIDQueryResult = (await _database.query( 'bindings', where: 'tokenid = ?', whereArgs: [ tokenID ] - ))[0]['userid'] as int; + )); + + if(currentUserIDQueryResult.isEmpty) { + throw Exception('User not logged in'); + } + + var currentUserID = currentUserIDQueryResult[0]['userid'] as int; //Fetch pending contacts var contactsQueryResult = await _database.query( diff --git a/lib/requesthandler.dart b/lib/requesthandler.dart index 4ea6fa6..5b0226d 100644 --- a/lib/requesthandler.dart +++ b/lib/requesthandler.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-08 20:52:48 - * @LastEditTime : 2022-10-13 20:30:53 + * @LastEditTime : 2022-10-14 11:29:03 * @Description : */ @@ -259,9 +259,9 @@ Future onFetchContact(TCPRequest request, Socket socket) async { type: ResponseType.fromRequestType(request.requestType), status: ResponseStatus.ok, body: { - "contacts": contacts.map((e) => e.jsonObject), - "pending": pendingContacts.map((e) => e.jsonObject), - "requesting": requestingContacts.map((e) => e.jsonObject) + "contacts": contacts.map((e) => e.jsonObject).toList(), + "pending": pendingContacts.map((e) => e.jsonObject).toList(), + "requesting": requestingContacts.map((e) => e.jsonObject).toList() } ); } on Exception catch (exception) { diff --git a/lib/tcpcontroller/controller.dart b/lib/tcpcontroller/controller.dart index da5ff12..58d06f9 100644 --- a/lib/tcpcontroller/controller.dart +++ b/lib/tcpcontroller/controller.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-08 15:10:04 - * @LastEditTime : 2022-10-12 13:45:01 + * @LastEditTime : 2022-10-14 10:23:16 * @Description : */ @@ -9,6 +9,7 @@ import 'dart:async'; import 'dart:io'; import 'dart:typed_data'; +import 'package:async/async.dart'; import 'package:tcp_server/tcpcontroller/request.dart'; import 'package:tcp_server/tcpcontroller/response.dart'; @@ -24,26 +25,31 @@ class TCPController { int payloadLength = 0; //Construct a stream which emits events on intact requests - StreamController> _requestStreamController = StreamController()..close(); + StreamController> _requestRawStreamController = StreamController(); + StreamController _payloadRawStreamController = StreamController(); //Construct a payload stream which forward the incoming byte into temp file - StreamController> _payloadStreamController = StreamController()..close(); + StreamController> _payloadPullStreamController = StreamController()..close(); //Provide a request stream for caller functions to listen on - final StreamController _inStreamController = StreamController(); - Stream get inStream => _inStreamController.stream; + final StreamController _requestStreamController = StreamController(); + Stream? _requestStreamBroadcast; + Stream get requestStreamBroadcast { + _requestStreamBroadcast ??= _requestStreamController.stream.asBroadcastStream(); + return _requestStreamBroadcast!; + } //Provide a post stream for caller functions to push to - final StreamController _outStreamController = StreamController(); - StreamSink get outStream => _outStreamController.sink; + final StreamController _responseStreamController = StreamController(); + StreamSink get outStream => _responseStreamController.sink; TCPController({ required this.socket }) { - socket.listen(socketHandler); + socket.listen(_pullRequest); //This future never ends, would that be bothersome? Future(() async { - await for(var response in _outStreamController.stream) { + await for(var response in _responseStreamController.stream) { await socket.addStream(response.stream); } }); @@ -52,15 +58,26 @@ class TCPController { // _outStreamController.stream.listen((response) async { // await socket.addStream(response.stream); // }); + Future(() async { + var requestQueue = StreamQueue(_requestRawStreamController.stream); + var payloadQueue = StreamQueue(_payloadRawStreamController.stream); + while(await Future(() => !_requestRawStreamController.isClosed && !_payloadRawStreamController.isClosed)) { + var response = await requestQueue.next; + var payload = await payloadQueue.next; + await _pushRequest(requestBytes: response, tempFile: payload); + } + requestQueue.cancel(); + payloadQueue.cancel(); + }); } //Listen to the incoming stream and emits event whenever there is a intact request - void socketHandler(Uint8List fetchedData) { + void _pullRequest(Uint8List fetchedData) { //Put incoming data into buffer buffer.addAll(fetchedData); //Consume buffer until it's not enough for first 8 byte of a message while(true) { - if(requestLength == 0 && payloadLength == 0 && _payloadStreamController.isClosed) { + if(requestLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) { //New request if(buffer.length >= 8) { //Buffered data has more than 8 bytes, enough to read request length and body length @@ -68,24 +85,19 @@ class TCPController { payloadLength = Uint8List.fromList(buffer.sublist(4, 8)).buffer.asInt32List()[0]; //Clear the length indicator bytes buffer.removeRange(0, 8); - //Create temp file to read payload (might be huge) - var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync(); //Initialize payload transmission controller - _payloadStreamController = StreamController(); + _payloadPullStreamController = StreamController(); //Create a future that listens to the status of the payload transmission - var payloadTransmission = Future(() async { - await for(var data in _payloadStreamController.stream) { - await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true); - } - }); - //Bind request construction on stream - _requestStreamController = StreamController(); - _requestStreamController.stream.listen((requestBytes) { - //When request stream is closed by controller - payloadTransmission.then((_) { - _inStreamController.add(TCPRequest(requestBytes, tempFile)); + () { + var payloadPullStream = _payloadPullStreamController.stream; + var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync(); + Future(() async { + await for(var data in payloadPullStream) { + await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true); + } + _payloadRawStreamController.add(tempFile); }); - }); + }(); } else { //Buffered data is not long enough @@ -100,8 +112,7 @@ class TCPController { if(buffer.length >= requestLength) { //Got intact request json //Emit request buffer through stream - _requestStreamController.add(buffer.sublist(0, requestLength)); - _requestStreamController.close(); + _requestRawStreamController.add(buffer.sublist(0, requestLength)); //Remove proccessed buffer buffer.removeRange(0, requestLength); //Clear awaiting request length @@ -118,18 +129,18 @@ class TCPController { if(buffer.length >= payloadLength) { //Last few bytes to emit //Send the last few bytes to stream - _payloadStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength))); + _payloadPullStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength))); //Clear buffer buffer.removeRange(0, payloadLength); //Set payload length to zero payloadLength = 0; //Close the payload transmission stream - _payloadStreamController.close(); + _payloadPullStreamController.close(); } else { //Part of payload //Transmit all to stream - _payloadStreamController.add(Uint8List.fromList(buffer)); + _payloadPullStreamController.add(Uint8List.fromList(buffer)); //Reduce payload bytes left payloadLength -= buffer.length; //Clear buffer @@ -141,4 +152,11 @@ class TCPController { } } } + + Future _pushRequest({ + required List requestBytes, + required File tempFile + }) async { + _requestStreamController.add(TCPRequest(requestBytes, tempFile)); + } } \ No newline at end of file diff --git a/pubspec.lock b/pubspec.lock index 318f0d2..894b05a 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -23,7 +23,7 @@ packages: source: hosted version: "2.3.1" async: - dependency: transitive + dependency: "direct main" description: name: async url: "https://pub.flutter-io.cn" diff --git a/pubspec.yaml b/pubspec.yaml index 662b334..d7931f1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: sqflite_common: ^2.3.0 crypto: ^3.0.2 convert: ^3.0.2 + async: ^2.9.0 # dependencies: # path: ^1.8.0