Compare commits

..

13 Commits
v1.1.0 ... main

Author SHA1 Message Date
c0fbd69421
Improvement:
- Shorten log when changing avatar
2022-10-23 10:34:59 +08:00
23a0633623
Improvements & Feature:
- Add API ACKFETCH to update fetch history
- Improve anti-exception performance
2022-10-22 21:26:38 +08:00
3e575110a9
Bug Fix:
- Not deleting closed sockets in sokcet map
2022-10-22 20:53:49 +08:00
9825802e06
Bug Fix:
- Execute parameter
2022-10-22 20:15:37 +08:00
125b7ba135
Behavior Adjustment:
- Change path to ${current}/.data/
2022-10-20 22:22:41 +08:00
8627538d31
Behavior update:
- Push contact info to client when adding contact
- Prevent crash when add bytes to closed request stream
2022-10-20 17:35:42 +08:00
325e518a17
Improvements:
- Shorten logs
2022-10-20 11:59:38 +08:00
5909308475
Bug Fix & Improvements
- Improve logs
- Fix fetch message not working
2022-10-20 10:44:09 +08:00
0aa72b5dc3
Docker Update
- Usable docker file!
- docker image now at linloir/lchatserver
2022-10-19 17:01:20 +08:00
68f8d51d3b
Fix Bugs
- Fix bug when receiving file: passing a to-be-cleared buffer by referrence
2022-10-19 11:22:20 +08:00
4504c196f6
Improvements
- add await in file receiving
2022-10-19 00:56:39 +08:00
522e0712e7
API adjustment
- Change payload length indicator bytes length: 4b -> 8b
2022-10-18 15:07:42 +08:00
bdfeb07927
API adjustment
- Return filemd5 on FETCHMSG request
2022-10-18 14:41:14 +08:00
8 changed files with 330 additions and 89 deletions

14
.dockerignore Normal file
View File

@ -0,0 +1,14 @@
# Files and directories created by pub.
Dockerfile
build/
.dart_tool/
.git/
.github/
.gitignore
.packages
.data/
.tmp/
# Conventional directory for build output.
build/

27
Dockerfile Normal file
View File

@ -0,0 +1,27 @@
# Specify the Dart SDK base image version using dart:<version> (ex: dart:2.12)
FROM dart:stable AS compile
# Resolve app dependencies.
WORKDIR /lchatserver
COPY pubspec.* ./
RUN dart pub get
# Copy app source code and AOT compile it.
COPY . .
# Ensure packages are still up-to-date if anything has changed
RUN dart pub get --offline
RUN dart compile exe bin/tcp_server.dart -o bin/tcp_server
FROM ubuntu:latest
RUN apt-get update && apt-get -y install libsqlite3-0 libsqlite3-dev
# Copy the previously built executable into the scratch layer
RUN mkdir /lchatserver
COPY --from=compile /runtime/ /lchatserver/
COPY --from=compile /lchatserver/bin/tcp_server /lchatserver/bin/
# Start server.
EXPOSE 20706
WORKDIR /lchatserver/bin
CMD ["/lchatserver/bin/tcp_server"]

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-06 15:44:16 * @Date : 2022-10-06 15:44:16
* @LastEditTime : 2022-10-17 22:56:11 * @LastEditTime : 2022-10-23 10:33:58
* @Description : * @Description :
*/ */
@ -15,20 +15,21 @@ import 'package:tcp_server/tcpcontroller/request.dart';
import 'package:tcp_server/tcpcontroller/response.dart'; import 'package:tcp_server/tcpcontroller/response.dart';
void main(List<String> arguments) async { void main(List<String> arguments) async {
//Set port
var address = arguments.isEmpty ? '127.0.0.1' : arguments[0];
//Set address //Set address
var port = arguments.length < 2 ? 20706 : int.tryParse(arguments[1]) ?? 20706; var port = arguments.isEmpty ? 20706 : int.tryParse(arguments[0]) ?? 20706;
print('[L] [STARTUP ]-----------------------');
print('[L] Running at directory ${Directory.current.path}');
//Create nessesary working directories //Create nessesary working directories
await Directory('${Directory.current.path}/.tmp').create();
await Directory('${Directory.current.path}/.data').create(); await Directory('${Directory.current.path}/.data').create();
await Directory('${Directory.current.path}/.data/.tmp').create();
await Directory('${Directory.current.path}/.data/files').create(); await Directory('${Directory.current.path}/.data/files').create();
await DataBaseHelper().initialize(); await DataBaseHelper().initialize();
Map<int, List<TCPController>> tokenMap = {}; Map<int, List<TCPController>> tokenMap = {};
Map<TCPController, Future<int>> controllerMap = {}; Map<TCPController, Future<int>> controllerMap = {};
var listenSocket = await ServerSocket.bind(address, port); var listenSocket = await ServerSocket.bind(InternetAddress.anyIPv4, port);
listenSocket.listen( listenSocket.listen(
(socket) { (socket) {
var controller = TCPController(socket: socket); var controller = TCPController(socket: socket);
@ -37,17 +38,31 @@ void main(List<String> arguments) async {
onError: (_) { onError: (_) {
print('[L] [EXCEPTION]-----------------------'); print('[L] [EXCEPTION]-----------------------');
print('[L] TCP Controller ran into exception'); print('[L] TCP Controller ran into exception');
print('[L] Remote: ${controller.socket.remoteAddress}:${controller.socket.remotePort}'); print('[L] socket: ${controller.socket.address}:${controller.socket.port}');
var token = controllerMap[controller]; var token = controllerMap[controller];
controllerMap.remove(controller); controllerMap.remove(controller);
tokenMap[token]?.remove(controller); tokenMap[token]?.remove(controller);
} },
onDone: () {
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
},
cancelOnError: true
); );
controller.requestStreamBroadcast.listen( controller.requestStreamBroadcast.listen(
(request) async { (request) async {
print('[L] [INCOMING ]-----------------------'); print('[L] [INCOMING ]-----------------------');
print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}'); print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
print('[L] Message: ${request.toJSON}'); if(request.requestType == RequestType.sendMessage) {
print('[L] Message: (Message body)');
}
else if(request.requestType == RequestType.modifyProfile) {
print('[L] Profile: (Profile body)');
}
else {
print('[L] Message: ${request.toJSON}');
}
if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) { if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) {
if(controllerMap[controller] == null) { if(controllerMap[controller] == null) {
controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))(); controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))();
@ -69,17 +84,38 @@ void main(List<String> arguments) async {
switch(request.requestType) { switch(request.requestType) {
case RequestType.checkState: { case RequestType.checkState: {
var response = await onCheckState(request, socket); var response = await onCheckState(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.register: { case RequestType.register: {
var response = await onRegister(request, socket); var response = await onRegister(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.login: { case RequestType.login: {
var response = await onLogin(request, socket); var response = await onLogin(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.logout: { case RequestType.logout: {
@ -89,17 +125,38 @@ void main(List<String> arguments) async {
} }
case RequestType.profile: { case RequestType.profile: {
var response = await onFetchProfile(request, socket); var response = await onFetchProfile(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.modifyProfile: { case RequestType.modifyProfile: {
var response = await onModifyProfile(request, socket); var response = await onModifyProfile(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.modifyPassword: { case RequestType.modifyPassword: {
var response = await onModifyPassword(request, socket); var response = await onModifyPassword(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.sendMessage: { case RequestType.sendMessage: {
@ -123,16 +180,22 @@ void main(List<String> arguments) async {
); );
for(var controller in targetControllers) { for(var controller in targetControllers) {
try { try {
print('[L] [MSGFOWARD]-----------------------');
print('[L] Forwarding message to ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
controller.outStream.add(forwardResponse); controller.outStream.add(forwardResponse);
} catch(e) { } catch(e) {
print(e); print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
continue;
} }
} }
//Update Fetch Histories // //Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor( // await DataBaseHelper().setFetchHistoryFor(
tokenID: device, // tokenID: device,
newTimeStamp: message.timestamp // newTimeStamp: message.timestamp
); // );
} }
var targetUserID = message.receiverID; var targetUserID = message.receiverID;
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID); var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID);
@ -155,51 +218,121 @@ void main(List<String> arguments) async {
var token = controllerMap[controller]; var token = controllerMap[controller];
controllerMap.remove(controller); controllerMap.remove(controller);
tokenMap[token]?.remove(controller); tokenMap[token]?.remove(controller);
continue;
} }
} }
//Update Fetch Histories // //Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor( // await DataBaseHelper().setFetchHistoryFor(
tokenID: device, // tokenID: device,
newTimeStamp: message.timestamp // newTimeStamp: message.timestamp
); // );
} }
var response = await onSendMessage(request, socket); var response = await onSendMessage(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.fetchMessage: { case RequestType.fetchMessage: {
var response = await onFetchMessage(request, socket); var response = await onFetchMessage(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.findFile: { case RequestType.findFile: {
var response = await onFindFile(request, socket); var response = await onFindFile(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.fetchFile: { case RequestType.fetchFile: {
var response = await onFetchFile(request, socket); var response = await onFetchFile(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.searchUser: { case RequestType.searchUser: {
var response = await onSearchUser(request, socket); var response = await onSearchUser(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
case RequestType.addContact: { case RequestType.addContact: {
var response = await onAddContact(request, socket); var response = await onAddContact(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
var contactResponse = await onFetchContact(
TCPRequest.fromData(
type: RequestType.fetchContact,
body: {},
tokenID: request.tokenID
),
socket
);
controller.outStream.add(contactResponse);
break; break;
} }
case RequestType.fetchContact: { case RequestType.fetchContact: {
var response = await onFetchContact(request, socket); var response = await onFetchContact(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.ackFetch: {
onAckFetch(request, socket);
break; break;
} }
case RequestType.unknown: { case RequestType.unknown: {
var response = await onUnknownRequest(request, socket); var response = await onUnknownRequest(request, socket);
controller.outStream.add(response); try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break; break;
} }
default: { default: {

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-06 16:15:01 * @Date : 2022-10-06 16:15:01
* @LastEditTime : 2022-10-15 11:26:03 * @LastEditTime : 2022-10-22 21:08:27
* @Description : * @Description :
*/ */
@ -25,7 +25,7 @@ class DataBaseHelper {
Future<void> initialize() async { Future<void> initialize() async {
_database = await databaseFactoryFfi.openDatabase( _database = await databaseFactoryFfi.openDatabase(
'${Directory.current.path}/.tmp/database.db', '${Directory.current.path}/.data/.tmp/database.db',
options: OpenDatabaseOptions( options: OpenDatabaseOptions(
version: 1, version: 1,
onCreate: (db, version) async { onCreate: (db, version) async {
@ -34,7 +34,7 @@ class DataBaseHelper {
''' '''
CREATE TABLE users ( CREATE TABLE users (
userid integer primary key autoincrement, userid integer primary key autoincrement,
username text not null, username text unique not null,
passwd text not null, passwd text not null,
avatar text avatar text
); );
@ -241,17 +241,29 @@ class DataBaseHelper {
//Insert into users //Insert into users
try { try {
await _database.insert( await _database.transaction((txn) async {
'users', var result = await txn.query(
{ 'users',
'username': identity.userName, where: 'username = ?',
'passwd': identity.userPasswd, whereArgs: [
'avatar': null identity.userName
}, ]
conflictAlgorithm: ConflictAlgorithm.rollback );
); if(result.isNotEmpty) {
} catch (conflict) { throw Exception('Username already exists');
throw Exception(['Database failure', conflict.toString()]); }
await txn.insert(
'users',
{
'username': identity.userName,
'passwd': identity.userPasswd,
'avatar': null
},
conflictAlgorithm: ConflictAlgorithm.rollback
);
});
} catch (e) {
rethrow;
} }
//Get new userid //Get new userid
@ -358,7 +370,16 @@ class DataBaseHelper {
//Fetch unfetched messages //Fetch unfetched messages
var unfetchMsgQueryResult = await _database.query( var unfetchMsgQueryResult = await _database.query(
'msgs', 'msgs left outer join msgfiles on msgs.md5encoded = msgfiles.msgmd5',
columns: [
'msgs.userid as userid',
'msgs.targetid as targetid',
'msgs.contenttype as contenttype',
'msgs.content as content',
'msgs.timestamp as timestamp',
'msgs.md5encoded as md5encoded',
'msgfiles.filemd5 as filemd5'
],
where: '(userid = ? or targetid = ?) and timestamp > ?', where: '(userid = ? or targetid = ?) and timestamp > ?',
whereArgs: [ whereArgs: [
userID, userID,
@ -376,24 +397,25 @@ class DataBaseHelper {
), ),
content: message['content'] as String, content: message['content'] as String,
timestamp: message['timestamp'] as int, timestamp: message['timestamp'] as int,
md5encoded: message['md5encoded'] as String md5encoded: message['md5encoded'] as String,
filemd5: message['filemd5'] as String?
); );
}).toList(); }).toList();
//Set new fetch history //Set new fetch history
if(unfetchMsgQueryResult.isNotEmpty) { // if(unfetchMsgQueryResult.isNotEmpty) {
await _database.update( // await _database.update(
'histories', // 'histories',
{ // {
'lastfetch': unfetchMsgQueryResult[0]['timestamp'] // 'lastfetch': unfetchMsgQueryResult[0]['timestamp']
}, // },
where: 'tokenid = ? and userid = ?', // where: 'tokenid = ? and userid = ?',
whereArgs: [ // whereArgs: [
tokenID, // tokenID,
userID // userID
] // ]
); // );
} // }
//return result //return result
return unfetchMessages; return unfetchMessages;
@ -607,6 +629,10 @@ class DataBaseHelper {
'username': userInfo.userName, 'username': userInfo.userName,
'avatar': userInfo.userAvatar 'avatar': userInfo.userAvatar
}, },
where: 'userid = ?',
whereArgs: [
currentUserID
],
conflictAlgorithm: ConflictAlgorithm.rollback conflictAlgorithm: ConflictAlgorithm.rollback
); );
} catch (conflict) { } catch (conflict) {

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 20:52:48 * @Date : 2022-10-08 20:52:48
* @LastEditTime : 2022-10-15 00:40:24 * @LastEditTime : 2022-10-22 20:56:15
* @Description : * @Description :
*/ */
@ -276,6 +276,14 @@ Future<TCPResponse> onFetchContact(TCPRequest request, Socket socket) async {
} }
} }
void onAckFetch(TCPRequest request, Socket socket) async {
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: request.tokenID,
newTimeStamp: request.body['timestamp'] as int,
);
}
Future<TCPResponse> onUnknownRequest(TCPRequest request, Socket socket) async { Future<TCPResponse> onUnknownRequest(TCPRequest request, Socket socket) async {
return TCPResponse( return TCPResponse(
type: ResponseType.fromRequestType(request.requestType), type: ResponseType.fromRequestType(request.requestType),

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 15:10:04 * @Date : 2022-10-08 15:10:04
* @LastEditTime : 2022-10-17 22:53:17 * @LastEditTime : 2022-10-22 21:20:57
* @Description : * @Description :
*/ */
@ -57,19 +57,35 @@ class TCPController {
print('[L] Connection Established'); print('[L] Connection Established');
print('[L] Remote: ${socket.remoteAddress}:${socket.remotePort}'); print('[L] Remote: ${socket.remoteAddress}:${socket.remotePort}');
print('[L] Local: ${socket.address}:${socket.port}'); print('[L] Local: ${socket.address}:${socket.port}');
socket.listen( Future(() async {
_pullRequest, try {
onError: (e) { await for(var request in socket) {
print(e); _pullRequest(request);
await Future.delayed(const Duration(microseconds: 0));
}
} catch (e) {
_requestStreamController.addError(e); _requestStreamController.addError(e);
}, _responseStreamController.addError(e);
onDone: () { }
print('[L] [CLOSED ]-----------------------'); }).then((_) {
print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}'); print('[L] [CLOSED ]-----------------------');
_requestStreamController.close(); print('[L] Connection closed: ${socket.address}:${socket.port}');
}, _requestStreamController.close();
cancelOnError: true, _responseStreamController.close();
); });
// socket.listen(
// _pullRequest,
// onError: (e) {
// print(e);
// _requestStreamController.addError(e);
// },
// onDone: () {
// print('[L] [CLOSED ]-----------------------');
// print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}');
// _requestStreamController.close();
// },
// cancelOnError: true,
// );
//This future never ends, would that be bothersome? //This future never ends, would that be bothersome?
Future(() async { Future(() async {
try{ try{
@ -77,7 +93,8 @@ class TCPController {
await socket.addStream(response.stream); await socket.addStream(response.stream);
} }
} catch (e) { } catch (e) {
print(e); print('[E] [EXCEPTION]-----------------------');
print('[E] Adding bytes to socket stream failed');
await socket.flush(); await socket.flush();
socket.close(); socket.close();
} }
@ -108,23 +125,23 @@ class TCPController {
while(true) { while(true) {
if(requestLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) { if(requestLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) {
//New request //New request
if(buffer.length >= 8) { if(buffer.length >= 12) {
//Buffered data has more than 8 bytes, enough to read request length and body length //Buffered data has more than 8 bytes, enough to read request length and body length
requestLength = Uint8List.fromList(buffer.sublist(0, 4)).buffer.asInt32List()[0]; requestLength = Uint8List.fromList(buffer.sublist(0, 4)).buffer.asInt32List()[0];
payloadLength = Uint8List.fromList(buffer.sublist(4, 8)).buffer.asInt32List()[0]; payloadLength = Uint8List.fromList(buffer.sublist(4, 12)).buffer.asInt64List()[0];
//Clear the length indicator bytes //Clear the length indicator bytes
buffer.removeRange(0, 8); buffer.removeRange(0, 12);
//Initialize payload transmission controller //Initialize payload transmission controller
_payloadPullStreamController = StreamController(); _payloadPullStreamController = StreamController();
//Create a future that listens to the status of the payload transmission //Create a future that listens to the status of the payload transmission
() { () {
var payloadPullStream = _payloadPullStreamController.stream; var payloadPullStream = _payloadPullStreamController.stream;
var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync(); var tempFile = File('${Directory.current.path}/.data/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync();
_fileCounter += 1; _fileCounter += 1;
_fileCounter %= 1000; _fileCounter %= 1000;
Future(() async { Future(() async {
await for(var data in payloadPullStream) { await for(var data in payloadPullStream) {
await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true); await tempFile.writeAsBytes(data, mode: FileMode.writeOnlyAppend);
} }
_payloadRawStreamController.add(tempFile); _payloadRawStreamController.add(tempFile);
}); });
@ -160,7 +177,7 @@ class TCPController {
if(buffer.length >= payloadLength) { if(buffer.length >= payloadLength) {
//Last few bytes to emit //Last few bytes to emit
//Send the last few bytes to stream //Send the last few bytes to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength))); _payloadPullStreamController.add(buffer.sublist(0, payloadLength));
//Clear buffer //Clear buffer
buffer.removeRange(0, payloadLength); buffer.removeRange(0, payloadLength);
//Set payload length to zero //Set payload length to zero
@ -171,7 +188,7 @@ class TCPController {
else { else {
//Part of payload //Part of payload
//Transmit all to stream //Transmit all to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer)); _payloadPullStreamController.add([...buffer]);
//Reduce payload bytes left //Reduce payload bytes left
payloadLength -= buffer.length; payloadLength -= buffer.length;
//Clear buffer //Clear buffer
@ -188,6 +205,11 @@ class TCPController {
required List<int> requestBytes, required List<int> requestBytes,
required File tempFile required File tempFile
}) async { }) async {
_requestStreamController.add(TCPRequest(requestBytes, tempFile)); try{
_requestStreamController.add(TCPRequest(requestBytes, tempFile));
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
print('[E] Adding bytes to request stream failed');
}
} }
} }

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 15:14:26 * @Date : 2022-10-08 15:14:26
* @LastEditTime : 2022-10-09 22:56:26 * @LastEditTime : 2022-10-22 20:54:40
* @Description : * @Description :
*/ */
import 'dart:convert'; import 'dart:convert';
@ -16,6 +16,7 @@ enum RequestType {
modifyPassword('MODIFYPASSWD'), //Modify user password modifyPassword('MODIFYPASSWD'), //Modify user password
modifyProfile ('MODIFYPROFILE'), //Modify user profile modifyProfile ('MODIFYPROFILE'), //Modify user profile
sendMessage ('SENDMSG'), //Send message sendMessage ('SENDMSG'), //Send message
ackFetch ('ACKFETCH'), //Ack fetched messages, update fetch history
fetchMessage ('FETCHMSG'), //Fetch message fetchMessage ('FETCHMSG'), //Fetch message
findFile ('FINDFILE'), //Find file by md5 before transmitting the file findFile ('FINDFILE'), //Find file by md5 before transmitting the file
fetchFile ('FETCHFILE'), //Fetch file and file md5 by message md5 fetchFile ('FETCHFILE'), //Fetch file and file md5 by message md5
@ -40,6 +41,16 @@ class TCPRequest {
File? payload; File? payload;
TCPRequest(List<int> data, this.payload): _data = jsonDecode(String.fromCharCodes(data)); TCPRequest(List<int> data, this.payload): _data = jsonDecode(String.fromCharCodes(data));
TCPRequest.fromData({
required RequestType type,
required Map<String, Object?> body,
required int? tokenID,
this.payload
}): _data = {
'request': type.value,
'tokenid': tokenID,
'body': body
};
TCPRequest.none(): _data = {}; TCPRequest.none(): _data = {};
String get toJSON => jsonEncode(_data); String get toJSON => jsonEncode(_data);

View File

@ -1,7 +1,7 @@
/* /*
* @Author : Linloir * @Author : Linloir
* @Date : 2022-10-08 22:40:47 * @Date : 2022-10-08 22:40:47
* @LastEditTime : 2022-10-12 13:53:06 * @LastEditTime : 2022-10-18 14:44:49
* @Description : * @Description :
*/ */
@ -76,7 +76,7 @@ class TCPResponse {
int get payloadLength => payloadFile?.lengthSync() ?? 0; int get payloadLength => payloadFile?.lengthSync() ?? 0;
Stream<List<int>> get stream async* { Stream<List<int>> get stream async* {
yield Uint8List(4)..buffer.asInt32List()[0] = responseLength; yield Uint8List(4)..buffer.asInt32List()[0] = responseLength;
yield Uint8List(4)..buffer.asInt32List()[0] = payloadLength; yield Uint8List(8)..buffer.asInt64List()[0] = payloadLength;
yield Uint8List.fromList(responseJson.codeUnits); yield Uint8List.fromList(responseJson.codeUnits);
if(payloadFile != null) { if(payloadFile != null) {
yield await payloadFile!.readAsBytes(); yield await payloadFile!.readAsBytes();