From 44ae7faef43a8fabf7d70ca1601da6955eb18cc1 Mon Sep 17 00:00:00 2001 From: Linloir <3145078758@qq.com> Date: Tue, 18 Oct 2022 11:50:14 +0800 Subject: [PATCH] Bug Fix - Support one token -> multi sockets - Tempt to fix concurrency income request causing same temp filename --- bin/tcp_server.dart | 326 +++++++++++++++++------------- lib/database.dart | 14 +- lib/requesthandler.dart | 4 +- lib/tcpcontroller/controller.dart | 51 ++++- 4 files changed, 244 insertions(+), 151 deletions(-) diff --git a/bin/tcp_server.dart b/bin/tcp_server.dart index 5a75e4e..3ab5e9e 100644 --- a/bin/tcp_server.dart +++ b/bin/tcp_server.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-06 15:44:16 - * @LastEditTime : 2022-10-14 10:26:00 + * @LastEditTime : 2022-10-17 22:56:11 * @Description : */ @@ -16,7 +16,9 @@ import 'package:tcp_server/tcpcontroller/response.dart'; void main(List arguments) async { //Set port - var port = arguments.isEmpty ? 20706 : int.tryParse(arguments[0]) ?? 20706; + var address = arguments.isEmpty ? '127.0.0.1' : arguments[0]; + //Set address + var port = arguments.length < 2 ? 20706 : int.tryParse(arguments[1]) ?? 20706; //Create nessesary working directories await Directory('${Directory.current.path}/.tmp').create(); @@ -24,154 +26,204 @@ void main(List arguments) async { await Directory('${Directory.current.path}/.data/files').create(); await DataBaseHelper().initialize(); - var tokenMap = {}; - var controllerMap = >{}; - var listenSocket = await ServerSocket.bind('127.0.0.1', port); + Map> tokenMap = {}; + Map> controllerMap = {}; + var listenSocket = await ServerSocket.bind(address, port); listenSocket.listen( (socket) { var controller = TCPController(socket: socket); - controller.requestStreamBroadcast.listen((request) async { - print('[L] ${request.toJSON}'); - if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) { - if(controllerMap[controller] == null) { - controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))(); - } - request.tokenID = await controllerMap[controller]; - var tokenResponse = TCPResponse( - type: ResponseType.token, - status: ResponseStatus.ok, - body: { - "tokenid": request.tokenID - } - ); - controller.outStream.add(tokenResponse); + controller.responseStreamBroadcast.listen( + null, + onError: (_) { + print('[L] [EXCEPTION]-----------------------'); + print('[L] TCP Controller ran into exception'); + print('[L] Remote: ${controller.socket.remoteAddress}:${controller.socket.remotePort}'); + var token = controllerMap[controller]; + controllerMap.remove(controller); + tokenMap[token]?.remove(controller); } - tokenMap[request.tokenID!] = tokenMap[request.tokenID!] ?? controller; - switch(request.requestType) { - case RequestType.checkState: { - var response = await onCheckState(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.register: { - var response = await onRegister(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.login: { - var response = await onLogin(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.logout: { - var response = await onLogout(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.profile: { - var response = await onFetchProfile(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.modifyProfile: { - var response = await onModifyProfile(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.modifyPassword: { - var response = await onModifyPassword(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.sendMessage: { - //Forword Message - var message = Message.fromJSONObject(request.body); - await DataBaseHelper().setFetchHistoryFor( - tokenID: request.tokenID, - newTimeStamp: message.timestamp - ); - var originUserID = message.senderID; - var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID); - for(var device in onlineDevices) { - if(device == request.tokenID) { - continue; + ); + controller.requestStreamBroadcast.listen( + (request) async { + print('[L] [INCOMING ]-----------------------'); + print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}'); + print('[L] Message: ${request.toJSON}'); + if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) { + if(controllerMap[controller] == null) { + controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))(); + } + request.tokenID = await controllerMap[controller]; + var tokenResponse = TCPResponse( + type: ResponseType.token, + status: ResponseStatus.ok, + body: { + "tokenid": request.tokenID } - var targetController = tokenMap[device]; - var forwardResponse = TCPResponse( - type: ResponseType.forwardMessage, - status: ResponseStatus.ok, - body: message.jsonObject - ); - targetController?.outStream.add(forwardResponse); - //Update Fetch Histories + ); + controller.outStream.add(tokenResponse); + } + tokenMap[request.tokenID!] ??= []; + if(!tokenMap[request.tokenID]!.contains(controller)) { + tokenMap[request.tokenID]!.add(controller); + } + switch(request.requestType) { + case RequestType.checkState: { + var response = await onCheckState(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.register: { + var response = await onRegister(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.login: { + var response = await onLogin(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.logout: { + var response = await onLogout(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.profile: { + var response = await onFetchProfile(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.modifyProfile: { + var response = await onModifyProfile(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.modifyPassword: { + var response = await onModifyPassword(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.sendMessage: { + //Forword Message + var message = Message.fromJSONObject(request.body); await DataBaseHelper().setFetchHistoryFor( - tokenID: device, + tokenID: request.tokenID, newTimeStamp: message.timestamp ); + var originUserID = message.senderID; + var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID); + for(var device in onlineDevices) { + if(device == request.tokenID) { + continue; + } + var targetControllers = tokenMap[device] ?? []; + var forwardResponse = TCPResponse( + type: ResponseType.forwardMessage, + status: ResponseStatus.ok, + body: message.jsonObject + ); + for(var controller in targetControllers) { + try { + controller.outStream.add(forwardResponse); + } catch(e) { + print(e); + } + } + //Update Fetch Histories + await DataBaseHelper().setFetchHistoryFor( + tokenID: device, + newTimeStamp: message.timestamp + ); + } + var targetUserID = message.receiverID; + var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID); + for(var device in targetDevices) { + //Forward to socket + var targetControllers = tokenMap[device] ?? []; + var forwardResponse = TCPResponse( + type: ResponseType.forwardMessage, + status: ResponseStatus.ok, + body: message.jsonObject + ); + for(int i = targetControllers.length - 1; i >= 0; i--) { + var controller = targetControllers[i]; + try{ + print('[L] [MSGFOWARD]-----------------------'); + print('[L] Forwarding message to ${controller.socket.remoteAddress}:${controller.socket.remotePort}'); + controller.outStream.add(forwardResponse); + } catch(e) { + print('[E] [EXCEPTION]-----------------------'); + var token = controllerMap[controller]; + controllerMap.remove(controller); + tokenMap[token]?.remove(controller); + } + } + //Update Fetch Histories + await DataBaseHelper().setFetchHistoryFor( + tokenID: device, + newTimeStamp: message.timestamp + ); + } + var response = await onSendMessage(request, socket); + controller.outStream.add(response); + break; } - var targetUserID = message.receiverID; - var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID); - for(var device in targetDevices) { - //Forward to socket - var targetController = tokenMap[device]; - var forwardResponse = TCPResponse( - type: ResponseType.forwardMessage, - status: ResponseStatus.ok, - body: message.jsonObject - ); - targetController?.outStream.add(forwardResponse); - //Update Fetch Histories - await DataBaseHelper().setFetchHistoryFor( - tokenID: device, - newTimeStamp: message.timestamp - ); + case RequestType.fetchMessage: { + var response = await onFetchMessage(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.findFile: { + var response = await onFindFile(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.fetchFile: { + var response = await onFetchFile(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.searchUser: { + var response = await onSearchUser(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.addContact: { + var response = await onAddContact(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.fetchContact: { + var response = await onFetchContact(request, socket); + controller.outStream.add(response); + break; + } + case RequestType.unknown: { + var response = await onUnknownRequest(request, socket); + controller.outStream.add(response); + break; + } + default: { + print('[E] Drop out of switch case'); } - var response = await onSendMessage(request, socket); - controller.outStream.add(response); - break; } - case RequestType.fetchMessage: { - var response = await onFetchMessage(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.findFile: { - var response = await onFindFile(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.fetchFile: { - var response = await onFetchFile(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.searchUser: { - var response = await onSearchUser(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.addContact: { - var response = await onAddContact(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.fetchContact: { - var response = await onFetchContact(request, socket); - controller.outStream.add(response); - break; - } - case RequestType.unknown: { - var response = await onUnknownRequest(request, socket); - controller.outStream.add(response); - break; - } - default: { - print('[E] Drop out of switch case'); + //Clear temp file + if(request.payload?.existsSync() ?? false) { + request.payload?.delete(); } + }, + onError: (e) { + print(e); + var token = controllerMap[controller]; + controllerMap.remove(controller); + tokenMap[token]?.remove(controller); + }, + onDone: () { + var token = controllerMap[controller]; + controllerMap.remove(controller); + tokenMap[token]?.remove(controller); } - //Clear temp file - request.payload?.delete(); - }); + ); }, + cancelOnError: true ); } diff --git a/lib/database.dart b/lib/database.dart index 9a82628..c07bb29 100644 --- a/lib/database.dart +++ b/lib/database.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-06 16:15:01 - * @LastEditTime : 2022-10-14 12:13:23 + * @LastEditTime : 2022-10-15 11:26:03 * @Description : */ @@ -502,6 +502,16 @@ class DataBaseHelper { var filePath = '${Directory.current.path}/.data/files/$fileMd5'; await tempFile.copy(filePath); try { + var sameFile = await _database.query( + 'files', + where: 'filemd5 = ?', + whereArgs: [ + fileMd5 + ] + ); + if(sameFile.isNotEmpty) { + return; + } await _database.insert( 'files', { @@ -533,7 +543,7 @@ class DataBaseHelper { }) async { var queryResult = await _database.query( 'msgfiles natural join files', - where: 'msgfile.msgmd5 = ?', + where: 'msgfiles.msgmd5 = ?', whereArgs: [ msgMd5 ] diff --git a/lib/requesthandler.dart b/lib/requesthandler.dart index 4e56ae1..5532bc1 100644 --- a/lib/requesthandler.dart +++ b/lib/requesthandler.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-08 20:52:48 - * @LastEditTime : 2022-10-14 15:13:08 + * @LastEditTime : 2022-10-15 00:40:24 * @Description : */ @@ -169,7 +169,7 @@ Future onSendMessage(TCPRequest request, Socket socket) async { return TCPResponse( type: ResponseType.fromRequestType(request.requestType), status: ResponseStatus.err, - errInfo: exception.toString() + errInfo: exception.toString(), ); } } diff --git a/lib/tcpcontroller/controller.dart b/lib/tcpcontroller/controller.dart index 58d06f9..b3c397b 100644 --- a/lib/tcpcontroller/controller.dart +++ b/lib/tcpcontroller/controller.dart @@ -1,7 +1,7 @@ /* * @Author : Linloir * @Date : 2022-10-08 15:10:04 - * @LastEditTime : 2022-10-14 10:23:16 + * @LastEditTime : 2022-10-17 22:53:17 * @Description : */ @@ -24,9 +24,11 @@ class TCPController { //Byte length for subsequent data of the json object int payloadLength = 0; + int _fileCounter = 0; + //Construct a stream which emits events on intact requests - StreamController> _requestRawStreamController = StreamController(); - StreamController _payloadRawStreamController = StreamController(); + final StreamController> _requestRawStreamController = StreamController(); + final StreamController _payloadRawStreamController = StreamController(); //Construct a payload stream which forward the incoming byte into temp file StreamController> _payloadPullStreamController = StreamController()..close(); @@ -41,16 +43,43 @@ class TCPController { //Provide a post stream for caller functions to push to final StreamController _responseStreamController = StreamController(); - StreamSink get outStream => _responseStreamController.sink; + StreamSink get outStream => _responseStreamController; + Stream? _responseStreamBroadcast; + Stream get responseStreamBroadcast { + _responseStreamBroadcast ??= _responseStreamController.stream.asBroadcastStream(); + return _responseStreamBroadcast!; + } TCPController({ required this.socket }) { - socket.listen(_pullRequest); + print('[L] [CONNECTED]-----------------------'); + print('[L] Connection Established'); + print('[L] Remote: ${socket.remoteAddress}:${socket.remotePort}'); + print('[L] Local: ${socket.address}:${socket.port}'); + socket.listen( + _pullRequest, + onError: (e) { + print(e); + _requestStreamController.addError(e); + }, + onDone: () { + print('[L] [CLOSED ]-----------------------'); + print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}'); + _requestStreamController.close(); + }, + cancelOnError: true, + ); //This future never ends, would that be bothersome? Future(() async { - await for(var response in _responseStreamController.stream) { - await socket.addStream(response.stream); + try{ + await for(var response in responseStreamBroadcast) { + await socket.addStream(response.stream); + } + } catch (e) { + print(e); + await socket.flush(); + socket.close(); } }); //This one will fail if two request are handled simultaneously, which cause a stream @@ -62,9 +91,9 @@ class TCPController { var requestQueue = StreamQueue(_requestRawStreamController.stream); var payloadQueue = StreamQueue(_payloadRawStreamController.stream); while(await Future(() => !_requestRawStreamController.isClosed && !_payloadRawStreamController.isClosed)) { - var response = await requestQueue.next; + var request = await requestQueue.next; var payload = await payloadQueue.next; - await _pushRequest(requestBytes: response, tempFile: payload); + await _pushRequest(requestBytes: request, tempFile: payload); } requestQueue.cancel(); payloadQueue.cancel(); @@ -90,7 +119,9 @@ class TCPController { //Create a future that listens to the status of the payload transmission () { var payloadPullStream = _payloadPullStreamController.stream; - var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync(); + var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync(); + _fileCounter += 1; + _fileCounter %= 1000; Future(() async { await for(var data in payloadPullStream) { await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true);