More code

- TCP repository
This commit is contained in:
Linloir 2022-10-11 17:45:02 +08:00
parent f53debfd14
commit 2bbad43739
No known key found for this signature in database
GPG Key ID: 58EEB209A0F2C366
6 changed files with 267 additions and 13 deletions

View File

@ -10,7 +10,7 @@ import 'dart:convert';
import 'package:crypto/crypto.dart';
import 'package:flutter/foundation.dart';
import 'package:tcp_client/repositories/common_models/json_encodable.dart';
import 'package:tcp_client/repositories/file_repository/models/local_file.dart';
import 'package:tcp_client/repositories/local_service_repository/models/local_file.dart';
enum MessageType {
plaintext('plaintext'),

View File

@ -1,6 +0,0 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 10:54:32
* @LastEditTime : 2022-10-11 10:54:32
* @Description : Repository for local file selecting and fetching
*/

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 10:55:36
* @LastEditTime : 2022-10-11 11:22:18
* @LastEditTime : 2022-10-11 17:44:06
* @Description : Local File Model
*/

View File

@ -1,17 +1,18 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 09:44:03
* @LastEditTime : 2022-10-11 16:01:54
* @LastEditTime : 2022-10-11 16:55:08
* @Description : Abstract TCP request class
*/
export 'package:tcp_client/repositories/tcp_repository/models/tcp_request.dart';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:tcp_client/repositories/common_models/message.dart';
import 'package:tcp_client/repositories/common_models/useridentity.dart';
import 'package:tcp_client/repositories/common_models/userinfo.dart';
import 'package:tcp_client/repositories/file_repository/models/local_file.dart';
import 'package:tcp_client/repositories/local_service_repository/models/local_file.dart';
enum TCPRequestType {
checkState ('STATE'), //Check login state for device token
@ -58,6 +59,14 @@ abstract class TCPRequest {
'token': token
});
}
Stream<List<int>> get stream async* {
var jsonString = toJSON();
var requestLength = jsonString.length;
yield Uint8List(4)..buffer.asInt32List()[0] = requestLength;
yield Uint8List(4)..buffer.asInt32List()[0] = 0;
yield Uint8List.fromList(jsonString.codeUnits);
}
}
class CheckStateRequest extends TCPRequest {
@ -143,6 +152,21 @@ class SendMessageRequest extends TCPRequest {
Map<String, Object?> get body => _message.jsonObject;
Message get message => _message;
@override
Stream<List<int>> get stream async* {
var jsonString = toJSON();
var requestLength = jsonString.length;
yield Uint8List(4)..buffer.asInt32List()[0] = requestLength;
yield Uint8List(4)..buffer.asInt32List()[0] = 0;
yield Uint8List.fromList(jsonString.codeUnits);
if(_message.payload != null) {
var fileStream = _message.payload!.file.openRead();
await for(var bytes in fileStream) {
yield bytes;
}
}
}
}
class FetchMessageRequest extends TCPRequest {

View File

@ -7,7 +7,7 @@
import 'package:tcp_client/repositories/common_models/message.dart';
import 'package:tcp_client/repositories/common_models/userinfo.dart';
import 'package:tcp_client/repositories/file_repository/models/local_file.dart';
import 'package:tcp_client/repositories/local_service_repository/models/local_file.dart';
enum TCPResponseType {
token ('TOKEN'), //Only exists when server is sending message

View File

@ -1,6 +1,242 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 09:42:05
* @LastEditTime : 2022-10-11 09:42:05
* @Description :
* @LastEditTime : 2022-10-11 17:41:11
* @Description : TCP repository
*/
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:crypto/crypto.dart';
import 'package:flutter/foundation.dart';
import 'package:tcp_client/repositories/local_service_repository/models/local_file.dart';
import 'package:tcp_client/repositories/tcp_repository/models/tcp_request.dart';
import 'package:tcp_client/repositories/tcp_repository/models/tcp_response.dart';
class TCPRepository {
TCPRepository(this._socket) {
_socket.listen(_pullResponse);
//This future never ends, would that be bothersome?
Future(() async {
await for(var request in _requestStreamController.stream) {
await _socket.addStream(request.stream);
}
});
Future(() async {
await for(var response in _responseRawStreamController.stream) {
var payloadFile = await _payloadRawStreamController.stream.single;
await _pushResponse(responseBytes: response, tempFile: payloadFile);
}
});
}
final Socket _socket;
//Stores the incoming bytes of the TCP connection temporarily
final List<int> buffer = [];
//Byte length for json object
int responseLength = 0;
//Byte length for subsequent data of the json object
int payloadLength = 0;
//Construct a stream which emits events on intact requests
final StreamController<List<int>> _responseRawStreamController = StreamController();
final StreamController<File> _payloadRawStreamController = StreamController();
//Construct a payload stream which forward the incoming byte into temp file
StreamController<List<int>> _payloadPullStreamController = StreamController()..close();
//Provide a response stream for blocs to listen on
final StreamController<TCPResponse> _responseStreamController = StreamController();
Stream<TCPResponse> get responseStream => _responseStreamController.stream;
//Provide a request stream for widgets to push to
final StreamController<TCPRequest> _requestStreamController = StreamController();
Future<void> pushRequest(TCPRequest request) async {
_requestStreamController.add(request);
}
//Listen to the incoming stream and emits event whenever there is a intact request
void _pullResponse(Uint8List fetchedData) {
//Put incoming data into buffer
buffer.addAll(fetchedData);
//Consume buffer until it's not enough for first 8 byte of a message
while(true) {
if(responseLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) {
//New request
if(buffer.length >= 8) {
//Buffered data has more than 8 bytes, enough to read request length and body length
responseLength = Uint8List.fromList(buffer.sublist(0, 4)).buffer.asInt32List()[0];
payloadLength = Uint8List.fromList(buffer.sublist(4, 8)).buffer.asInt32List()[0];
//Clear the length indicator bytes
buffer.removeRange(0, 8);
//Create temp file to read payload (might be huge)
var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}')..createSync();
//Create a pull stream for payload file
_payloadPullStreamController = StreamController();
//Create a future that listens to the status of the payload transmission
Future(() async {
await for(var data in _payloadPullStreamController.stream) {
await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true);
}
_payloadRawStreamController.add(tempFile);
});
}
else {
//Buffered data is not long enough
//Do nothing
break;
}
}
else {
//Currently awaiting full transmission
if(responseLength > 0) {
//Currently processing on a request
if(buffer.length >= responseLength) {
//Got intact request json
//Emit request buffer through stream
_responseRawStreamController.add(buffer.sublist(0, responseLength));
_responseRawStreamController.close();
//Remove proccessed buffer
buffer.removeRange(0, responseLength);
//Clear awaiting request length
responseLength = 0;
}
else {
//Got part of request json
//do nothing
break;
}
}
else {
//Currently processing on a payload
if(buffer.length >= payloadLength) {
//Last few bytes to emit
//Send the last few bytes to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength)));
//Clear buffer
buffer.removeRange(0, payloadLength);
//Set payload length to zero
payloadLength = 0;
//Close the payload transmission stream
_payloadPullStreamController.close();
}
else {
//Part of payload
//Transmit all to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer));
//Reduce payload bytes left
payloadLength -= buffer.length;
//Clear buffer
buffer.clear();
//Exit and wait for another submit
break;
}
}
}
}
}
Future<void> _pushResponse({
required List<int> responseBytes,
required File tempFile
}) async {
var responseJSON = String.fromCharCodes(responseBytes);
var responseObject = jsonDecode(responseJSON);
TCPResponseType responseType = TCPResponseType.fromValue(responseObject['response'] as String);
switch(responseType) {
case TCPResponseType.token: {
await tempFile.delete();
_responseStreamController.add(SetTokenReponse(jsonObject: responseObject));
break;
}
case TCPResponseType.checkState: {
await tempFile.delete();
_responseStreamController.add(CheckStateResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.register: {
await tempFile.delete();
_responseStreamController.add(RegisterResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.login: {
await tempFile.delete();
_responseStreamController.add(LoginResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.logout: {
await tempFile.delete();
_responseStreamController.add(LogoutResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.profile: {
await tempFile.delete();
_responseStreamController.add(GetProfileResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.modifyPassword: {
await tempFile.delete();
_responseStreamController.add(ModifyPasswordResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.modifyProfile: {
await tempFile.delete();
_responseStreamController.add(ModifyProfileResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.sendMessage: {
await tempFile.delete();
_responseStreamController.add(SendMessageResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.forwardMessage: {
await tempFile.delete();
_responseStreamController.add(ForwardMessageResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.fetchMessage: {
await tempFile.delete();
_responseStreamController.add(FetchMessageResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.findFile: {
await tempFile.delete();
_responseStreamController.add(FindFileResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.fetchFile: {
_responseStreamController.add(FetchFileResponse(
jsonObject: responseObject,
payload: LocalFile(
file: tempFile,
filemd5: md5.convert(await tempFile.readAsBytes()).toString()
)
));
break;
}
case TCPResponseType.searchUser: {
await tempFile.delete();
_responseStreamController.add(SearchUserResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.addContact: {
await tempFile.delete();
_responseStreamController.add(AddContactResponse(jsonObject: responseObject));
break;
}
case TCPResponseType.fetchContact: {
await tempFile.delete();
_responseStreamController.add(FetchContactResponse(jsonObject: responseObject));
break;
}
default: {
await tempFile.delete();
break;
}
}
}
}