/* * @Author : Linloir * @Date : 2022-10-11 09:42:05 * @LastEditTime : 2022-10-15 16:50:16 * @Description : TCP repository */ import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:async/async.dart'; import 'package:crypto/crypto.dart'; import 'package:flutter/foundation.dart'; import 'package:path_provider/path_provider.dart'; import 'package:shared_preferences/shared_preferences.dart'; import 'package:tcp_client/repositories/common_models/message.dart'; import 'package:tcp_client/repositories/local_service_repository/models/local_file.dart'; import 'package:tcp_client/repositories/tcp_repository/models/tcp_request.dart'; import 'package:tcp_client/repositories/tcp_repository/models/tcp_response.dart'; class TCPRepository { TCPRepository._internal({ required Socket socket, required String remoteAddress, required int remotePort }): _socket = socket, _remoteAddress = remoteAddress, _remotePort = remotePort { _socket.listen(_pullResponse); //This future never ends, would that be bothersome? Future(() async { await for(var request in _requestStreamController.stream) { await _socket.addStream(request.stream); } }); Future(() async { var responseQueue = StreamQueue(_responseRawStreamController.stream); var payloadQueue = StreamQueue(_payloadRawStreamController.stream); while(await Future(() => !_responseRawStreamController.isClosed && !_payloadRawStreamController.isClosed)) { var response = await responseQueue.next; var payload = await payloadQueue.next; await _pushResponse(responseBytes: response, tempFile: payload); } responseQueue.cancel(); payloadQueue.cancel(); }); } static Future create({ required String serverAddress, required int serverPort }) async { var socket = await Socket.connect(serverAddress, serverPort); return TCPRepository._internal( socket: socket, remoteAddress: serverAddress, remotePort: serverPort ); } Future clone() async { return await TCPRepository.create( serverAddress: _remoteAddress, serverPort: _remotePort ); } final Socket _socket; final String _remoteAddress; final int _remotePort; //Stores the incoming bytes of the TCP connection temporarily final List buffer = []; //Byte length for json object int responseLength = 0; //Byte length for subsequent data of the json object int payloadLength = 0; //Construct a stream which emits events on intact requests final StreamController> _responseRawStreamController = StreamController(); final StreamController _payloadRawStreamController = StreamController(); //Construct a payload stream which forward the incoming byte into temp file StreamController> _payloadPullStreamController = StreamController()..close(); //Provide a response stream for blocs to listen on final StreamController _responseStreamController = StreamController(); Stream? _responseStreamBroadcast; Stream get responseStream => _responseStreamController.stream; Stream get responseStreamBroadcast { _responseStreamBroadcast ??= _responseStreamController.stream.asBroadcastStream(); return _responseStreamBroadcast!; } //Provide a request stream for widgets to push to final StreamController _requestStreamController = StreamController(); void pushRequest(TCPRequest request) { if(request.type == TCPRequestType.sendMessage) { request as SendMessageRequest; if(request.message.type == MessageType.file) { Future(() async { //Duplicate current socket Socket socket = await Socket.connect(_remoteAddress, _remotePort); TCPRepository duplicatedRepository = TCPRepository._internal( socket: socket, remoteAddress: _remoteAddress, remotePort: _remotePort ); duplicatedRepository._requestStreamController.add(request); await for(var response in duplicatedRepository.responseStreamBroadcast) { if(response.type == TCPResponseType.sendMessage) { _responseStreamController.add(response); break; } } duplicatedRepository.dispose(); }); } else { _requestStreamController.add(request); } } else { _requestStreamController.add(request); } } //Listen to the incoming stream and emits event whenever there is a intact request void _pullResponse(Uint8List fetchedData) { //Put incoming data into buffer buffer.addAll(fetchedData); //Consume buffer until it's not enough for first 8 byte of a message while(true) { if(responseLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) { //New request if(buffer.length >= 8) { //Buffered data has more than 8 bytes, enough to read request length and body length responseLength = Uint8List.fromList(buffer.sublist(0, 4)).buffer.asInt32List()[0]; payloadLength = Uint8List.fromList(buffer.sublist(4, 8)).buffer.asInt32List()[0]; //Clear the length indicator bytes buffer.removeRange(0, 8); //Create a pull stream for payload file _payloadPullStreamController = StreamController(); //Create a future that listens to the status of the payload transmission () { var payloadPullStream = _payloadPullStreamController.stream; Future(() async { var documentDirectory = await getApplicationDocumentsDirectory(); //Create temp file to read payload (might be huge) Directory('${documentDirectory.path}/ChatClient').createSync(); Directory('${documentDirectory.path}/ChatClient/.tmp').createSync(); var tempFile = File('${documentDirectory.path}/ChatClient/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync(); await for(var data in payloadPullStream) { await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true); } _payloadRawStreamController.add(tempFile); }); }(); } else { //Buffered data is not long enough //Do nothing break; } } else { //Currently awaiting full transmission if(responseLength > 0) { //Currently processing on a request if(buffer.length >= responseLength) { //Got intact request json //Emit request buffer through stream _responseRawStreamController.add(buffer.sublist(0, responseLength)); //Remove proccessed buffer buffer.removeRange(0, responseLength); //Clear awaiting request length responseLength = 0; } else { //Got part of request json //do nothing break; } } else { //Currently processing on a payload if(buffer.length >= payloadLength) { //Last few bytes to emit //Send the last few bytes to stream _payloadPullStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength))); //Clear buffer buffer.removeRange(0, payloadLength); //Set payload length to zero payloadLength = 0; //Close the payload transmission stream _payloadPullStreamController.close(); } else { //Part of payload //Transmit all to stream _payloadPullStreamController.add(Uint8List.fromList(buffer)); //Reduce payload bytes left payloadLength -= buffer.length; //Clear buffer buffer.clear(); //Exit and wait for another submit break; } } } } } Future _pushResponse({ required List responseBytes, required File tempFile }) async { if(_responseStreamController.isClosed) { await tempFile.delete(); return; } var responseJSON = String.fromCharCodes(responseBytes); var responseObject = jsonDecode(responseJSON); TCPResponseType responseType = TCPResponseType.fromValue(responseObject['response'] as String); switch(responseType) { case TCPResponseType.token: { await tempFile.delete(); _responseStreamController.add(SetTokenReponse(jsonObject: responseObject)); break; } case TCPResponseType.checkState: { await tempFile.delete(); _responseStreamController.add(CheckStateResponse(jsonObject: responseObject)); break; } case TCPResponseType.register: { await tempFile.delete(); _responseStreamController.add(RegisterResponse(jsonObject: responseObject)); break; } case TCPResponseType.login: { await tempFile.delete(); _responseStreamController.add(LoginResponse(jsonObject: responseObject)); break; } case TCPResponseType.logout: { await tempFile.delete(); _responseStreamController.add(LogoutResponse(jsonObject: responseObject)); break; } case TCPResponseType.profile: { await tempFile.delete(); _responseStreamController.add(GetProfileResponse(jsonObject: responseObject)); break; } case TCPResponseType.modifyPassword: { await tempFile.delete(); _responseStreamController.add(ModifyPasswordResponse(jsonObject: responseObject)); break; } case TCPResponseType.modifyProfile: { await tempFile.delete(); _responseStreamController.add(ModifyProfileResponse(jsonObject: responseObject)); break; } case TCPResponseType.sendMessage: { await tempFile.delete(); _responseStreamController.add(SendMessageResponse(jsonObject: responseObject)); break; } case TCPResponseType.forwardMessage: { await tempFile.delete(); _responseStreamController.add(ForwardMessageResponse(jsonObject: responseObject)); break; } case TCPResponseType.fetchMessage: { await tempFile.delete(); _responseStreamController.add(FetchMessageResponse(jsonObject: responseObject)); break; } case TCPResponseType.findFile: { await tempFile.delete(); _responseStreamController.add(FindFileResponse(jsonObject: responseObject)); break; } case TCPResponseType.fetchFile: { _responseStreamController.add(FetchFileResponse( jsonObject: responseObject, payload: LocalFile( file: tempFile, filemd5: md5.convert(await tempFile.readAsBytes()).toString() ) )); break; } case TCPResponseType.searchUser: { await tempFile.delete(); _responseStreamController.add(SearchUserResponse(jsonObject: responseObject)); break; } case TCPResponseType.addContact: { await tempFile.delete(); _responseStreamController.add(AddContactResponse(jsonObject: responseObject)); break; } case TCPResponseType.fetchContact: { await tempFile.delete(); _responseStreamController.add(FetchContactResponse(jsonObject: responseObject)); break; } default: { await tempFile.delete(); break; } } } Future checkFileExistence({ required LocalFile file }) async { //Duplicate current socket Socket socket = await Socket.connect(_remoteAddress, _remotePort); TCPRepository duplicatedRepository = TCPRepository._internal( socket: socket, remoteAddress: _remoteAddress, remotePort: _remotePort ); var pref = await SharedPreferences.getInstance(); var request = FindFileRequest(file: file, token: pref.getInt('token')!); duplicatedRepository.pushRequest(request); var hasFile = false; await for(var response in duplicatedRepository.responseStream) { if(response.type == TCPResponseType.findFile) { hasFile = response.status == TCPResponseStatus.ok; break; } } duplicatedRepository.dispose(); return hasFile; } void dispose() async { await _socket.flush(); await _socket.close(); _responseRawStreamController.close(); _payloadPullStreamController.close(); _payloadRawStreamController.close(); _responseStreamController.close(); _requestStreamController.close(); } }