Improvements:

- Auto Reconnect
This commit is contained in:
Linloir 2022-10-22 17:54:11 +08:00
parent ba59d23484
commit a758929b46
No known key found for this signature in database
GPG Key ID: 58EEB209A0F2C366
3 changed files with 190 additions and 25 deletions

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-13 14:02:28
* @LastEditTime : 2022-10-21 23:31:43
* @LastEditTime : 2022-10-22 01:20:14
* @Description :
*/
@ -47,8 +47,8 @@ class HomeCubit extends Cubit<HomeState> {
await for(var response in tcpRepository.responseStreamBroadcast) {
if(response.type == TCPResponseType.fetchMessage) {
if(response.status == TCPResponseStatus.ok) {
response as FetchMessageResponse;
localServiceRepository.storeMessages(response.messages);
// response as FetchMessageResponse;
// localServiceRepository.storeMessages(response.messages);
break;
}
}

View File

@ -1,12 +1,15 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 10:56:02
* @LastEditTime : 2022-10-21 22:47:38
* @LastEditTime : 2022-10-22 01:22:58
* @Description : Local Service Repository
*/
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:convert/convert.dart';
import 'package:crypto/crypto.dart';
import 'package:file_picker/file_picker.dart';
import 'package:path_provider/path_provider.dart';
import 'package:shared_preferences/shared_preferences.dart';
@ -54,6 +57,22 @@ class LocalServiceRepository {
);
'''
);
await txn.execute(
'''
create table msgimgs (
msgmd5 text primary key,
imgmd5 text not null
);
'''
);
await txn.execute(
'''
create table imgs (
imgmd5 text primary key,
dir text not null
);
'''
);
});
// await db.execute(
// '''
@ -105,12 +124,26 @@ class LocalServiceRepository {
Future<void> storeMessages(List<Message> messages) async {
await _database.transaction((txn) async {
for(var message in messages) {
if(message.type == MessageType.image) {
//store image first
storeImage(
image: base64.decode(message.contentDecoded),
msgmd5: message.contentmd5
);
await txn.insert(
'msgs',
message.jsonObject..['content'] = "",
conflictAlgorithm: ConflictAlgorithm.replace
);
}
else {
await txn.insert(
'msgs',
message.jsonObject,
conflictAlgorithm: ConflictAlgorithm.replace
);
}
}
});
}
@ -135,6 +168,20 @@ class LocalServiceRepository {
for(var rawMessage in rawMessages) {
var message = Message.fromJSONObject(jsonObject: rawMessage);
if(message.contentDecoded.toLowerCase().contains(pattern.toLowerCase())) {
//Since history page does not show message
//There is no need to fetch message here
// if(message.type == MessageType.image) {
// var image = await fetchImage(msgmd5: message.contentmd5);
// if(image != null) {
// alikeMessages.add(message.copyWith(
// content: base64.encode(image),
// ));
// continue;
// }
// else {
// //TODO: do something
// }
// }
alikeMessages.add(message);
}
}
@ -160,6 +207,8 @@ class LocalServiceRepository {
messages.add([]);
}
else {
//Since message page does not show message
//There is no need to fetch message here
messages.add([Message.fromJSONObject(jsonObject: queryResult[0])]);
}
}
@ -185,7 +234,18 @@ class LocalServiceRepository {
limit: num,
offset: position
);
return queryResult.map((e) => Message.fromJSONObject(jsonObject: e)).toList();
List<Message> messages = [];
for(var result in queryResult) {
var message = Message.fromJSONObject(jsonObject: result);
if(message.type == MessageType.image) {
var image = await fetchImage(msgmd5: message.contentmd5);
if(image != null) {
message = message.copyWith(content: base64.encode(image));
}
}
messages.add(message);
}
return messages;
}
Future<File?> findFile({required String filemd5, required String fileName}) async {
@ -340,4 +400,59 @@ class LocalServiceRepository {
return null;
}
}
Future<void> storeImage({required List<int> image, required String msgmd5}) async {
var md5Output = AccumulatorSink<Digest>();
ByteConversionSink md5Input = md5.startChunkedConversion(md5Output);
md5Input.add(image);
md5Input.close();
var imagemd5 = md5Output.events.single.toString();
//Write to image library
var documentPath = (await getApplicationDocumentsDirectory()).path;
await Directory('$documentPath/LChatClient/imgs').create();
var permanentFilePath = '$documentPath/LChatClient/imgs/$imagemd5';
var imageFile = await File(permanentFilePath).create();
imageFile.writeAsBytes(image);
await _database.transaction((txn) async {
txn.insert(
'msgimgs',
{
'msgmd5': msgmd5,
'imgmd5': imagemd5
},
conflictAlgorithm: ConflictAlgorithm.replace
);
txn.insert(
'imgs',
{
'imgmd5': imagemd5,
'dir': permanentFilePath
},
conflictAlgorithm: ConflictAlgorithm.replace
);
});
}
Future<List<int>?> fetchImage({required String msgmd5}) async {
var imageQueryResult = await _database.query(
'msgimgs natural join imgs',
where: 'msgimgs.msgmd5 = ?',
whereArgs: [
msgmd5
],
columns: [
'imgs.dir as dir'
]
);
if(imageQueryResult.isEmpty) {
return null;
}
var path = imageQueryResult[0]['dir'] as String;
var image = File(path);
if(!await image.exists()) {
return null;
}
var imageContent = await image.readAsBytes();
return imageContent;
}
}

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-11 09:42:05
* @LastEditTime : 2022-10-19 10:41:43
* @LastEditTime : 2022-10-22 17:46:28
* @Description : TCP repository
*/
@ -26,13 +26,25 @@ class TCPRepository {
required int remotePort
}): _socket = socket, _remoteAddress = remoteAddress, _remotePort = remotePort {
Future(() async {
while(true) {
try{
await for(var response in _socket) {
await for(var response in _socket!) {
_pullResponse(response);
await Future.delayed(const Duration(microseconds: 0));
}
break;
} catch(e) {
_socket.close();
_socket?.close();
_socket = null;
while(true) {
try{
_socket = await Socket.connect(remoteAddress, remotePort);
break;
} catch (e) {
continue;
}
}
}
}
// _responseRawStreamController.close();
// _payloadPullStreamController.close();
@ -42,10 +54,40 @@ class TCPRepository {
});
//This future never ends, would that be bothersome?
Future(() async {
await for(var request in _requestStreamController.stream) {
await _socket.addStream(request.stream);
TCPRequest? failedRequest;
while(true) {
try{
if(failedRequest != null) {
await Future.doWhile(() async {
await Future.delayed(const Duration(microseconds: 0));
return _socket == null;
});
await _socket!.addStream(failedRequest.stream);
}
}).onError((error, stackTrace) {_socket.close();});
await for(var request in _requestStreamController.stream) {
failedRequest = request;
await Future.doWhile(() async {
await Future.delayed(const Duration(microseconds: 0));
return _socket == null;
});
await _socket!.addStream(request.stream);
failedRequest = null;
}
break;
} catch (e) {
_socket?.close();
_socket = null;
while(true) {
try{
_socket = await Socket.connect(remoteAddress, remotePort);
break;
} catch (e) {
continue;
}
}
}
}
});
Future(() async {
var responseQueue = StreamQueue(_responseRawStreamController.stream);
var payloadQueue = StreamQueue(_payloadRawStreamController.stream);
@ -56,14 +98,22 @@ class TCPRepository {
}
responseQueue.cancel();
payloadQueue.cancel();
}).onError((error, stackTrace) {_socket.close();});
}).onError((error, stackTrace) {_socket?.close();});
}
static Future<TCPRepository> create({
required String serverAddress,
required int serverPort
}) async {
var socket = await Socket.connect(serverAddress, serverPort);
Socket socket;
while(true) {
try{
socket = await Socket.connect(serverAddress, serverPort);
break;
} catch (e) {
continue;
}
}
return TCPRepository._internal(
socket: socket,
remoteAddress: serverAddress,
@ -78,7 +128,7 @@ class TCPRepository {
);
}
final Socket _socket;
Socket? _socket;
final String _remoteAddress;
final int _remotePort;
@ -359,7 +409,7 @@ class TCPRepository {
}
void dispose() async {
await _socket.flush();
await _socket.close();
await _socket?.flush();
await _socket?.close();
}
}