Compare commits

...

13 Commits
v1.1.0 ... main

Author SHA1 Message Date
c0fbd69421
Improvement:
- Shorten log when changing avatar
2022-10-23 10:34:59 +08:00
23a0633623
Improvements & Feature:
- Add API ACKFETCH to update fetch history
- Improve anti-exception performance
2022-10-22 21:26:38 +08:00
3e575110a9
Bug Fix:
- Not deleting closed sockets in sokcet map
2022-10-22 20:53:49 +08:00
9825802e06
Bug Fix:
- Execute parameter
2022-10-22 20:15:37 +08:00
125b7ba135
Behavior Adjustment:
- Change path to ${current}/.data/
2022-10-20 22:22:41 +08:00
8627538d31
Behavior update:
- Push contact info to client when adding contact
- Prevent crash when add bytes to closed request stream
2022-10-20 17:35:42 +08:00
325e518a17
Improvements:
- Shorten logs
2022-10-20 11:59:38 +08:00
5909308475
Bug Fix & Improvements
- Improve logs
- Fix fetch message not working
2022-10-20 10:44:09 +08:00
0aa72b5dc3
Docker Update
- Usable docker file!
- docker image now at linloir/lchatserver
2022-10-19 17:01:20 +08:00
68f8d51d3b
Fix Bugs
- Fix bug when receiving file: passing a to-be-cleared buffer by referrence
2022-10-19 11:22:20 +08:00
4504c196f6
Improvements
- add await in file receiving
2022-10-19 00:56:39 +08:00
522e0712e7
API adjustment
- Change payload length indicator bytes length: 4b -> 8b
2022-10-18 15:07:42 +08:00
bdfeb07927
API adjustment
- Return filemd5 on FETCHMSG request
2022-10-18 14:41:14 +08:00
8 changed files with 330 additions and 89 deletions

14
.dockerignore Normal file
View File

@ -0,0 +1,14 @@
# Files and directories created by pub.
Dockerfile
build/
.dart_tool/
.git/
.github/
.gitignore
.packages
.data/
.tmp/
# Conventional directory for build output.
build/

27
Dockerfile Normal file
View File

@ -0,0 +1,27 @@
# Specify the Dart SDK base image version using dart:<version> (ex: dart:2.12)
FROM dart:stable AS compile
# Resolve app dependencies.
WORKDIR /lchatserver
COPY pubspec.* ./
RUN dart pub get
# Copy app source code and AOT compile it.
COPY . .
# Ensure packages are still up-to-date if anything has changed
RUN dart pub get --offline
RUN dart compile exe bin/tcp_server.dart -o bin/tcp_server
FROM ubuntu:latest
RUN apt-get update && apt-get -y install libsqlite3-0 libsqlite3-dev
# Copy the previously built executable into the scratch layer
RUN mkdir /lchatserver
COPY --from=compile /runtime/ /lchatserver/
COPY --from=compile /lchatserver/bin/tcp_server /lchatserver/bin/
# Start server.
EXPOSE 20706
WORKDIR /lchatserver/bin
CMD ["/lchatserver/bin/tcp_server"]

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-06 15:44:16
* @LastEditTime : 2022-10-17 22:56:11
* @LastEditTime : 2022-10-23 10:33:58
* @Description :
*/
@ -15,20 +15,21 @@ import 'package:tcp_server/tcpcontroller/request.dart';
import 'package:tcp_server/tcpcontroller/response.dart';
void main(List<String> arguments) async {
//Set port
var address = arguments.isEmpty ? '127.0.0.1' : arguments[0];
//Set address
var port = arguments.length < 2 ? 20706 : int.tryParse(arguments[1]) ?? 20706;
var port = arguments.isEmpty ? 20706 : int.tryParse(arguments[0]) ?? 20706;
print('[L] [STARTUP ]-----------------------');
print('[L] Running at directory ${Directory.current.path}');
//Create nessesary working directories
await Directory('${Directory.current.path}/.tmp').create();
await Directory('${Directory.current.path}/.data').create();
await Directory('${Directory.current.path}/.data/.tmp').create();
await Directory('${Directory.current.path}/.data/files').create();
await DataBaseHelper().initialize();
Map<int, List<TCPController>> tokenMap = {};
Map<TCPController, Future<int>> controllerMap = {};
var listenSocket = await ServerSocket.bind(address, port);
var listenSocket = await ServerSocket.bind(InternetAddress.anyIPv4, port);
listenSocket.listen(
(socket) {
var controller = TCPController(socket: socket);
@ -37,17 +38,31 @@ void main(List<String> arguments) async {
onError: (_) {
print('[L] [EXCEPTION]-----------------------');
print('[L] TCP Controller ran into exception');
print('[L] Remote: ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
print('[L] socket: ${controller.socket.address}:${controller.socket.port}');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
},
onDone: () {
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
},
cancelOnError: true
);
controller.requestStreamBroadcast.listen(
(request) async {
print('[L] [INCOMING ]-----------------------');
print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
if(request.requestType == RequestType.sendMessage) {
print('[L] Message: (Message body)');
}
else if(request.requestType == RequestType.modifyProfile) {
print('[L] Profile: (Profile body)');
}
else {
print('[L] Message: ${request.toJSON}');
}
if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) {
if(controllerMap[controller] == null) {
controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))();
@ -69,17 +84,38 @@ void main(List<String> arguments) async {
switch(request.requestType) {
case RequestType.checkState: {
var response = await onCheckState(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.register: {
var response = await onRegister(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.login: {
var response = await onLogin(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.logout: {
@ -89,17 +125,38 @@ void main(List<String> arguments) async {
}
case RequestType.profile: {
var response = await onFetchProfile(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.modifyProfile: {
var response = await onModifyProfile(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.modifyPassword: {
var response = await onModifyPassword(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.sendMessage: {
@ -123,16 +180,22 @@ void main(List<String> arguments) async {
);
for(var controller in targetControllers) {
try {
print('[L] [MSGFOWARD]-----------------------');
print('[L] Forwarding message to ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
controller.outStream.add(forwardResponse);
} catch(e) {
print(e);
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
continue;
}
}
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: device,
newTimeStamp: message.timestamp
);
// //Update Fetch Histories
// await DataBaseHelper().setFetchHistoryFor(
// tokenID: device,
// newTimeStamp: message.timestamp
// );
}
var targetUserID = message.receiverID;
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID);
@ -155,51 +218,121 @@ void main(List<String> arguments) async {
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
continue;
}
}
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: device,
newTimeStamp: message.timestamp
);
// //Update Fetch Histories
// await DataBaseHelper().setFetchHistoryFor(
// tokenID: device,
// newTimeStamp: message.timestamp
// );
}
var response = await onSendMessage(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.fetchMessage: {
var response = await onFetchMessage(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.findFile: {
var response = await onFindFile(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.fetchFile: {
var response = await onFetchFile(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.searchUser: {
var response = await onSearchUser(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.addContact: {
var response = await onAddContact(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
var contactResponse = await onFetchContact(
TCPRequest.fromData(
type: RequestType.fetchContact,
body: {},
tokenID: request.tokenID
),
socket
);
controller.outStream.add(contactResponse);
break;
}
case RequestType.fetchContact: {
var response = await onFetchContact(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
case RequestType.ackFetch: {
onAckFetch(request, socket);
break;
}
case RequestType.unknown: {
var response = await onUnknownRequest(request, socket);
try {
controller.outStream.add(response);
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
var token = controllerMap[controller];
controllerMap.remove(controller);
tokenMap[token]?.remove(controller);
}
break;
}
default: {

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-06 16:15:01
* @LastEditTime : 2022-10-15 11:26:03
* @LastEditTime : 2022-10-22 21:08:27
* @Description :
*/
@ -25,7 +25,7 @@ class DataBaseHelper {
Future<void> initialize() async {
_database = await databaseFactoryFfi.openDatabase(
'${Directory.current.path}/.tmp/database.db',
'${Directory.current.path}/.data/.tmp/database.db',
options: OpenDatabaseOptions(
version: 1,
onCreate: (db, version) async {
@ -34,7 +34,7 @@ class DataBaseHelper {
'''
CREATE TABLE users (
userid integer primary key autoincrement,
username text not null,
username text unique not null,
passwd text not null,
avatar text
);
@ -241,7 +241,18 @@ class DataBaseHelper {
//Insert into users
try {
await _database.insert(
await _database.transaction((txn) async {
var result = await txn.query(
'users',
where: 'username = ?',
whereArgs: [
identity.userName
]
);
if(result.isNotEmpty) {
throw Exception('Username already exists');
}
await txn.insert(
'users',
{
'username': identity.userName,
@ -250,8 +261,9 @@ class DataBaseHelper {
},
conflictAlgorithm: ConflictAlgorithm.rollback
);
} catch (conflict) {
throw Exception(['Database failure', conflict.toString()]);
});
} catch (e) {
rethrow;
}
//Get new userid
@ -358,7 +370,16 @@ class DataBaseHelper {
//Fetch unfetched messages
var unfetchMsgQueryResult = await _database.query(
'msgs',
'msgs left outer join msgfiles on msgs.md5encoded = msgfiles.msgmd5',
columns: [
'msgs.userid as userid',
'msgs.targetid as targetid',
'msgs.contenttype as contenttype',
'msgs.content as content',
'msgs.timestamp as timestamp',
'msgs.md5encoded as md5encoded',
'msgfiles.filemd5 as filemd5'
],
where: '(userid = ? or targetid = ?) and timestamp > ?',
whereArgs: [
userID,
@ -376,24 +397,25 @@ class DataBaseHelper {
),
content: message['content'] as String,
timestamp: message['timestamp'] as int,
md5encoded: message['md5encoded'] as String
md5encoded: message['md5encoded'] as String,
filemd5: message['filemd5'] as String?
);
}).toList();
//Set new fetch history
if(unfetchMsgQueryResult.isNotEmpty) {
await _database.update(
'histories',
{
'lastfetch': unfetchMsgQueryResult[0]['timestamp']
},
where: 'tokenid = ? and userid = ?',
whereArgs: [
tokenID,
userID
]
);
}
// if(unfetchMsgQueryResult.isNotEmpty) {
// await _database.update(
// 'histories',
// {
// 'lastfetch': unfetchMsgQueryResult[0]['timestamp']
// },
// where: 'tokenid = ? and userid = ?',
// whereArgs: [
// tokenID,
// userID
// ]
// );
// }
//return result
return unfetchMessages;
@ -607,6 +629,10 @@ class DataBaseHelper {
'username': userInfo.userName,
'avatar': userInfo.userAvatar
},
where: 'userid = ?',
whereArgs: [
currentUserID
],
conflictAlgorithm: ConflictAlgorithm.rollback
);
} catch (conflict) {

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-08 20:52:48
* @LastEditTime : 2022-10-15 00:40:24
* @LastEditTime : 2022-10-22 20:56:15
* @Description :
*/
@ -276,6 +276,14 @@ Future<TCPResponse> onFetchContact(TCPRequest request, Socket socket) async {
}
}
void onAckFetch(TCPRequest request, Socket socket) async {
//Update Fetch Histories
await DataBaseHelper().setFetchHistoryFor(
tokenID: request.tokenID,
newTimeStamp: request.body['timestamp'] as int,
);
}
Future<TCPResponse> onUnknownRequest(TCPRequest request, Socket socket) async {
return TCPResponse(
type: ResponseType.fromRequestType(request.requestType),

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-08 15:10:04
* @LastEditTime : 2022-10-17 22:53:17
* @LastEditTime : 2022-10-22 21:20:57
* @Description :
*/
@ -57,19 +57,35 @@ class TCPController {
print('[L] Connection Established');
print('[L] Remote: ${socket.remoteAddress}:${socket.remotePort}');
print('[L] Local: ${socket.address}:${socket.port}');
socket.listen(
_pullRequest,
onError: (e) {
print(e);
Future(() async {
try {
await for(var request in socket) {
_pullRequest(request);
await Future.delayed(const Duration(microseconds: 0));
}
} catch (e) {
_requestStreamController.addError(e);
},
onDone: () {
_responseStreamController.addError(e);
}
}).then((_) {
print('[L] [CLOSED ]-----------------------');
print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}');
print('[L] Connection closed: ${socket.address}:${socket.port}');
_requestStreamController.close();
},
cancelOnError: true,
);
_responseStreamController.close();
});
// socket.listen(
// _pullRequest,
// onError: (e) {
// print(e);
// _requestStreamController.addError(e);
// },
// onDone: () {
// print('[L] [CLOSED ]-----------------------');
// print('[L] Connection closed: ${socket.address}:${socket.port}<-${socket.remoteAddress}:${socket.remotePort}');
// _requestStreamController.close();
// },
// cancelOnError: true,
// );
//This future never ends, would that be bothersome?
Future(() async {
try{
@ -77,7 +93,8 @@ class TCPController {
await socket.addStream(response.stream);
}
} catch (e) {
print(e);
print('[E] [EXCEPTION]-----------------------');
print('[E] Adding bytes to socket stream failed');
await socket.flush();
socket.close();
}
@ -108,23 +125,23 @@ class TCPController {
while(true) {
if(requestLength == 0 && payloadLength == 0 && _payloadPullStreamController.isClosed) {
//New request
if(buffer.length >= 8) {
if(buffer.length >= 12) {
//Buffered data has more than 8 bytes, enough to read request length and body length
requestLength = Uint8List.fromList(buffer.sublist(0, 4)).buffer.asInt32List()[0];
payloadLength = Uint8List.fromList(buffer.sublist(4, 8)).buffer.asInt32List()[0];
payloadLength = Uint8List.fromList(buffer.sublist(4, 12)).buffer.asInt64List()[0];
//Clear the length indicator bytes
buffer.removeRange(0, 8);
buffer.removeRange(0, 12);
//Initialize payload transmission controller
_payloadPullStreamController = StreamController();
//Create a future that listens to the status of the payload transmission
() {
var payloadPullStream = _payloadPullStreamController.stream;
var tempFile = File('${Directory.current.path}/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync();
var tempFile = File('${Directory.current.path}/.data/.tmp/${DateTime.now().microsecondsSinceEpoch}$_fileCounter')..createSync();
_fileCounter += 1;
_fileCounter %= 1000;
Future(() async {
await for(var data in payloadPullStream) {
await tempFile.writeAsBytes(data, mode: FileMode.append, flush: true);
await tempFile.writeAsBytes(data, mode: FileMode.writeOnlyAppend);
}
_payloadRawStreamController.add(tempFile);
});
@ -160,7 +177,7 @@ class TCPController {
if(buffer.length >= payloadLength) {
//Last few bytes to emit
//Send the last few bytes to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer.sublist(0, payloadLength)));
_payloadPullStreamController.add(buffer.sublist(0, payloadLength));
//Clear buffer
buffer.removeRange(0, payloadLength);
//Set payload length to zero
@ -171,7 +188,7 @@ class TCPController {
else {
//Part of payload
//Transmit all to stream
_payloadPullStreamController.add(Uint8List.fromList(buffer));
_payloadPullStreamController.add([...buffer]);
//Reduce payload bytes left
payloadLength -= buffer.length;
//Clear buffer
@ -188,6 +205,11 @@ class TCPController {
required List<int> requestBytes,
required File tempFile
}) async {
try{
_requestStreamController.add(TCPRequest(requestBytes, tempFile));
} catch (e) {
print('[E] [EXCEPTION]-----------------------');
print('[E] Adding bytes to request stream failed');
}
}
}

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-08 15:14:26
* @LastEditTime : 2022-10-09 22:56:26
* @LastEditTime : 2022-10-22 20:54:40
* @Description :
*/
import 'dart:convert';
@ -16,6 +16,7 @@ enum RequestType {
modifyPassword('MODIFYPASSWD'), //Modify user password
modifyProfile ('MODIFYPROFILE'), //Modify user profile
sendMessage ('SENDMSG'), //Send message
ackFetch ('ACKFETCH'), //Ack fetched messages, update fetch history
fetchMessage ('FETCHMSG'), //Fetch message
findFile ('FINDFILE'), //Find file by md5 before transmitting the file
fetchFile ('FETCHFILE'), //Fetch file and file md5 by message md5
@ -40,6 +41,16 @@ class TCPRequest {
File? payload;
TCPRequest(List<int> data, this.payload): _data = jsonDecode(String.fromCharCodes(data));
TCPRequest.fromData({
required RequestType type,
required Map<String, Object?> body,
required int? tokenID,
this.payload
}): _data = {
'request': type.value,
'tokenid': tokenID,
'body': body
};
TCPRequest.none(): _data = {};
String get toJSON => jsonEncode(_data);

View File

@ -1,7 +1,7 @@
/*
* @Author : Linloir
* @Date : 2022-10-08 22:40:47
* @LastEditTime : 2022-10-12 13:53:06
* @LastEditTime : 2022-10-18 14:44:49
* @Description :
*/
@ -76,7 +76,7 @@ class TCPResponse {
int get payloadLength => payloadFile?.lengthSync() ?? 0;
Stream<List<int>> get stream async* {
yield Uint8List(4)..buffer.asInt32List()[0] = responseLength;
yield Uint8List(4)..buffer.asInt32List()[0] = payloadLength;
yield Uint8List(8)..buffer.asInt64List()[0] = payloadLength;
yield Uint8List.fromList(responseJson.codeUnits);
if(payloadFile != null) {
yield await payloadFile!.readAsBytes();