- Support one token -> multi sockets
- Tempt to fix concurrency income request causing same temp filename
This commit is contained in:
Linloir 2022-10-18 11:50:14 +08:00
parent 436af5c21a
commit 44ae7faef4
No known key found for this signature in database
GPG Key ID: 58EEB209A0F2C366
4 changed files with 244 additions and 151 deletions

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-06 15:44:16 * @Date : 2022-10-06 15:44:16
* @LastEditTime : 2022-10-14 10:26:00 * @LastEditTime : 2022-10-17 22:56:11
* @Description : * @Description :
*/ */
@ -16,7 +16,9 @@ import 'package:tcp_server/tcpcontroller/response.dart';
void main(List<String> arguments) async { void main(List<String> arguments) async {
//Set port //Set port
var port = arguments.isEmpty ? 20706 : int.tryParse(arguments[0]) ?? 20706; var address = arguments.isEmpty ? '127.0.0.1' : arguments[0];
//Set address
var port = arguments.length < 2 ? 20706 : int.tryParse(arguments[1]) ?? 20706;
//Create nessesary working directories //Create nessesary working directories
await Directory('${Directory.current.path}/.tmp').create(); await Directory('${Directory.current.path}/.tmp').create();
@ -24,154 +26,204 @@ void main(List<String> arguments) async {
await Directory('${Directory.current.path}/.data/files').create(); await Directory('${Directory.current.path}/.data/files').create();
await DataBaseHelper().initialize(); await DataBaseHelper().initialize();
var tokenMap = <int, TCPController>{}; Map<int, List<TCPController>> tokenMap = {};
var controllerMap = <TCPController, Future<int>>{}; Map<TCPController, Future<int>> controllerMap = {};
var listenSocket = await ServerSocket.bind('127.0.0.1', port); var listenSocket = await ServerSocket.bind(address, port);
listenSocket.listen( listenSocket.listen(
(socket) { (socket) {
var controller = TCPController(socket: socket); var controller = TCPController(socket: socket);
controller.requestStreamBroadcast.listen((request) async { controller.responseStreamBroadcast.listen(
print('[L] ${request.toJSON}'); null,
if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) { onError: (_) {
if(controllerMap[controller] == null) { print('[L] [EXCEPTION]-----------------------');
controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))(); print('[L] TCP Controller ran into exception');
} print('[L] Remote: ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
request.tokenID = await controllerMap[controller]; var token = controllerMap[controller];
var tokenResponse = TCPResponse( controllerMap.remove(controller);
type: ResponseType.token, tokenMap[token]?.remove(controller);
status: ResponseStatus.ok,
body: {
"tokenid": request.tokenID
}
);
controller.outStream.add(tokenResponse);
} }
tokenMap[request.tokenID!] = tokenMap[request.tokenID!] ?? controller; );
switch(request.requestType) { controller.requestStreamBroadcast.listen(
case RequestType.checkState: { (request) async {
var response = await onCheckState(request, socket); print('[L] [INCOMING ]-----------------------');
controller.outStream.add(response); print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
break; print('[L] Message: ${request.toJSON}');
} if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) {
case RequestType.register: { if(controllerMap[controller] == null) {
var response = await onRegister(request, socket); controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))();
controller.outStream.add(response); }
break; request.tokenID = await controllerMap[controller];
} var tokenResponse = TCPResponse(
case RequestType.login: { type: ResponseType.token,
var response = await onLogin(request, socket); status: ResponseStatus.ok,
controller.outStream.add(response); body: {
break; "tokenid": request.tokenID
}
case RequestType.logout: {
var response = await onLogout(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.profile: {
var response = await onFetchProfile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.modifyProfile: {
var response = await onModifyProfile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.modifyPassword: {
var response = await onModifyPassword(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.sendMessage: {
//Forword Message
var message = Message.fromJSONObject(request.body);
await DataBaseHelper().setFetchHistoryFor(
tokenID: request.tokenID,
newTimeStamp: message.timestamp
);
var originUserID = message.senderID;
var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID);
for(var device in onlineDevices) {
if(device == request.tokenID) {
continue;
} }
var targetController = tokenMap[device]; );
var forwardResponse = TCPResponse( controller.outStream.add(tokenResponse);
type: ResponseType.forwardMessage, }
status: ResponseStatus.ok, tokenMap[request.tokenID!] ??= [];
body: message.jsonObject if(!tokenMap[request.tokenID]!.contains(controller)) {
); tokenMap[request.tokenID]!.add(controller);
targetController?.outStream.add(forwardResponse); }
//Update Fetch Histories switch(request.requestType) {
case RequestType.checkState: {
var response = await onCheckState(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.register: {
var response = await onRegister(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.login: {
var response = await onLogin(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.logout: {
var response = await onLogout(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.profile: {
var response = await onFetchProfile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.modifyProfile: {
var response = await onModifyProfile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.modifyPassword: {
var response = await onModifyPassword(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.sendMessage: {
//Forword Message
var message = Message.fromJSONObject(request.body);
await DataBaseHelper().setFetchHistoryFor( await DataBaseHelper().setFetchHistoryFor(
tokenID: device, tokenID: request.tokenID,
newTimeStamp: message.timestamp newTimeStamp: message.timestamp
); );
var originUserID = message.senderID;
var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID);
for(var device in onlineDevices) {
if(device == request.tokenID) {
continue;
}
var targetControllers = tokenMap[device] ?? [];
var forwardResponse = TCPResponse(
type: ResponseType.forwardMessage,
status: ResponseStatus.ok,
body: message.jsonObject
);
for(var controller in targetControllers) {
try {
controller.outStream.add(forwardResponse);
} catch(e) {
print(e);
}
}
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: device,
newTimeStamp: message.timestamp
);
}
var targetUserID = message.receiverID;
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID);
for(var device in targetDevices) {
//Forward to socket
var targetControllers = tokenMap[device] ?? [];
var forwardResponse = TCPResponse(
type: ResponseType.forwardMessage,
status: ResponseStatus.ok,
body: message.jsonObject
);
for(int i = targetControllers.length - 1; i >= 0; i--) {
var controller = targetControllers[i];
try{
print('[L] [MSGFOWARD]-----------------------');
print('[L] Forwarding message to ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
controller.outStream.add(forwardResponse);
} catch(e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
}
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: device,
newTimeStamp: message.timestamp
);
}
var response = await onSendMessage(request, socket);
controller.outStream.add(response);
break;
} }
var targetUserID = message.receiverID; case RequestType.fetchMessage: {
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID); var response = await onFetchMessage(request, socket);
for(var device in targetDevices) { controller.outStream.add(response);
//Forward to socket break;
var targetController = tokenMap[device]; }
var forwardResponse = TCPResponse( case RequestType.findFile: {
type: ResponseType.forwardMessage, var response = await onFindFile(request, socket);
status: ResponseStatus.ok, controller.outStream.add(response);
body: message.jsonObject break;
); }
targetController?.outStream.add(forwardResponse); case RequestType.fetchFile: {
//Update Fetch Histories var response = await onFetchFile(request, socket);
await DataBaseHelper().setFetchHistoryFor( controller.outStream.add(response);
tokenID: device, break;
newTimeStamp: message.timestamp }
); case RequestType.searchUser: {
var response = await onSearchUser(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.addContact: {
var response = await onAddContact(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.fetchContact: {
var response = await onFetchContact(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.unknown: {
var response = await onUnknownRequest(request, socket);
controller.outStream.add(response);
break;
}
default: {
print('[E] Drop out of switch case');
} }
var response = await onSendMessage(request, socket);
controller.outStream.add(response);
break;
} }
case RequestType.fetchMessage: { //Clear temp file
var response = await onFetchMessage(request, socket); if(request.payload?.existsSync() ?? false) {
controller.outStream.add(response); request.payload?.delete();
break;
}
case RequestType.findFile: {
var response = await onFindFile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.fetchFile: {
var response = await onFetchFile(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.searchUser: {
var response = await onSearchUser(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.addContact: {
var response = await onAddContact(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.fetchContact: {
var response = await onFetchContact(request, socket);
controller.outStream.add(response);
break;
}
case RequestType.unknown: {
var response = await onUnknownRequest(request, socket);
controller.outStream.add(response);
break;
}
default: {
print('[E] Drop out of switch case');
} }
},
onError: (e) {
print(e);
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
},
onDone: () {
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
} }
//Clear temp file );
request.payload?.delete();
});
}, },
cancelOnError: true
); );
} }

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-06 16:15:01 * @Date : 2022-10-06 16:15:01
* @LastEditTime : 2022-10-14 12:13:23 * @LastEditTime : 2022-10-15 11:26:03
* @Description : * @Description :
*/ */
@ -502,6 +502,16 @@ class DataBaseHelper {
var filePath = '${Directory.current.path}/.data/files/$fileMd5'; var filePath = '${Directory.current.path}/.data/files/$fileMd5';
await tempFile.copy(filePath); await tempFile.copy(filePath);
try { try {
var sameFile = await _database.query(
'files',
where: 'filemd5 = ?',
whereArgs: [
fileMd5
]
);
if(sameFile.isNotEmpty) {
return;
}
await _database.insert( await _database.insert(
'files', 'files',
{ {
@ -533,7 +543,7 @@ class DataBaseHelper {
}) async { }) async {
var queryResult = await _database.query( var queryResult = await _database.query(
'msgfiles natural join files', 'msgfiles natural join files',
where: 'msgfile.msgmd5 = ?', where: 'msgfiles.msgmd5 = ?',
whereArgs: [ whereArgs: [
msgMd5 msgMd5
] ]

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 20:52:48 * @Date : 2022-10-08 20:52:48
* @LastEditTime : 2022-10-14 15:13:08 * @LastEditTime : 2022-10-15 00:40:24
* @Description : * @Description :
*/ */
@ -169,7 +169,7 @@ Future<TCPResponse> onSendMessage(TCPRequest request, Socket socket) async {
return TCPResponse( return TCPResponse(
type: ResponseType.fromRequestType(request.requestType), type: ResponseType.fromRequestType(request.requestType),
status: ResponseStatus.err, status: ResponseStatus.err,
errInfo: exception.toString() errInfo: exception.toString(),
); );
} }
} }

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 15:10:04 * @Date : 2022-10-08 15:10:04
* @LastEditTime : 2022-10-14 10:23:16 * @LastEditTime : 2022-10-17 22:53:17
* @Description : * @Description :
*/ */
@ -24,9 +24,11 @@ class TCPController {
//Byte length for subsequent data of the json object //Byte length for subsequent data of the json object
int payloadLength = 0; int payloadLength = 0;
int _fileCounter = 0;
//Construct a stream which emits events on intact requests //Construct a stream which emits events on intact requests
StreamController<List<int>> _requestRawStreamController = StreamController(); final StreamController<List<int>> _requestRawStreamController = StreamController();
StreamController<File> _payloadRawStreamController = StreamController(); final StreamController<File> _payloadRawStreamController = StreamController();
//Construct a payload stream which forward the incoming byte into temp file //Construct a payload stream which forward the incoming byte into temp file
StreamController<List<int>> _payloadPullStreamController = StreamController()..close(); StreamController<List<int>> _payloadPullStreamController = StreamController()..close();
@ -41,16 +43,43 @@ class TCPController {
//Provide a post stream for caller functions to push to //Provide a post stream for caller functions to push to
final StreamController<TCPResponse> _responseStreamController = StreamController(); final StreamController<TCPResponse> _responseStreamController = StreamController();
StreamSink<TCPResponse> get outStream => _responseStreamController.sink; StreamSink<TCPResponse> get outStream => _responseStreamController;
Stream<TCPResponse>? _responseStreamBroadcast;
Stream<TCPResponse> get responseStreamBroadcast {
_responseStreamBroadcast ??= _responseStreamController.stream.asBroadcastStream();
return _responseStreamBroadcast!;
}
TCPController({ TCPController({
required this.socket required this.socket
}) { }) {
socket.listen(_pullRequest); print('[L] [CONNECTED]-----------------------');
print('[L] Connection Established');
print('[L] Remote: ${socket.remoteAddress}:${socket.remotePort}');
print('[L] Local: ${socket.address}:${socket.port}');
socket.listen(
_pullRequest,
onError: (e) {
print(e);
_requestStreamController.addError(e);
},
onDone: () {
print('[L] [CLOSED ]-----------------------');
print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}');
_requestStreamController.close();
},
cancelOnError: true,
);
//This future never ends, would that be bothersome? //This future never ends, would that be bothersome?
Future(() async { Future(() async {
await for(var response in _responseStreamController.stream) { try{
await socket.addStream(response.stream); await for(var response in responseStreamBroadcast) {
await socket.addStream(response.stream);
}
} catch (e) {
print(e);
await socket.flush();
socket.close();
} }
}); });
//This one will fail if two request are handled simultaneously, which cause a stream //This one will fail if two request are handled simultaneously, which cause a stream
@ -62,9 +91,9 @@ class TCPController {
var requestQueue = StreamQueue(_requestRawStreamController.stream); var requestQueue = StreamQueue(_requestRawStreamController.stream);
var payloadQueue = StreamQueue(_payloadRawStreamController.stream); var payloadQueue = StreamQueue(_payloadRawStreamController.stream);
while(await Future<bool>(() => !_requestRawStreamController.isClosed && !_payloadRawStreamController.isClosed)) { while(await Future<bool>(() => !_requestRawStreamController.isClosed && !_payloadRawStreamController.isClosed)) {
var response = await requestQueue.next; var request = await requestQueue.next;
var payload = await payloadQueue.next; var payload = await payloadQueue.next;
await _pushRequest(requestBytes: response, tempFile: payload); await _pushRequest(requestBytes: request, tempFile: payload);
} }
requestQueue.cancel(); requestQueue.cancel();
payloadQueue.cancel(); payloadQueue.cancel();
@ -90,7 +119,9 @@ class TCPController {
//Create a future that listens to the status of the payload transmission //Create a future that listens to the status of the payload transmission
() { () {
var payloadPullStream = _payloadPullStreamController.stream; var payloadPullStream = _payloadPullStreamController.stream;
var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync(); var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync();
_fileCounter += 1;
_fileCounter %= 1000;
Future(() async { Future(() async {
await for(var data in payloadPullStream) { await for(var data in payloadPullStream) {
await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true); await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true);