2021-10-30 17:26:41 +00:00
import * as electronRemote from '@electron/remote' ;
const electronConsole = electronRemote . getGlobal ( 'console' ) as Console ;
import Logger from '../../logger/logger' ;
2021-11-02 04:29:24 +00:00
const LOG = Logger . create ( __filename , electronConsole ) ;
2021-10-30 17:26:41 +00:00
import * as crypto from 'crypto' ;
import { EventEmitter } from 'events' ;
import * as socketio from 'socket.io-client' ;
import ConcurrentQueue from '../../concurrent-queue/concurrent-queue' ;
import { Message , Member , Channel , Changes , ConnectionInfo , CacheServerData , ServerMetaData , ServerConfig , ShouldNeverHappenError , Token } from './data-types' ;
import DBCache from './db-cache' ;
import ResourceRAMCache from './resource-ram-cache' ;
import RecentMessageRAMCache from './message-ram-cache' ;
2021-11-07 20:13:59 +00:00
import { channel } from 'diagnostics_channel' ;
2021-10-30 17:26:41 +00:00
// Events:
// 'connected' function() called when connected to the server
// 'disconnected' function() called when connection to the server is lost
// 'verified' function() called when verification handshake is completed
// 'update-server' function(serverData) called when the server metadata updates
// 'deleted-members' function(members) called when members were deleted on the server-side
2021-11-07 20:13:59 +00:00
// 'updated-members' function(data: Array [ { oldMember, newMember } ]) called when a member was updated on the server-side
2021-10-30 17:26:41 +00:00
// 'added-members' function(members) called when members were added on the server-side
// 'deleted-channels' function(channels) called when channels were deleted on the server-side
2021-11-07 20:13:59 +00:00
// 'updated-channels' function(data: Array [ { oldChannel, newChannel } ]) called when a channel was updated on the server-side
2021-10-30 17:26:41 +00:00
// 'added-channels' function(channels) called when channels are added on the server-side
// 'new-message' function(message) called when a message is received in a channel
// 'deleted-messages' function(messages) called when messages were deleted on the server-side
// 'updated-messages' function(data: Array [ { oldMessage, newMessage } ]) called when messages were updated on the server-side
// 'added-messages' function(addedAfter : Map<messageId -> addedMessage>, addedBefore : Map<messageId -> addedMessage>) called when messages were added on the server-side (that were not in our client-side db).
// Functions:
// these three have grab counterparts
// async fetchMetadata() Fetches the server information as { name, icon_resource_id }
// async fetchMembers() Fetches the server members as [ { id, display_name, avatar_resource_id }, ... ]
// async fetchChannels() Fetches the available channels as [ { id, name }, ... ]
// async fetchMessagesRecent(channelId, number) Fetches the most recent number messages in a channel [ Message, ... ]
// async fetchMessagesBefore(channelId, messageId, number) Fetches number messages before a specified message [ Message, ... ]
// async fetchMessagesAfter(channelId, messageId, number) Fetches number messages after a specified message [ Message, ... ]
// TODO: change this to grab
// async fetchResource(resourceId) Fetches the resource associated with a specified resourceId
// async sendMessage(channelId, text) Sends a text message to a channel, returning the sent Message
// async sendMessageWithResource(channelId, text, resource, resourceName) Sends a text message with a resource to a channel, returning the sent Message. text is optional
// async setStatus(status) Set the current logged in user's status
// async setDisplayName(displayName) Sets the current logged in user's display name
2021-11-07 20:13:59 +00:00
// async setAvatar(avatarBuff)
2021-10-30 17:26:41 +00:00
// async updateChannel(channelId, name, flavorText) Updates a channel's name and flavor text
// async queryTokens() Queries for the login tokens as [ { token, member_id, created, expires }, ... ]
// async revokeToken(token) Revokes an outstanding token
/**
 * Options accepted by ClientController._fetchCachedAndVerify.
 * ClientType is the shape of the cached data, ServerType the shape of the data returned by the server.
 */
interface FetchCachedAndVerifyProps<ClientType, ServerType> {
    /** Optional queue; when provided, the fetch is pushed onto it instead of being run directly. */
    lock?: ConcurrentQueue<void>;
    /** Requests the data from the server. */
    serverFunc: () => Promise<ServerType>;
    /** Reads the data from the cache; resolves to null when there is no cached data. */
    cacheFunc: () => Promise<ClientType | null>;
    /** Called once the server data has arrived; resolves to true if changes were made to the cache. */
    cacheUpdateFunc: (cacheData: ClientType | null, serverData: ServerType) => Promise<boolean>;
    /** Event emitted with the server data when the cache was non-empty and got changed; null or omitted for no event. */
    updateEventName?: string | null;
}
export default class ClientController extends EventEmitter {
2021-11-07 16:50:30 +00:00
// Client-side server id (taken from the cache config; stored as a string)
public id : string ;
// Id of the logged-in member; overwritten with the id the server returns after verification
public memberId : string ;
// Address the socket connects to
public url : string ;
// Key pair used for the challenge/verify handshake
public publicKey : crypto.KeyObject ;
public privateKey : crypto.KeyObject ;
// Passed to the socket as its `ca` option (identity verification for the server)
public serverCert : string
public socket : socketio.Socket ;
// null until fetched and reset to null on disconnect; use grabMetadata()
private _metadata : ServerMetaData | CacheServerData | null ;
// Cleared on disconnect; use grabRecentMessages()
private _recentMessages : RecentMessageRAMCache ;
// id -> Channel / id -> Member lookups used when building Message objects; use grabChannels() / grabMembers()
public channels : Map < string , Channel > ;
public members : Map < string , Member > ;
// Queues handed to _fetchCachedAndVerify as the lock for channel / member fetches
public channelsLock : ConcurrentQueue < void > ;
public membersLock : ConcurrentQueue < void > ;
// true once the verification handshake has completed; reset to false on disconnect
public isVerified : boolean ;
// resourceId -> [ callbackFunc, ... ]
public resourceCallbacks : Map < string , ( ( err : any , resourceBuff : Buffer | null ) = > Promise < void > | void ) [ ] > ;
// dedupeId -> [ callbackFunc, ... ]
public dedupedCallbacks : Map < string , ( ( ) = > Promise < void > | void ) [ ] > ;
/**
 * Creates the controller for one server, opens its socket and wires up the socket and internal event handlers.
 * @param config Connection settings: serverId, memberId, url, publicKey, privateKey and serverCert.
 *               The keys may be given either as strings or as ready-made key objects.
 */
constructor(config: ServerConfig) {
    super();

    // TODO: fetch these from the cache when they are needed rather than storing them in memory (especially private key)
    const publicKey = typeof config.publicKey === 'string'
        ? crypto.createPublicKey(config.publicKey)
        : config.publicKey;
    const privateKey = typeof config.privateKey === 'string'
        ? crypto.createPrivateKey(config.privateKey)
        : config.privateKey;

    // Identity and connection settings
    this.id = config.serverId + ''; // this is the client-side server id (from Cache)
    this.memberId = config.memberId;
    this.url = config.url;
    this.publicKey = publicKey;
    this.privateKey = privateKey;
    this.serverCert = config.serverCert;

    const socketOptions = {
        forceNew: true,
        ca: this.serverCert, // this provides identity verification for the server
    };
    this.socket = socketio.connect(this.url, socketOptions);

    // RAM caches
    this._metadata = null; // use grabMetadata();
    this._recentMessages = new RecentMessageRAMCache(); // use grabRecentMessages();

    // These are used to make message objects more useful
    // TODO: make these private
    this.channels = new Map<string, Channel>(); // use grabChannels();
    this.members = new Map<string, Member>(); // use grabMembers();

    this.channelsLock = new ConcurrentQueue<void>(1);
    this.membersLock = new ConcurrentQueue<void>(1);

    this.isVerified = false;

    // resourceId -> [ callbackFunc, ... ];
    this.resourceCallbacks = new Map<string, ((err: any, resourceBuff: Buffer | null) => Promise<void> | void)[]>();
    // dedupeId -> [ callbackFunc, ... ];
    this.dedupedCallbacks = new Map<string, (() => Promise<void> | void)[]>();

    this._bindSocket();
    this._bindInternalEvents();
}
/**
 * Registers the handlers for events coming from the server socket and translates them
 * into this controller's own events ('connected', 'disconnected', 'new-message', ...).
 */
_bindSocket(): void {
    this.socket.on('connect', async () => {
        LOG.info('connected to server#' + this.id);
        this.emit('connected');
        // TODO: re-verify on verification failure?
        try {
            await this.verify();
        } catch (e) {
            // verify() rejects when the challenge or verification request fails. Nothing awaits
            // this handler, so without the catch the failure would be an unhandled rejection.
            LOG.error('verification failed for server#' + this.id, e);
        }
    });
    this.socket.on('disconnect', () => {
        LOG.info('disconnected from server#' + this.id);
        // Everything held in RAM is dropped so that it is re-fetched after the next connect
        this.isVerified = false;
        this._metadata = null;
        this._recentMessages.clear();
        this.channels.clear();
        this.members.clear();
        this.emit('disconnected');
    });
    this.socket.on('new-message', async (dataMessage) => {
        // Members and channels are needed to build the Message object
        await this.ensureMembers();
        await this.ensureChannels();
        let message = Message.fromDBData(dataMessage, this.members, this.channels);
        await DBCache.upsertServerMessages(this.id, message.channel.id, [ message ]);
        LOG.info(message.toString());
        this._recentMessages.addNewMessage(this.id, message);
        this.emit('new-message', message);
    });
    // NOTE(review): the member/channel payloads below are emitted as received (unlike 'new-message',
    // they are not converted with fromDBData) -- confirm this is intended
    this.socket.on('update-member', async (member) => {
        await this.ensureMembers();
        let oldMember = this.members.get(member.id);
        if (oldMember) {
            this.emit('updated-members', [ { oldMember: oldMember, newMember: member } ]);
        } else {
            // Not known yet on the client, so report it as added
            this.emit('added-members', [ member ]);
        }
    });
    this.socket.on('update-channel', async (channel) => {
        await this.ensureChannels();
        let oldChannel = this.channels.get(channel.id);
        if (oldChannel) {
            this.emit('updated-channels', [ { oldChannel: oldChannel, newChannel: channel } ]);
        } else {
            // Not known yet on the client, so report it as added
            this.emit('added-channels', [ channel ]);
        }
    });
    this.socket.on('new-channel', async (channel) => {
        await this.ensureChannels();
        this.emit('added-channels', [ channel ]);
    });
    this.socket.on('update-server', async (serverMeta) => {
        await DBCache.updateServer(this.id, serverMeta);
        this.emit('update-server', serverMeta);
    });
}
/**
 * Registers listeners on this controller's own events so that the in-memory member/channel
 * maps, the DB cache and the recent-message RAM cache follow the emitted changes.
 */
_bindInternalEvents(): void {
    // Write the current in-memory maps back to the DB cache
    const persistMembers = async (): Promise<void> => {
        await DBCache.updateServerMembers(this.id, Array.from(this.members.values()));
    };
    const persistChannels = async (): Promise<void> => {
        await DBCache.updateServerChannels(this.id, Array.from(this.channels.values()));
    };

    this.on('added-members', async (members: Member[]) => {
        members.forEach((added) => { this.members.set(added.id, added); });
        await persistMembers();
    });
    this.on('updated-members', async (data: { oldMember: Member, newMember: Member }[]) => {
        data.forEach(({ newMember }) => { this.members.set(newMember.id, newMember); });
        await persistMembers();
    });
    this.on('deleted-members', async (members: Member[]) => {
        members.forEach((removed) => { this.members.delete(removed.id); });
        await persistMembers();
    });

    this.on('added-channels', async (channels: Channel[]) => {
        channels.forEach((added) => { this.channels.set(added.id, added); });
        await persistChannels();
    });
    this.on('updated-channels', async (data: { oldChannel: Channel, newChannel: Channel }[]) => {
        data.forEach(({ newChannel }) => { this.channels.set(newChannel.id, newChannel); });
        await persistChannels();
    });
    this.on('deleted-channels', async (channels: Channel[]) => {
        channels.forEach((removed) => { this.channels.delete(removed.id); });
        await persistChannels();
    });

    this.on('added-messages', async (channel: Channel, addedAfter: Map<string, Message>, addedBefore: Map<string, Message>) => {
        // Merging added messages into the RAM cache is surprisingly complicated (see script.js's
        // added-messages function), so the channel is simply dropped and gets refreshed on the
        // next request. The script.js implementation should be confirmed 100% correct before it
        // is copied elsewhere (90% sure it is). A real implementation here would help first-time
        // joiners who are getting a channel's messages for the first time.
        // Alternatively, just store the date in the message and use order-by
        this._recentMessages.dropChannel(this.id, channel.id);
    });
    this.on('updated-messages', async (channel: Channel, data: { oldMessage: Message, newMessage: Message }[]) => {
        data.forEach(({ oldMessage, newMessage }) => {
            this._recentMessages.updateMessage(this.id, oldMessage, newMessage);
        });
    });
    this.on('deleted-messages', async (_channel: Channel, messages: Message[]) => {
        messages.forEach((removed) => {
            this._recentMessages.deleteMessage(this.id, removed);
        });
    });
}
/**
 * Replaces the contents of the in-memory member map with the given members (keyed by id).
 * @param members The members that should make up the map afterwards
 */
_updateCachedMembers(members: Member[]): void {
    this.members.clear();
    members.forEach((entry) => {
        this.members.set(entry.id, entry);
    });
}
/**
 * Replaces the contents of the in-memory channel map with the given channels (keyed by id).
 * @param channels The channels that should make up the map afterwards
 */
_updateCachedChannels(channels: Channel[]): void {
    this.channels.clear();
    channels.forEach((entry) => {
        this.channels.set(entry.id, entry);
    });
}
/**
 * Fetches data from the server but returns cached data if it already exists. The returned promise
 * resolves with the cache data as soon as it has been read (when there is any); the server
 * response is then still awaited and handed to cacheUpdateFunc. When the cache is empty, the
 * promise resolves with the server data after cacheUpdateFunc has run.
 * @param props.lock Optional queue that the whole operation is pushed onto, to make sure that the cache data does not change while waiting for the server data.
 * @param props.serverFunc A function () that returns the data from the server
 * @param props.cacheFunc A function () that returns the data from the cache (returns null if no data)
 * @param props.cacheUpdateFunc A function (cacheData, serverData) called after the server data is fetched. It should be used to update the data in the cache and returns true if it changed the cache.
 * @param props.updateEventName The name of the event to emit when the server data is different from the cache data (and the cache data is not null), null for no event
 * @returns The cache data if present, otherwise the server data
 * @throws Rejects when the cache read fails, when the server request fails while there is no cache data, or when the work cannot be started/enqueued
 */
async _fetchCachedAndVerify<T, U>(props: FetchCachedAndVerifyProps<T, U>): Promise<T | U> {
    const { lock, serverFunc, cacheFunc, cacheUpdateFunc, updateEventName } = props;
    return await new Promise<T | U>((resolve, reject) => {
        const func = async (): Promise<void> => {
            try {
                // Start the server request first so that it is in flight while the cache is read
                const serverPromise = serverFunc();
                const cacheData = await cacheFunc();
                if (cacheData !== null) {
                    // The caller gets the cached data right away; the rest runs in the background
                    resolve(cacheData);
                }
                let serverData: U;
                try {
                    serverData = await serverPromise;
                } catch (e) {
                    if (cacheData !== null) {
                        // Print an error if this was already resolved
                        LOG.warn('Error fetching server data:', e);
                        return;
                    } else {
                        throw e;
                    }
                }
                // make sure the 'added/deleted/updated' events get run before returning the server data
                try {
                    const changesMade = await cacheUpdateFunc(cacheData, serverData);
                    if (updateEventName && cacheData != null && changesMade) {
                        this.emit(updateEventName, serverData);
                    }
                } catch (e) {
                    LOG.error('error handling cache update', e);
                }
                if (cacheData == null) {
                    resolve(serverData);
                }
            } catch (e) {
                reject(e);
            }
        };
        // The executor is deliberately not async: if starting or enqueueing the work fails, the
        // returned promise is rejected rather than being left pending with an unhandled rejection.
        const pending = lock ? lock.push(func) : func();
        Promise.resolve(pending).catch(reject);
    });
}
/**
 * Emits a socket event whose acknowledgement is limited to `ms` milliseconds.
 * The last element of `args` is the response callback; everything before it is sent to the server.
 * If no acknowledgement arrives in time, the callback is called with 'emit timeout' and a later
 * acknowledgement is ignored. The returned promise resolves after the callback has been called.
 * This function is a promise for error stack tracking purposes.
 * @param ms Timeout in milliseconds
 * @param name The socket event name
 * @param args The socket arguments followed by the response callback
 */
async _socketEmitTimeout(ms: number, name: string, ...args: any[]): Promise<void> {
    return new Promise<void>((resolve) => {
        const respond = args[args.length - 1];
        const socketArgs = args.slice(0, args.length - 1);

        let timedOut = false;
        const timer = setTimeout(() => {
            timedOut = true;
            respond('emit timeout');
            resolve();
        }, ms);

        const onAck = (...respondArgs: any[]): void => {
            if (timedOut) {
                // Too late, the caller was already told about the timeout
                return;
            }
            clearTimeout(timer);
            respond(...respondArgs);
            resolve();
        };

        this.socket.emit(name, ...socketArgs, onAck);
    });
}
/**
 * Queries data from the server. Waits (up to 5 seconds) for the connection to be verified
 * first and gives the server 5 seconds to respond.
 * @param endpoint The server-side socket endpoint
 * @param args The server socket arguments
 * @returns The data the server responded with
 * @throws When verification times out or when the server (or the emit timeout) reports an error message
 */
async _queryServer(endpoint: string, ...args: any[]): Promise<any> {
    // NOTE: socket.io may cause client-side memory leaks if the ack function is never called
    await this.ensureVerified(5000);

    // Renders the arguments for log and error messages
    const listArgs = (): string => args.map(arg => LOG.inspect(arg)).join(', ');

    LOG.silly('querying s#' + this.id + ' @' + endpoint + ' / [' + listArgs() + ']');

    return await new Promise((resolve, reject) => {
        const onResponse = async (errMsg: string, serverData: any) => {
            if (!errMsg) {
                resolve(serverData);
                return;
            }
            reject(new Error('error fetching server data @' + endpoint + ' / [' + listArgs() + ']: ' + errMsg));
        };
        this._socketEmitTimeout(5000, endpoint, ...args, onResponse);
    });
}
/**
 * Works out which data points were updated, added or deleted on the server compared to the cache.
 * Data points are matched by their `id` property (loose equality).
 * TODO: Make this "T extends something with an id"
 * @param cacheData The cached data points, or null when nothing is cached (everything counts as added)
 * @param serverData The data points from the server
 * @param equal Decides whether a cached and a server data point with the same id have the same content
 * @returns The updated pairs, the added data points and the deleted data points
 */
static _getChanges<T>(cacheData: T[] | null, serverData: T[], equal: ((a: T, b: T) => boolean)): Changes<T> {
    if (cacheData === null) {
        return { updated: [], added: serverData, deleted: [] };
    }
    const cached: T[] = cacheData;
    const sameId = (a: T, b: T): boolean => (a as any).id == (b as any).id;

    const updated: { oldDataPoint: T, newDataPoint: T }[] = [];
    const added: T[] = [];
    serverData.forEach((fromServer) => {
        const fromCache = cached.find((candidate) => sameId(candidate, fromServer));
        if (!fromCache) {
            added.push(fromServer);
            return;
        }
        if (!equal(fromCache, fromServer)) {
            updated.push({ oldDataPoint: fromCache, newDataPoint: fromServer });
        }
    });

    // Whatever the cache has that the server no longer reports was deleted
    const deleted: T[] = cached.filter(
        (fromCache) => serverData.find((candidate) => sameId(candidate, fromCache)) == null
    );

    return { updated, added, deleted };
}
// Disconnects this controller's socket from the server.
disconnect ( ) : void {
this . socket . disconnect ( ) ;
}
/**
 * Runs the verification handshake: requests a challenge for our public key, signs it with the
 * private key and sends the signature back. On success the member id returned by the server is
 * stored, isVerified is set and 'verified' is emitted.
 * @throws When the challenge or verification request fails (including emit timeouts) or when the challenge cannot be signed
 */
async verifyWithServer(): Promise<void> {
    // The executor is not async so that a synchronous failure (e.g. exporting the key)
    // rejects the promise instead of leaving it pending.
    await new Promise<void>((resolve, reject) => {
        // Solve the server's challenge
        const publicKeyBuff = this.publicKey.export({ type: 'spki', format: 'der' });
        this._socketEmitTimeout(5000, 'challenge', publicKeyBuff, (errMsg, algo, type, challenge) => {
            if (errMsg) {
                reject(new Error('challenge request failed: ' + errMsg));
                return;
            }
            try {
                const sign = crypto.createSign(algo);
                sign.write(challenge);
                sign.end();
                const signature = sign.sign(this.privateKey, type);
                this._socketEmitTimeout(5000, 'verify', signature, (errMsg, memberId) => {
                    if (errMsg) {
                        reject(new Error('verification request failed: ' + errMsg));
                        return;
                    }
                    this.memberId = memberId;
                    DBCache.updateServerMemberId(this.id, this.memberId);
                    this.isVerified = true;
                    LOG.info(` s# ${this.id} client verified as u# ${this.memberId} `);
                    resolve();
                    this.emit('verified');
                });
            } catch (e) {
                // algo/type/challenge come from the server; if signing throws, settle the promise
                // instead of throwing inside the socket callback.
                reject(e);
            }
        });
    });
}
/**
 * Runs the verification handshake: requests a challenge for our public key, signs it with the
 * private key and sends the signature back. On success the member id returned by the server is
 * stored, isVerified is set and 'verified' is emitted.
 * @throws When the challenge or verification request fails (including emit timeouts) or when the challenge cannot be signed
 */
async verify(): Promise<void> {
    // The executor is not async so that a synchronous failure (e.g. exporting the key)
    // rejects the promise instead of leaving it pending.
    await new Promise<void>((resolve, reject) => {
        // Solve the server's challenge
        const publicKeyBuff = this.publicKey.export({ type: 'spki', format: 'der' });
        this._socketEmitTimeout(5000, 'challenge', publicKeyBuff, (errMsg, algo, type, challenge) => {
            if (errMsg) {
                reject(new Error('challenge request failed: ' + errMsg));
                return;
            }
            try {
                const sign = crypto.createSign(algo);
                sign.write(challenge);
                sign.end();
                const signature = sign.sign(this.privateKey, type);
                this._socketEmitTimeout(5000, 'verify', signature, (errMsg, memberId) => {
                    if (errMsg) {
                        reject(new Error('verification request failed: ' + errMsg));
                        return;
                    }
                    this.memberId = memberId;
                    DBCache.updateServerMemberId(this.id, this.memberId);
                    this.isVerified = true;
                    LOG.info(` verified at server# ${this.id} as u# ${this.memberId} `);
                    resolve();
                    this.emit('verified');
                });
            } catch (e) {
                // algo/type/challenge come from the server; if signing throws, settle the promise
                // instead of throwing inside the socket callback.
                reject(e);
            }
        });
    });
}
/**
 * Resolves once the connection is verified: immediately when it already is, otherwise on the
 * next 'verified' event.
 * @param timeout How long to wait for the 'verified' event, in ms
 * @throws Error('verification timeout') when the event does not arrive in time
 */
async ensureVerified(timeout: number): Promise<void> {
    if (!this.isVerified) {
        await new Promise<void>((resolve, reject) => {
            let timer: any = null;
            const onVerified = () => {
                clearTimeout(timer);
                resolve();
            };
            this.once('verified', onVerified);
            timer = setTimeout(() => {
                // Give up: stop listening so that the listener does not linger
                this.off('verified', onVerified);
                reject(new Error('verification timeout'));
            }, timeout);
        });
    }
}
/** Fetches the server metadata unless it is already held in memory. */
async ensureMetadata(): Promise<void> {
    if (this._metadata === null) {
        await this.fetchMetadata();
    }
}
/** Fetches the members unless the in-memory member map already has entries. */
async ensureMembers(): Promise<void> {
    const alreadyLoaded = this.members.size > 0;
    if (!alreadyLoaded) {
        await this.fetchMembers();
    }
}
/** Fetches the channels unless the in-memory channel map already has entries. */
async ensureChannels(): Promise<void> {
    const alreadyLoaded = this.channels.size > 0;
    if (!alreadyLoaded) {
        await this.fetchChannels();
    }
}
/**
 * Looks up the logged-in member (by this.memberId) after making sure the members are loaded.
 * The lookup result is returned as-is, cast to Member.
 */
async getMyMember(): Promise<Member> {
    await this.ensureMembers();
    const me = this.members.get(this.memberId);
    return me as Member;
}
/** Returns the server metadata held in memory, fetching it first if it is not there yet. */
async grabMetadata(): Promise<ServerMetaData | CacheServerData> {
    await this.ensureMetadata();
    const metadata = this._metadata as ServerMetaData | CacheServerData;
    return metadata;
}
/** Returns the members held in memory as an array, fetching them first if the map is empty. */
async grabMembers(): Promise<Member[]> {
    await this.ensureMembers();
    const memberList = Array.from(this.members.values());
    return memberList;
}
/** Returns the channels held in memory as an array, fetching them first if the map is empty. */
async grabChannels(): Promise<Channel[]> {
    await this.ensureChannels();
    const channelList = Array.from(this.channels.values());
    return channelList;
}
/**
 * Returns the most recent messages of a channel from the RAM cache, falling back to
 * fetchMessagesRecent when the RAM cache has nothing (null) for the request.
 * @param channelId The channel to get messages for
 * @param number How many messages to get
 */
async grabRecentMessages(channelId: string, number: number): Promise<Message[]> {
    const fromRam = this._recentMessages.getRecentMessages(this.id, channelId, number);
    if (fromRam === null) {
        return await this.fetchMessagesRecent(channelId, number);
    }
    return fromRam;
}
/**
 * Fetches the server metadata (cache first, verified against the server) and keeps the result
 * in memory. The DB cache is updated and 'update-server' may be emitted when the server's name
 * or icon differs from the cached one.
 * @returns The cached metadata if there is any, otherwise the metadata from the server
 */
async fetchMetadata(): Promise<ServerMetaData | CacheServerData> {
    // Only the name and the icon are compared
    const isDifferent = (cacheData: CacheServerData | null, serverData: ServerMetaData): boolean => {
        if (cacheData === null) {
            return true;
        }
        const sameName = cacheData.name == serverData.name;
        const sameIcon = cacheData.iconResourceId == serverData.iconResourceId;
        return !(sameName && sameIcon);
    };

    const metadata = await this._fetchCachedAndVerify<CacheServerData, ServerMetaData>({
        serverFunc: async () => {
            const dataServer = await this._queryServer('fetch-server');
            return ServerMetaData.fromServerDBData(dataServer);
        },
        cacheFunc: async () => {
            return await DBCache.getServer(this.id);
        },
        cacheUpdateFunc: async (cacheData: CacheServerData | null, serverData: ServerMetaData) => {
            if (isDifferent(cacheData, serverData)) {
                await DBCache.updateServer(this.id, serverData);
                return true;
            }
            return false;
        },
        updateEventName: 'update-server',
    });

    this._metadata = metadata as ServerMetaData | CacheServerData;
    return metadata;
}
/**
 * Builds the connection info of the logged-in member.
 * If not verified, this reads the members from the DB cache rather than waiting for
 * verification; the status is then reported as 'connecting', privileges are empty and the
 * role fields stay null. If the member cannot be found, the placeholder values
 * (displayName 'Connecting...') are returned.
 * @returns { id, avatarResourceId, displayName, status, privileges, roleName, roleColor, rolePriority }
 */
async fetchConnectionInfo(): Promise<ConnectionInfo> {
    // Placeholder values, used as-is when the member cannot be found
    const info: ConnectionInfo = {
        id: null,
        avatarResourceId: null,
        displayName: 'Connecting...',
        status: '',
        privileges: [],
        roleName: null,
        roleColor: null,
        rolePriority: null
    };

    if (!this.isVerified) {
        const cachedMembers = await DBCache.getMembers(this.id);
        if (cachedMembers) {
            const me = cachedMembers.find(m => m.id == this.memberId);
            if (me) {
                info.id = me.id;
                info.avatarResourceId = me.avatarResourceId;
                info.displayName = me.displayName;
                info.status = 'connecting';
                info.privileges = [];
            } else {
                LOG.warn('Unable to find self in cached members');
            }
        }
        return info;
    }

    await this.ensureMembers();
    const me = this.members.get(this.memberId);
    if (!me) {
        LOG.warn('Unable to find self in members', this.members);
        return info;
    }
    info.id = me.id;
    info.avatarResourceId = me.avatarResourceId;
    info.displayName = me.displayName;
    info.status = me.status;
    info.roleName = me.roleName;
    info.roleColor = me.roleColor;
    info.rolePriority = me.rolePriority;
    info.privileges = me.privileges;
    return info;
}
/**
 * Fetches the server members (cache first, verified against the server, serialized through
 * membersLock) and replaces the in-memory member map with the result.
 * When the server data differs from the cache, the DB cache and the in-memory map are updated
 * and 'deleted-members', 'added-members' and 'updated-members' are emitted (in that order).
 * @returns The cached members if there are any, otherwise the members from the server
 */
async fetchMembers(): Promise<Member[]> {
    // Field-by-field comparison of a cached member with its server counterpart
    const sameMember = (cacheMember: Member, serverMember: Member): boolean => {
        if (cacheMember.id !== serverMember.id) return false;
        if (cacheMember.displayName !== serverMember.displayName) return false;
        if (cacheMember.status !== serverMember.status) return false;
        if (cacheMember.avatarResourceId !== serverMember.avatarResourceId) return false;
        if (cacheMember.roleName !== serverMember.roleName) return false;
        if (cacheMember.roleColor !== serverMember.roleColor) return false;
        if (cacheMember.rolePriority !== serverMember.rolePriority) return false;
        return cacheMember.privileges.join(',') === serverMember.privileges.join(',');
    };

    const reconcile = async (cacheData: Member[] | null, serverData: Member[]): Promise<boolean> => {
        const { deleted, added, updated } = ClientController._getChanges<Member>(cacheData, serverData, sameMember);
        const nothingChanged = updated.length == 0 && added.length == 0 && deleted.length == 0;
        if (nothingChanged) {
            return false;
        }

        await DBCache.updateServerMembers(this.id, serverData);
        this._updateCachedMembers(serverData);

        if (deleted.length > 0) {
            this.emit('deleted-members', deleted);
        }
        if (added.length > 0) {
            this.emit('added-members', added);
        }
        if (updated.length > 0) {
            const pairs = updated.map(change => ({ oldMember: change.oldDataPoint, newMember: change.newDataPoint }));
            this.emit('updated-members', pairs);
        }
        return true;
    };

    const members = await this._fetchCachedAndVerify<Member[], Member[]>({
        lock: this.membersLock,
        serverFunc: async () => {
            const dataMembers = (await this._queryServer('fetch-members')) as any[];
            return dataMembers.map((dataMember: any) => Member.fromDBData(dataMember));
        },
        cacheFunc: async () => {
            return await DBCache.getMembers(this.id);
        },
        cacheUpdateFunc: reconcile,
        updateEventName: null,
    });

    this._updateCachedMembers(members);
    return members;
}
/**
 * Fetches the server channels (cache first, verified against the server, serialized through
 * channelsLock) and replaces the in-memory channel map with the result.
 * When the server data differs from the cache, 'deleted-channels', 'added-channels' and
 * 'updated-channels' are emitted (in that order).
 * @returns The cached channels if there are any, otherwise the channels from the server
 */
async fetchChannels(): Promise<Channel[]> {
    // Field-by-field comparison of a cached channel with its server counterpart
    const sameChannel = (cacheChannel: Channel, serverChannel: Channel): boolean => {
        if (cacheChannel.id != serverChannel.id) return false;
        if (cacheChannel.index != serverChannel.index) return false;
        if (cacheChannel.name != serverChannel.name) return false;
        return cacheChannel.flavorText == serverChannel.flavorText;
    };

    const reconcile = async (cacheData: Channel[] | null, serverData: Channel[]): Promise<boolean> => {
        const { deleted, added, updated } = ClientController._getChanges(cacheData, serverData, sameChannel);
        const nothingChanged = updated.length == 0 && added.length == 0 && deleted.length == 0;
        if (nothingChanged) {
            return false;
        }

        // All cache updates are handled by internal event handlers
        if (deleted.length > 0) {
            this.emit('deleted-channels', deleted);
        }
        if (added.length > 0) {
            this.emit('added-channels', added);
        }
        if (updated.length > 0) {
            const pairs = updated.map(change => ({ oldChannel: change.oldDataPoint, newChannel: change.newDataPoint }));
            this.emit('updated-channels', pairs);
        }
        return true;
    };

    const channels = await this._fetchCachedAndVerify<Channel[], Channel[]>({
        lock: this.channelsLock,
        serverFunc: async () => {
            const dataChannels = (await this._queryServer('fetch-channels')) as any[];
            return dataChannels.map((dataChannel: any) => Channel.fromDBData(dataChannel));
        },
        cacheFunc: async () => {
            return await DBCache.getChannels(this.id);
        },
        cacheUpdateFunc: reconcile,
        updateEventName: null,
    });

    this._updateCachedChannels(channels);
    return channels;
}
/**
 * Creates the function that compares cached messages against messages fetched from the server.
 * The created function resolves to true if any difference was found. After resolving, it cleans
 * up the DB cache and emits 'deleted-messages', 'updated-messages' and 'added-messages'.
 * @param channelId the id of the channel the messages were fetched from (used in the events)
 * @param firstMessageId the first message in the current list of messages
 * @param lastMessageId the last element in the current list of messages
 * @returns A function (cacheMessages, serverMessages) resolving to whether a difference was found
 */
private _createDiffMessageDataFunc(
    channelId: string,
    firstMessageId: string | null,
    lastMessageId: string | null
): ((cacheMessages: Message[], serverMessages: Message[]) => Promise<boolean>) {
    return async (cacheMessages: Message[], serverMessages: Message[]) => {
        function equal(cacheMessage: Message, serverMessage: Message) {
            return cacheMessage.id == serverMessage.id &&
                cacheMessage.channel.id == serverMessage.channel.id &&
                cacheMessage.member.id == serverMessage.member.id &&
                cacheMessage.sent.getTime() == serverMessage.sent.getTime() &&
                cacheMessage.text == serverMessage.text &&
                cacheMessage.resourceId == serverMessage.resourceId &&
                cacheMessage.resourceName == serverMessage.resourceName &&
                cacheMessage.resourceWidth == serverMessage.resourceWidth &&
                cacheMessage.resourceHeight == serverMessage.resourceHeight &&
                cacheMessage.resourcePreviewId == serverMessage.resourcePreviewId;
        }
        return new Promise<boolean>((resolve, reject) => {
            // Tracks whether the result was already delivered, to decide how a failure is reported
            let resolved = false;
            const diffAndNotify = async (): Promise<void> => {
                let diffFound = false;
                let updatedMessages: { oldMessage: Message, newMessage: Message }[] = [];
                let addedAfter = new Map<string, Message>(); // messageId -> message added after this message
                let addedBefore = new Map<string, Message>(); // messageId -> message added before this message
                for (let i = 0; i < serverMessages.length; ++i) {
                    let serverMessage = serverMessages[i];
                    let cacheMessage = cacheMessages.find(m => serverMessage.id == m.id);
                    if (cacheMessage) {
                        if (!equal(cacheMessage, serverMessage)) {
                            diffFound = true;
                            updatedMessages.push({
                                oldMessage: cacheMessage,
                                newMessage: serverMessage,
                            });
                        }
                    } else {
                        // items in server not in cache are added
                        diffFound = true;
                        let comesAfter = serverMessages[i - 1] || { id: lastMessageId }; // this message comesAfter comesAfter
                        let comesBefore = serverMessages[i + 1] || { id: firstMessageId }; // this message comesBefore comesBefore
                        addedAfter.set(comesAfter.id, serverMessage);
                        addedBefore.set(comesBefore.id, serverMessage);
                    }
                }
                let deletedMessages: Message[] = [];
                for (let cacheMessage of cacheMessages) {
                    let serverMessage = serverMessages.find(m => cacheMessage.id == m.id);
                    if (serverMessage == null) {
                        // items in cache not in server are deleted
                        diffFound = true;
                        deletedMessages.push(cacheMessage);
                    }
                }
                // The result is delivered before the cache cleanup and the events below
                resolved = true;
                resolve(diffFound);
                if (cacheMessages.length > 0 && deletedMessages.length === cacheMessages.length) {
                    // if all of the cache data was invalid, it is likely that it needs to be cleared
                    // this typically happens when the server got a lot of new messages since the cache
                    // was last updated
                    await DBCache.clearServerMessages(this.id, deletedMessages[0].channel.id);
                } else if (deletedMessages.length > 0) {
                    // Messages from the cache that come on the far side of the request are marked as deleted
                    // so they are deleted from the UI. However, they should not be removed from the cache
                    // yet since they most likely have not been pulled from the server yet. (and will be
                    // very likely pulled from the server on the next fetch).
                    let cacheDeletedMessages = deletedMessages.slice();
                    let i: number | null = null;
                    if (firstMessageId || (firstMessageId == null && lastMessageId == null)) { // before & recent
                        i = 0;
                        while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[0].id == cacheMessages[i].id) {
                            cacheDeletedMessages.shift();
                            i++;
                        }
                    }
                    if (lastMessageId) { // after
                        i = 0;
                        while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[cacheDeletedMessages.length - 1].id == cacheMessages[cacheMessages.length - 1 - i].id) {
                            cacheDeletedMessages.pop();
                            i++;
                        }
                    }
                    //LOG.debug('skipping ' + i + ' deleted messages on the cache side -> deleting ' + cacheDeletedMessages.length + ' cache messages instead of ' + deletedMessages.length);
                    if (cacheDeletedMessages.length > 0) {
                        await DBCache.deleteServerMessages(this.id, cacheDeletedMessages.map(m => m.id));
                    }
                }
                if (deletedMessages.length > 0) { // make sure to do deleted before added
                    this.emit('deleted-messages', this.channels.get(channelId), deletedMessages);
                }
                if (updatedMessages.length > 0) {
                    this.emit('updated-messages', this.channels.get(channelId), updatedMessages);
                }
                if (addedAfter.size > 0 || addedBefore.size > 0) {
                    this.emit('added-messages', this.channels.get(channelId), addedAfter, addedBefore);
                }
            };
            // Running the work through a handled promise (instead of an async executor) makes sure a
            // failure is never lost: before the result was delivered it rejects the returned promise,
            // afterwards it is logged.
            diffAndNotify().catch((e) => {
                if (resolved) {
                    LOG.error('error applying message diff', e);
                } else {
                    reject(e);
                }
            });
        });
    };
}
/**
 * Fetches the most recent messages of a channel via `_fetchCachedAndVerify`,
 * then stores the result in the recent-message RAM cache.
 *
 * Members and channels are ensured first since they are needed to build
 * `Message` objects from raw message data.
 *
 * @param channelId id of the channel to fetch messages for
 * @param number message count forwarded to both the server query and the cache query
 * @returns the messages resolved by `_fetchCachedAndVerify`
 */
async fetchMessagesRecent(channelId: string, number: number): Promise<Message[]> {
    await this.ensureMembers();
    await this.ensureChannels();

    const recentMessages = await this._fetchCachedAndVerify<Message[], Message[]>({
        // server side: raw rows from the 'fetch-messages-recent' query, converted to Message objects
        serverFunc: async () => {
            const rawMessages = await this._queryServer('fetch-messages-recent', channelId, number) as any[];
            return rawMessages.map((rawMessage: any) => Message.fromDBData(rawMessage, this.members, this.channels));
        },
        // cache side: the same request answered by the local DB cache
        cacheFunc: async () => await DBCache.getMessagesRecent(this.id, channelId, number, this.members, this.channels),
        // The server data is written to the cache when there was no cached data
        // or when the diff function reports a difference. The diff function is only
        // invoked when cached data exists.
        cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
            const needsUpsert = cacheData === null || this._createDiffMessageDataFunc(channelId, null, null)(cacheData, serverData);
            if (!needsUpsert) {
                return false;
            }
            await DBCache.upsertServerMessages(this.id, channelId, serverData);
            return true;
        },
        updateEventName: null // all events are handled in diffFunc
    });

    this._recentMessages.putRecentMessages(this.id, channelId, recentMessages);
    return recentMessages;
}
/**
 * Fetches messages relative to (before) a reference message via `_fetchCachedAndVerify`.
 *
 * Members and channels are ensured first since they are needed to build
 * `Message` objects from raw message data.
 *
 * @param channelId id of the channel to fetch messages for
 * @param messageId id of the reference message, forwarded to the server and cache queries
 * @param number message count forwarded to both the server query and the cache query
 * @returns whatever `_fetchCachedAndVerify` resolves to (messages, or null)
 */
async fetchMessagesBefore(channelId: string, messageId: string, number: number): Promise<Message[] | null> {
    await this.ensureMembers();
    await this.ensureChannels();

    const olderMessages = await this._fetchCachedAndVerify<Message[], Message[] | null>({
        // server side: raw rows from the 'fetch-messages-before' query, converted to Message objects
        serverFunc: async () => {
            const rawMessages = await this._queryServer('fetch-messages-before', channelId, messageId, number) as any[];
            return rawMessages.map((rawMessage: any) => Message.fromDBData(rawMessage, this.members, this.channels));
        },
        // cache side: the same request answered by the local DB cache
        cacheFunc: async () => await DBCache.getMessagesBefore(this.id, channelId, messageId, number, this.members, this.channels),
        // The server data is written to the cache when there was no cached data
        // or when the diff function reports a difference. The diff function is only
        // invoked when cached data exists.
        cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
            const needsUpsert = cacheData === null || this._createDiffMessageDataFunc(channelId, messageId, null)(cacheData, serverData);
            if (!needsUpsert) {
                return false;
            }
            await DBCache.upsertServerMessages(this.id, channelId, serverData);
            return true;
        },
        updateEventName: null // all events are handled in diffFunc
    });

    return olderMessages;
}
/**
 * Fetches messages relative to (after) a reference message via `_fetchCachedAndVerify`.
 *
 * Members and channels are ensured first since they are needed to build
 * `Message` objects from raw message data.
 *
 * @param channelId id of the channel to fetch messages for
 * @param messageId id of the reference message, forwarded to the server and cache queries
 * @param number message count forwarded to both the server query and the cache query
 * @returns whatever `_fetchCachedAndVerify` resolves to (messages, or null)
 */
async fetchMessagesAfter(channelId: string, messageId: string, number: number): Promise<Message[] | null> {
    await this.ensureMembers();
    await this.ensureChannels();

    const newerMessages = await this._fetchCachedAndVerify<Message[], Message[] | null>({
        // server side: raw rows from the 'fetch-messages-after' query, converted to Message objects
        serverFunc: async () => {
            const rawMessages = await this._queryServer('fetch-messages-after', channelId, messageId, number) as any[];
            return rawMessages.map((rawMessage: any) => Message.fromDBData(rawMessage, this.members, this.channels));
        },
        // cache side: the same request answered by the local DB cache
        cacheFunc: async () => await DBCache.getMessagesAfter(this.id, channelId, messageId, number, this.members, this.channels),
        // The server data is written to the cache when there was no cached data
        // or when the diff function reports a difference. The diff function is only
        // invoked when cached data exists.
        cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
            const needsUpsert = cacheData === null || this._createDiffMessageDataFunc(channelId, null, messageId)(cacheData, serverData);
            if (!needsUpsert) {
                return false;
            }
            await DBCache.upsertServerMessages(this.id, channelId, serverData);
            return true;
        },
        updateEventName: null // all events are handled in diffFunc
    });

    return newerMessages;
}
/**
 * Resolves a resource buffer by checking three stages in order:
 * the in-memory cache, the client DB cache, and finally the server.
 *
 * The standard cached-and-verify flow is intentionally not used here: server-side
 * resources never change. When a message attachment, server icon, avatar, etc.
 * changes, its resource id changes instead, so a (serverId, resourceId) pair
 * always maps to the same buffer and no cache verification is needed.
 *
 * @param resourceId id of the resource to load
 * @returns the resource's data buffer
 */
async _fetchResourceInternal(resourceId: string): Promise<Buffer> {
    // Stage 1: client memory
    const ramBuff = ResourceRAMCache.getResource(this.id, resourceId);
    if (ramBuff != null) {
        return ramBuff;
    }

    // Stage 2: client DB (a hit is promoted into the RAM cache)
    const dbEntry = await DBCache.getResource(this.id, resourceId);
    if (dbEntry !== null) {
        ResourceRAMCache.putResource(this.id, resourceId, dbEntry.data);
        return dbEntry.data;
    }

    // Stage 3: server
    // Note: Not pre-requesting from server asynchronously to reduce fetch-resource requests
    const serverEntry = await this._queryServer('fetch-resource', resourceId);
    ResourceRAMCache.putResource(this.id, resourceId, serverEntry.data);
    await DBCache.upsertServerResources(this.id, [serverEntry]);
    return serverEntry.data;
}
/**
 * Deduplicates client-side resource requests. Useful for when the client wants to fetch an avatar
 * multiple times for the channel feed. Or if there is a duplicate image in the feed.
 *
 * The first caller for a given resourceId starts the actual fetch through
 * `_fetchResourceInternal`. Callers that arrive while that fetch is in flight only
 * register a callback in `resourceCallbacks` and are settled with the same outcome.
 *
 * @param resourceId id of the resource to fetch
 * @returns the resource buffer
 * @throws the error raised by `_fetchResourceInternal`, or a ShouldNeverHappenError
 *         if the fetch produced neither a buffer nor a (truthy) error
 */
async fetchResource(resourceId: string): Promise<Buffer> {
    // The executor is deliberately synchronous: an async executor would swallow any
    // exception thrown inside of it and leave the returned promise pending forever.
    return await new Promise<Buffer>((resolve, reject) => {
        const resultFunc = (err: any, resourceBuff: Buffer | null) => {
            if (err) {
                reject(err);
            } else if (resourceBuff) {
                resolve(resourceBuff);
            } else {
                reject(new ShouldNeverHappenError('no buffer or error!'));
            }
        };

        const pendingCallbacks = this.resourceCallbacks.get(resourceId);
        if (pendingCallbacks) {
            // A fetch for this resource is already in flight; piggyback on its result.
            pendingCallbacks.push(resultFunc);
            return;
        }

        // First request for this resource: register and start the real fetch.
        this.resourceCallbacks.set(resourceId, [resultFunc]);

        const dispatch = (err: any, result: Buffer | null) => {
            const callbacks = this.resourceCallbacks.get(resourceId) ?? [];
            // Remove the entry before dispatching so a failure while notifying
            // waiters can never leave a stale entry that would make every future
            // request for this resource wait forever.
            this.resourceCallbacks.delete(resourceId);
            for (const callbackFunc of callbacks) {
                callbackFunc(err, result);
            }
        };

        this._fetchResourceInternal(resourceId).then(
            (result) => dispatch(null, result),
            (e) => dispatch(e, null)
        );
    });
}
/**
 * Sends a text message to a channel using the 'send-message' server query.
 *
 * Members and channels are ensured after the query returns, since both are
 * needed to convert the server's response into a `Message`.
 *
 * @param channelId id of the channel to send to
 * @param text text of the message
 * @returns the `Message` built from the server's response
 */
async sendMessage(channelId: string, text: string) {
    const response = await this._queryServer('send-message', channelId, text);

    await this.ensureMembers();
    await this.ensureChannels();

    const sentMessage = Message.fromDBData(response, this.members, this.channels);
    return sentMessage;
}
/**
 * Sends a message that carries a resource, using the
 * 'send-message-with-resource' server query.
 *
 * Members and channels are ensured after the query returns, since both are
 * needed to convert the server's response into a `Message`.
 *
 * @param channelId id of the channel to send to
 * @param text text of the message, or null
 * @param resourceBuff buffer holding the resource contents
 * @param resourceName name of the resource
 * @returns the `Message` built from the server's response
 */
async sendMessageWithResource(channelId: string, text: string | null, resourceBuff: Buffer, resourceName: string) {
    const response = await this._queryServer('send-message-with-resource', channelId, text, resourceBuff, resourceName);

    await this.ensureMembers();
    await this.ensureChannels();

    const sentMessage = Message.fromDBData(response, this.members, this.channels);
    return sentMessage;
}
/**
 * Issues the 'set-status' server query with the given status.
 * The server's response is not used.
 *
 * @param status the status value to send
 */
async setStatus(status: string): Promise<void> {
    await this._queryServer('set-status', status);
}
/**
 * Issues the 'set-display-name' server query with the given display name.
 * The server's response is not used.
 *
 * @param displayName the display name to send
 */
async setDisplayName(displayName: string): Promise<void> {
    await this._queryServer('set-display-name', displayName);
}
/**
 * Issues the 'set-avatar' server query with the given avatar buffer.
 * The server's response is not used.
 *
 * @param avatarBuff buffer holding the avatar contents
 */
async setAvatar(avatarBuff: Buffer): Promise<void> {
    await this._queryServer('set-avatar', avatarBuff);
}
/**
 * Issues the 'set-name' server query and converts the response to server metadata.
 *
 * @param name the name to send
 * @returns the `ServerMetaData` built from the server's response
 */
async setName(name: string): Promise<ServerMetaData> {
    const response = await this._queryServer('set-name', name);
    return ServerMetaData.fromServerDBData(response);
}
/**
 * Issues the 'set-icon' server query and converts the response to server metadata.
 *
 * @param iconBuff buffer holding the icon contents
 * @returns the `ServerMetaData` built from the server's response
 */
async setIcon(iconBuff: Buffer): Promise<ServerMetaData> {
    const response = await this._queryServer('set-icon', iconBuff);
    return ServerMetaData.fromServerDBData(response);
}
/**
 * Issues the 'update-channel' server query and converts the response to a `Channel`.
 *
 * @param channelId id of the channel to update
 * @param name name to send for the channel
 * @param flavorText flavor text to send for the channel, or null
 * @returns the `Channel` built from the server's response
 */
async updateChannel(channelId: string, name: string, flavorText: string | null): Promise<Channel> {
    const response = await this._queryServer('update-channel', channelId, name, flavorText);
    return Channel.fromDBData(response);
}
/**
 * Issues the 'create-text-channel' server query and converts the response to a `Channel`.
 *
 * @param name name to send for the new channel
 * @param flavorText flavor text to send for the new channel, or null
 * @returns the `Channel` built from the server's response
 */
async createChannel(name: string, flavorText: string | null): Promise<Channel> {
    const response = await this._queryServer('create-text-channel', name, flavorText);
    return Channel.fromDBData(response);
}
/**
 * Fetches the tokens from the server with the 'fetch-tokens' query.
 *
 * Not cached for now: this is a relatively small request that comes after a
 * context-menu click, so caching it matters less than caching the channels,
 * members, and messages.
 *
 * @returns one `Token` per entry of the server's response
 */
async queryTokens(): Promise<Token[]> {
    const rawTokens = await this._queryServer('fetch-tokens');
    // members are passed to Token.fromDBData, so make sure they are loaded first
    await this.ensureMembers();
    return rawTokens.map(rawToken => Token.fromDBData(rawToken, this.members));
}
/**
 * Issues the 'revoke-token' server query for the given token.
 *
 * @param token the token to revoke
 * @returns resolves with whatever the server query resolves with
 */
async revokeToken(token: string): Promise<void> {
    const response = await this._queryServer('revoke-token', token);
    return response;
}
/**
 * Closes this connection's underlying socket.
 */
close(): void {
    const { socket } = this;
    socket.close();
}
2021-10-30 17:26:41 +00:00
}