import * as electronRemote from '@electron/remote'; const electronConsole = electronRemote.getGlobal('console') as Console; import Logger from '../../logger/logger'; const LOG = Logger.create(__filename, electronConsole); import * as StackTrace from '../../stack-trace/stack-trace'; import * as crypto from 'crypto'; import { EventEmitter } from 'events'; import * as socketio from 'socket.io-client'; import ConcurrentQueue from '../../concurrent-queue/concurrent-queue'; import { Message, Member, Channel, Changes, ConnectionInfo, CacheServerData, ServerMetaData, ServerConfig, ShouldNeverHappenError, Token } from './data-types'; import DBCache from './db-cache'; import ResourceRAMCache from './resource-ram-cache'; import RecentMessageRAMCache from './message-ram-cache'; // Events: // 'connected' function() called when connected to the server // 'disconnected' function() called when connection to the server is lost // 'verified' function() called when verification handshake is completed // 'update-server' function(serverData) called when the server metadata updates // 'deleted-members' function(members) called when members were deleted on the server-side // 'updated-members' function(data: Array [ { oldMember, newMember } ]) called when a member was updated on the server-side // 'added-members' function(members) called when members were added on the server-side // 'deleted-channels' function(channels) called when channels were deleted on the server-side // 'updated-channels' function(data: Array [ { oldChannel, newChannel } ]) called when a channel was updated on the server-side // 'added-channels' function(channels) called when channels are added on the server-side // 'new-message' function(message) called when a message is received in a channel // 'deleted-messages' function(messages) called when messages were deleted on the server-side // 'updated-messages' function(data: Array [ { oldMessage, newMessage } ]) called when messages were updated on the server-side // 
// 'added-messages' function(channel, addedAfter : Map<messageId, addedMessage>, addedBefore : Map<messageId, addedMessage>) called when messages were added on the server-side (that were not in our client-side db). // Functions: // these three have grab counterparts // async fetchMetadata() Fetches the server information as { name, icon_resource_id } // async fetchMembers() Fetches the server members as [ { id, display_name, avatar_resource_id }, ... ] // async fetchChannels() Fetches the available channels as [ { id, name }, ... ] // async fetchMessagesRecent(channelId, number) Fetches the most recent number messages in a channel [ Message, ... ] // async fetchMessagesBefore(channelId, messageId, number) Fetches number messages before a specified message [ Message, ... ] // async fetchMessagesAfter(channelId, messageId, number) Fetches number messages after a specified message [ Message, ... ] // TODO: change this to grab // async fetchResource(resourceId) Fetches the resource associated with a specified resourceId // async sendMessage(channelId, text) Sends a text message to a channel, returning the sent Message // async sendMessageWithResource(channelId, text, resource, resourceName) Sends a text message with a resource to a channel, returning the sent Message. text is optional // async setStatus(status) Set the current logged in user's status // async setDisplayName(displayName) Sets the current logged in user's display name // async setAvatar(avatarBuff) // async updateChannel(channelId, name, flavorText) Updates a channel's name and flavor text // async queryTokens() Queries for the login tokens as [ { token, member_id, created, expires }, ... 
// ]
// async revokeToken(token)   Revokes an outstanding token

/**
 * Options for ClientController._fetchCachedAndVerify.
 *
 * ClientType is the shape of the data held in the client-side cache,
 * ServerType is the shape of the data returned by the server.
 */
interface FetchCachedAndVerifyProps<ClientType, ServerType> {
  /**
   * Optional queue the whole fetch is pushed onto.
   * NOTE(review): if ConcurrentQueue is a generic class, add its type argument here and on the
   * channelsLock / membersLock fields below.
   */
  lock?: ConcurrentQueue,
  /** Fetches the data from the server. */
  serverFunc: (() => Promise<ServerType>),
  /** Fetches the data from the cache; resolves to null when nothing is cached. */
  cacheFunc: (() => Promise<ClientType | null>),
  /** Called once the server data arrived; returns true if changes were made to the cache. */
  cacheUpdateFunc: ((cacheData: ClientType | null, serverData: ServerType) => Promise<boolean>),
  /** Event emitted (with the server data) when the cache was non-null and was changed. */
  updateEventName?: string | null,
}

/**
 * Client-side controller for one server connection.
 *
 * Owns the socket.io connection, performs the key-based verification handshake, answers
 * fetch/grab requests from the local caches (DB cache + RAM caches) while refreshing them from
 * the server, and re-emits server changes as the events listed at the top of this file.
 */
export default class ClientController extends EventEmitter {
  private dbCache: DBCache;

  public id: string;
  public memberId: string;
  public url: string;
  public publicKey: crypto.KeyObject;
  public privateKey: crypto.KeyObject;
  public serverCert: string;
  public socket: socketio.Socket;

  private _metadata: ServerMetaData | CacheServerData | null;
  private _recentMessages: RecentMessageRAMCache;

  public channels: Map<string, Channel>;
  public members: Map<string, Member>;

  public channelsLock: ConcurrentQueue;
  public membersLock: ConcurrentQueue;

  public isVerified: boolean;

  public resourceCallbacks: Map<string, ((err: any, resourceBuff: Buffer | null) => Promise<void> | void)[]>;
  // NOTE(review): not used anywhere in this class; callback parameters are unknown from here
  public dedupedCallbacks: Map<string, ((...args: any[]) => Promise<void> | void)[]>;

  /**
   * Opens the socket to `config.url` and wires up the socket and internal event handlers.
   * @param dbCache The client-side database cache
   * @param config  The stored server configuration (keys may be PEM strings or KeyObjects)
   */
  constructor(dbCache: DBCache, config: ServerConfig) {
    super();

    this.dbCache = dbCache;

    // TODO: fetch these from the cache when they are needed rather than storing them in memory (especially private key)
    const publicKey = typeof config.publicKey === 'string' ? crypto.createPublicKey(config.publicKey) : config.publicKey;
    const privateKey = typeof config.privateKey === 'string' ? crypto.createPrivateKey(config.privateKey) : config.privateKey;

    this.id = config.serverId + ''; // this is the client-side server id (from Cache)
    this.memberId = config.memberId;
    this.url = config.url;
    this.publicKey = publicKey;
    this.privateKey = privateKey;
    this.serverCert = config.serverCert;

    this.socket = socketio.connect(this.url, {
      forceNew: true,
      ca: this.serverCert, // this provides identity verification for the server
    });

    this._metadata = null; // use grabMetadata();
    this._recentMessages = new RecentMessageRAMCache(); // use grabRecentMessages();

    // These are used to make message objects more useful
    // TODO: make these private
    this.channels = new Map<string, Channel>(); // use grabChannels();
    this.members = new Map<string, Member>(); // use grabMembers();

    this.channelsLock = new ConcurrentQueue(1);
    this.membersLock = new ConcurrentQueue(1);

    this.isVerified = false;

    this.resourceCallbacks = new Map<string, ((err: any, resourceBuff: Buffer | null) => Promise<void> | void)[]>(); // resourceId -> [ callbackFunc, ... ];
    this.dedupedCallbacks = new Map<string, ((...args: any[]) => Promise<void> | void)[]>(); // dedupeId -> [ callbackFunc, ... ];

    this._bindSocket();
    this._bindInternalEvents();
  }

  /** Registers the handlers for events coming from the server socket. */
  _bindSocket(): void {
    this.socket.on('connect', async () => {
      LOG.info('connected to server#' + this.id);
      this.emit('connected');
      // TODO: re-verify on verification failure?
      try {
        await this.verify();
      } catch (e) {
        // Without this catch a failed handshake surfaces only as an unhandled promise rejection
        LOG.error('verification failed for server#' + this.id, e);
      }
    });

    this.socket.on('disconnect', () => {
      LOG.info('disconnected from server#' + this.id);
      // All in-memory state is dropped; it is re-fetched by the ensure*/grab* functions
      this.isVerified = false;
      this._metadata = null;
      this._recentMessages.clear();
      this.channels.clear();
      this.members.clear();
      this.emit('disconnected');
    });

    this.socket.on('new-message', async (dataMessage) => {
      await this.ensureMembers();
      await this.ensureChannels();
      const message = Message.fromDBData(dataMessage, this.members, this.channels);
      await this.dbCache.upsertServerMessages(this.id, message.channel.id, [ message ]);
      LOG.info(message.toString());
      this._recentMessages.addNewMessage(this.id, message);
      this.emit('new-message', message);
    });

    // NOTE(review): the three handlers below forward the socket payload as-is, whereas
    // fetchMembers/fetchChannels convert with Member.fromDBData / Channel.fromDBData.
    // Confirm the server already sends the converted shape for these events.
    this.socket.on('update-member', async (member) => {
      await this.ensureMembers();
      const oldMember = this.members.get(member.id);
      if (oldMember) {
        this.emit('updated-members', [ { oldMember: oldMember, newMember: member } ]);
      } else {
        this.emit('added-members', [ member ]);
      }
    });

    this.socket.on('update-channel', async (channel) => {
      await this.ensureChannels();
      const oldChannel = this.channels.get(channel.id);
      if (oldChannel) {
        this.emit('updated-channels', [ { oldChannel: oldChannel, newChannel: channel } ]);
      } else {
        this.emit('added-channels', [ channel ]);
      }
    });

    this.socket.on('new-channel', async (channel) => {
      await this.ensureChannels();
      this.emit('added-channels', [ channel ]);
    });

    this.socket.on('update-server', async (serverMeta) => {
      await this.dbCache.updateServer(this.id, serverMeta);
      this.emit('update-server', serverMeta);
    });
  }

  /**
   * Registers listeners on this controller's own events that keep the in-memory maps,
   * the recent-message RAM cache and the DB cache in step with the emitted changes.
   */
  _bindInternalEvents(): void {
    this.on('added-members', async (members: Member[]) => {
      for (const member of members) {
        this.members.set(member.id, member);
      }
      await this.dbCache.updateServerMembers(this.id, Array.from(this.members.values()));
    });

    this.on('updated-members', async (data: { oldMember: Member, newMember: Member }[]) => {
      for (const { newMember } of data) {
        this.members.set(newMember.id, newMember);
      }
      await this.dbCache.updateServerMembers(this.id, Array.from(this.members.values()));
    });

    this.on('deleted-members', async (members: Member[]) => {
      for (const member of members) {
        this.members.delete(member.id);
      }
      await this.dbCache.updateServerMembers(this.id, Array.from(this.members.values()));
    });

    this.on('added-channels', async (channels: Channel[]) => {
      for (const channel of channels) {
        this.channels.set(channel.id, channel);
      }
      await this.dbCache.updateServerChannels(this.id, Array.from(this.channels.values()));
    });

    this.on('updated-channels', async (data: { oldChannel: Channel, newChannel: Channel }[]) => {
      for (const { newChannel } of data) {
        this.channels.set(newChannel.id, newChannel);
      }
      await this.dbCache.updateServerChannels(this.id, Array.from(this.channels.values()));
    });

    this.on('deleted-channels', async (channels: Channel[]) => {
      for (const channel of channels) {
        this.channels.delete(channel.id);
      }
      await this.dbCache.updateServerChannels(this.id, Array.from(this.channels.values()));
    });

    this.on('added-messages', async (channel: Channel, addedAfter: Map<string, Message>, _addedBefore: Map<string, Message>) => {
      // Adding messages is surprisingly complicated (see script.js's added-messages function)
      // so we can just drop the channel and refresh it once if any messages got added while
      // we were gone. Further, it is probably best to make 100% sure that the script.js
      // implementation is correct before copying it elsewhere. (I'm 90% sure it is correct)
      // Getting this correct and actually implementing the added-messages feature in the
      // RAM cache would be useful for first-time joiners to a server, getting the channel
      // messages for the first time
      // Alternatively, just store the date in the message and use order-by
      this._recentMessages.dropChannel(this.id, channel.id);
      await this.dbCache.upsertServerMessages(this.id, channel.id, Array.from(addedAfter.values()));
    });

    this.on('updated-messages', async (channel: Channel, data: { oldMessage: Message, newMessage: Message }[]) => {
      for (const { oldMessage, newMessage } of data) {
        this._recentMessages.updateMessage(this.id, oldMessage, newMessage);
      }
      await this.dbCache.upsertServerMessages(this.id, channel.id, data.map(change => change.newMessage));
    });

    this.on('deleted-messages', async (_channel: Channel, messages: Message[]) => {
      for (const message of messages) {
        this._recentMessages.deleteMessage(this.id, message);
      }
      await this.dbCache.deleteServerMessages(this.id, messages.map(message => message.id));
    });
  }

  /** Replaces the in-memory member map with exactly the given members. */
  _updateCachedMembers(members: Member[]): void {
    this.members.clear();
    for (const member of members) {
      this.members.set(member.id, member);
    }
  }

  /** Replaces the in-memory channel map with exactly the given channels. */
  _updateCachedChannels(channels: Channel[]): void {
    this.channels.clear();
    for (const channel of channels) {
      this.channels.set(channel.id, channel);
    }
  }

  /**
   * Fetches data from the server but returns cached data if it already exists. If the server data comes back different
   * from the cached data, emits an event with the server data.
   *
   * The returned promise resolves with the cache data as soon as it is available (non-null); the server
   * request keeps running afterwards so that cacheUpdateFunc can refresh the cache. When there is no cache
   * data the promise resolves with the server data after cacheUpdateFunc ran, or rejects if the server
   * request failed.
   *
   * @param props.lock A locking queue to make sure that the cache data does not change while waiting for the server data to be requested.
   * @param props.serverFunc A function() that returns the data from the server
   * @param props.cacheFunc A function() that returns the data from the cache (returns null if no data)
   * @param props.cacheUpdateFunc A function(cacheData, serverData) is called after the server data is fetched. It should be used to update the data in the cache
   * @param props.updateEventName The name of the event to emit when the server data is different from the cache data (and the cache data is not null), null for no event
   */
  async _fetchCachedAndVerify<T, U>(props: FetchCachedAndVerifyProps<T, U>): Promise<T | U> {
    const { lock, serverFunc, cacheFunc, cacheUpdateFunc, updateEventName } = props;
    // The executor is deliberately NOT async: an async executor swallows its own failures and
    // would leave this promise pending forever.
    return await new Promise<T | U>((resolve, reject) => {
      const task = async (): Promise<void> => {
        try {
          // Start the server request first so it runs while the cache is being read
          const serverPromise = serverFunc();
          // If cacheFunc throws, serverPromise is never awaited below; mark it handled so a later
          // server failure does not surface as an unhandled rejection. Awaiting it still throws.
          serverPromise.catch(() => { /* handled where awaited */ });

          const cacheData = await cacheFunc();
          if (cacheData !== null) {
            resolve(cacheData);
          }

          let serverData: U;
          try {
            serverData = await serverPromise;
          } catch (e) {
            if (cacheData !== null) {
              // Print an error if this was already resolved
              LOG.warn('Error fetching server data:', e);
              return;
            }
            throw e;
          }

          // make sure the 'added/deleted/updated' events get run before returning the server data
          try {
            const changesMade = await cacheUpdateFunc(cacheData, serverData);
            if (updateEventName && cacheData != null && changesMade) {
              this.emit(updateEventName, serverData);
            }
          } catch (e) {
            LOG.error('error handling cache update', e);
          }

          if (cacheData == null) {
            resolve(serverData);
          }
        } catch (e) {
          reject(e);
        }
      };

      // task() settles the outer promise itself; the catch only covers a failing lock.push
      Promise.resolve(lock ? lock.push(task) : task()).catch(reject);
    });
  }

  /**
   * Emits `name` on the socket with an acknowledgement callback that is guarded by a timeout.
   * The LAST element of `args` must be the response callback; it is called either with the
   * server's acknowledgement arguments or with 'emit timeout' once `ms` elapsed (never both).
   * The returned promise resolves after the response callback was invoked.
   */
  // This function is a promise for error stack tracking purposes.
  async _socketEmitTimeout(ms: number, name: string, ...args: any[]): Promise<void> {
    return new Promise<void>((resolve) => {
      const socketArgs = args.slice(0, args.length - 1);
      const respond = args[args.length - 1];
      let cutoff = false;
      const timeout = setTimeout(() => {
        cutoff = true;
        respond('emit timeout');
        resolve();
      }, ms);
      this.socket.emit(name, ...socketArgs, (...respondArgs: any[]) => {
        if (cutoff) {
          // the timeout already answered; ignore the late acknowledgement
          return;
        }
        clearTimeout(timeout);
        respond(...respondArgs);
        resolve();
      });
    });
  }

  /**
   * Queries data from the server. Waits (up to 5s) for verification first.
   * @param endpoint The server-side socket endpoint
   * @param args The server socket arguments
   * @returns The data the server acknowledged with
   * @throws Error when verification times out, the emit times out or the server answers with an error message
   */
  async _queryServer(endpoint: string, ...args: any[]): Promise<any> {
    // NOTE: socket.io may cause client-side memory leaks if the ack function is never called
    await this.ensureVerified(5000);
    const message = `querying s#${this.id} @${endpoint}(${args.map(arg => LOG.inspect(arg)).join(', ')})`;
    LOG.silly(message);
    //if (endpoint === 'fetch-messages-recent') LOG.silly(null, new Error('call stack'));
    return await new Promise<any>((resolve, reject) => {
      void this._socketEmitTimeout(5000, endpoint, ...args, (errMsg: string, serverData: any) => {
        if (errMsg) {
          reject(new Error('error fetching server data @' + endpoint + ' / [' + args.map(arg => LOG.inspect(arg)).join(', ') + ']: ' + errMsg));
        } else {
          resolve(serverData);
        }
      });
    });
  }

  /**
   * Diffs cached data against server data by `id`.
   * @param cacheData  The cached data points, or null when nothing is cached (everything counts as added)
   * @param serverData The data points from the server
   * @param equal      Returns true when two data points with the same id carry the same content
   * @returns updated: same id but not equal; added: only on the server; deleted: only in the cache
   */
  static _getChanges<T extends { id: string }>(cacheData: T[] | null, serverData: T[], equal: ((a: T, b: T) => boolean)): Changes<T> {
    if (cacheData === null) {
      return { updated: [], added: serverData, deleted: [] };
    }
    const updated: { oldDataPoint: T, newDataPoint: T }[] = [];
    const added: T[] = [];
    const deleted: T[] = [];
    for (const serverDataPoint of serverData) {
      const cacheDataPoint = cacheData.find(m => m.id == serverDataPoint.id);
      if (cacheDataPoint) {
        if (!equal(cacheDataPoint, serverDataPoint)) {
          updated.push({ oldDataPoint: cacheDataPoint, newDataPoint: serverDataPoint });
        }
      } else {
        added.push(serverDataPoint);
      }
    }
    for (const cacheDataPoint of cacheData) {
      const serverDataPoint = serverData.find(s => s.id == cacheDataPoint.id);
      if (serverDataPoint == null) {
        deleted.push(cacheDataPoint);
      }
    }
    return { updated, added, deleted };
  }

  /** Disconnects the socket. */
  disconnect(): void {
    this.socket.disconnect();
  }

  /**
   * Runs the challenge/verify handshake shared by verify() and verifyWithServer():
   * sends the DER-encoded public key with 'challenge', signs the returned challenge with the
   * private key and sends the signature with 'verify'. On success stores the member id, marks
   * the controller verified, logs `successMessage()` and emits 'verified'.
   * @throws Error when either request fails/times out or the challenge cannot be signed
   */
  _runVerification(successMessage: () => string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // Solve the server's challenge
      // (a throw here rejects the promise because the executor is synchronous)
      const publicKeyBuff = this.publicKey.export({ type: 'spki', format: 'der' });
      void this._socketEmitTimeout(5000, 'challenge', publicKeyBuff, (errMsg, algo, type, challenge) => {
        if (errMsg) {
          reject(new Error('challenge request failed: ' + errMsg));
          return;
        }
        let signature;
        try {
          const sign = crypto.createSign(algo);
          sign.write(challenge);
          sign.end();
          signature = sign.sign(this.privateKey, type);
        } catch (e) {
          // e.g. an algorithm name the local crypto build does not know;
          // without this the promise would never settle
          reject(e);
          return;
        }
        void this._socketEmitTimeout(5000, 'verify', signature, (errMsg, memberId) => {
          if (errMsg) {
            reject(new Error('verification request failed: ' + errMsg));
            return;
          }
          this.memberId = memberId;
          // not awaited, same as before
          this.dbCache.updateServerMemberId(this.id, this.memberId);
          this.isVerified = true;
          LOG.info(successMessage());
          resolve();
          this.emit('verified');
        });
      });
    });
  }

  /** Performs the verification handshake (same handshake as verify(), different log line). */
  async verifyWithServer(): Promise<void> {
    await this._runVerification(() => `s#${this.id} client verified as u#${this.memberId}`);
  }

  /** Performs the verification handshake; called automatically when the socket connects. */
  async verify(): Promise<void> {
    await this._runVerification(() => `verified at server#${this.id} as u#${this.memberId}`);
  }

  /**
   * Resolves once the controller is verified.
   * @param timeout Maximum time to wait for the 'verified' event, in ms
   * @throws Error('verification timeout') when the event does not arrive in time
   */
  async ensureVerified(timeout: number): Promise<void> {
    if (this.isVerified) {
      return;
    }
    await new Promise<void>((resolve, reject) => {
      let timeoutId: any = null;
      const listener = () => {
        clearTimeout(timeoutId);
        resolve();
      };
      this.once('verified', listener);
      timeoutId = setTimeout(() => {
        this.off('verified', listener);
        reject(new Error('verification timeout'));
      }, timeout);
    });
  }

  /** Fetches the metadata unless it is already held in memory. */
  async ensureMetadata(): Promise<void> {
    if (this._metadata !== null) return;
    await this.fetchMetadata();
  }

  /** Fetches the members unless the in-memory member map is non-empty. */
  async ensureMembers(): Promise<void> {
    if (this.members.size > 0) return;
    await this.fetchMembers();
  }

  /** Fetches the channels unless the in-memory channel map is non-empty. */
  async ensureChannels(): Promise<void> {
    if (this.channels.size > 0) return;
    await this.fetchChannels();
  }

  /** Returns the member entry for this.memberId (undefined at runtime if it is not in the member map). */
  async getMyMember(): Promise<Member> {
    await this.ensureMembers();
    return this.members.get(this.memberId) as Member;
  }

  /** Returns the in-memory metadata, fetching it first if needed. */
  async grabMetadata(): Promise<ServerMetaData | CacheServerData> {
    await this.ensureMetadata();
    return this._metadata as ServerMetaData | CacheServerData;
  }

  /** Returns the in-memory members, fetching them first if needed. */
  async grabMembers(): Promise<Member[]> {
    await this.ensureMembers();
    return Array.from(this.members.values());
  }

  /** Returns the in-memory channels, fetching them first if needed. */
  async grabChannels(): Promise<Channel[]> {
    await this.ensureChannels();
    return Array.from(this.channels.values());
  }

  /** Returns recent messages from the RAM cache, falling back to fetchMessagesRecent when it has none. */
  async grabRecentMessages(channelId: string, number: number): Promise<Message[]> {
    const cached = this._recentMessages.getRecentMessages(this.id, channelId, number);
    if (cached !== null) return cached;
    return await this.fetchMessagesRecent(channelId, number);
  }

  /**
   * Fetches the server metadata (cache first, verified against the server) and stores it in memory.
   * Emits 'update-server' when the server's name or icon differs from a non-null cache entry.
   */
  async fetchMetadata(): Promise<ServerMetaData | CacheServerData> {
    function isDifferent(cacheData: CacheServerData | null, serverData: ServerMetaData): boolean {
      if (cacheData === null) return true;
      return !!(cacheData.name != serverData.name || cacheData.iconResourceId != serverData.iconResourceId);
    }
    const metadata = await this._fetchCachedAndVerify<CacheServerData, ServerMetaData>({
      serverFunc: async () => {
        return ServerMetaData.fromServerDBData(await this._queryServer('fetch-server'));
      },
      cacheFunc: async () => {
        return await this.dbCache.getServer(this.id);
      },
      cacheUpdateFunc: async (cacheData: CacheServerData | null, serverData: ServerMetaData) => {
        if (!isDifferent(cacheData, serverData)) return false;
        await this.dbCache.updateServer(this.id, serverData);
        return true;
      },
      updateEventName: 'update-server',
    });
    this._metadata = metadata as ServerMetaData | CacheServerData;
    return metadata;
  }

  // if not verified, will attempt to load from cache rather than waiting for verification
  // returns { avatar_resource_id, display_name, status }
  /**
   * Builds the connection info for the logged in member. Starts from a 'Connecting...' placeholder
   * and fills it from the live member list (when verified) or from the cached members (when not,
   * in which case status is 'connecting' and privileges are empty).
   */
  async fetchConnectionInfo(): Promise<ConnectionInfo> {
    const connection: ConnectionInfo = {
      id: null,
      avatarResourceId: null,
      displayName: 'Connecting...',
      status: '',
      privileges: [],
      roleName: null,
      roleColor: null,
      rolePriority: null
    };
    if (this.isVerified) {
      await this.ensureMembers();
      const member = this.members.get(this.memberId);
      if (member) {
        connection.id = member.id;
        connection.avatarResourceId = member.avatarResourceId;
        connection.displayName = member.displayName;
        connection.status = member.status;
        connection.roleName = member.roleName;
        connection.roleColor = member.roleColor;
        connection.rolePriority = member.rolePriority;
        connection.privileges = member.privileges;
      } else {
        LOG.warn('Unable to find self in members', this.members);
      }
    } else {
      const cacheMembers = await this.dbCache.getMembers(this.id);
      if (cacheMembers) {
        const member = cacheMembers.find(m => m.id == this.memberId);
        if (member) {
          connection.id = member.id;
          connection.avatarResourceId = member.avatarResourceId;
          connection.displayName = member.displayName;
          connection.status = 'connecting';
          connection.privileges = [];
        } else {
          LOG.warn('Unable to find self in cached members');
        }
      }
    }
    return connection;
  }

  /**
   * Fetches the members (cache first, verified against the server) under membersLock and
   * replaces the in-memory member map with the result. Emits 'deleted-members',
   * 'added-members' and 'updated-members' for differences found against the cache.
   */
  async fetchMembers(): Promise<Member[]> {
    const members = await this._fetchCachedAndVerify<Member[], Member[]>({
      lock: this.membersLock,
      serverFunc: async () => {
        const dataMembers = (await this._queryServer('fetch-members')) as any[];
        return dataMembers.map((dataMember: any) => Member.fromDBData(dataMember));
      },
      cacheFunc: async () => {
        return await this.dbCache.getMembers(this.id);
      },
      cacheUpdateFunc: async (cacheData: Member[] | null, serverData: Member[]) => {
        function equal(cacheMember: Member, serverMember: Member): boolean {
          return (
            cacheMember.id === serverMember.id &&
            cacheMember.displayName === serverMember.displayName &&
            cacheMember.status === serverMember.status &&
            cacheMember.avatarResourceId === serverMember.avatarResourceId &&
            cacheMember.roleName === serverMember.roleName &&
            cacheMember.roleColor === serverMember.roleColor &&
            cacheMember.rolePriority === serverMember.rolePriority &&
            cacheMember.privileges.join(',') === serverMember.privileges.join(',')
          );
        }
        const changes = ClientController._getChanges(cacheData, serverData, equal);

        if (changes.updated.length == 0 && changes.added.length == 0 && changes.deleted.length == 0) {
          return false;
        }

        await this.dbCache.updateServerMembers(this.id, serverData);
        this._updateCachedMembers(serverData);

        if (changes.deleted.length > 0) {
          this.emit('deleted-members', changes.deleted);
        }
        if (changes.added.length > 0) {
          this.emit('added-members', changes.added);
        }
        if (changes.updated.length > 0) {
          this.emit('updated-members', changes.updated.map(change => ({ oldMember: change.oldDataPoint, newMember: change.newDataPoint })));
        }

        return true;
      },
      updateEventName: null,
    });
    this._updateCachedMembers(members);
    return members;
  }

  /**
   * Fetches the channels (cache first, verified against the server) under channelsLock and
   * replaces the in-memory channel map with the result. Emits 'deleted-channels',
   * 'added-channels' and 'updated-channels' for differences found against the cache.
   */
  async fetchChannels(): Promise<Channel[]> {
    const channels = await this._fetchCachedAndVerify<Channel[], Channel[]>({
      lock: this.channelsLock,
      serverFunc: async () => {
        const dataChannels = (await this._queryServer('fetch-channels')) as any[];
        return dataChannels.map((dataChannel: any) => Channel.fromDBData(dataChannel));
      },
      cacheFunc: async () => {
        return await this.dbCache.getChannels(this.id);
      },
      cacheUpdateFunc: async (cacheData: Channel[] | null, serverData: Channel[]) => {
        function equal(cacheChannel: Channel, serverChannel: Channel): boolean {
          return cacheChannel.id == serverChannel.id &&
                 cacheChannel.index == serverChannel.index &&
                 cacheChannel.name == serverChannel.name &&
                 cacheChannel.flavorText == serverChannel.flavorText;
        }
        const changes = ClientController._getChanges(cacheData, serverData, equal);

        if (changes.updated.length == 0 && changes.added.length == 0 && changes.deleted.length == 0) {
          return false;
        }

        // All cache updates are handled by internal event handlers
        if (changes.deleted.length > 0) {
          this.emit('deleted-channels', changes.deleted);
        }
        if (changes.added.length > 0) {
          this.emit('added-channels', changes.added);
        }
        if (changes.updated.length > 0) {
          this.emit('updated-channels', changes.updated.map(change => ({ oldChannel: change.oldDataPoint, newChannel: change.newDataPoint })));
        }

        return true;
      },
      updateEventName: null,
    });
    this._updateCachedChannels(channels);
    return channels;
  }

  /**
   * Diffs a page of cached messages against the same page from the server, prunes the DB cache
   * and emits 'deleted-messages', 'updated-messages' and 'added-messages' (in that order).
   *
   * @param channelId      the id of the channel the messages were fetched from (used in the events)
   * @param firstMessageId the first message in the current list of messages (set for "before" requests)
   * @param lastMessageId  the last element in the current list of messages (set for "after" requests)
   * @param cacheMessages  the page as found in the cache, or null
   * @param serverMessages the page as returned by the server
   * @returns true if any difference was found
   */
  private async updateMessageCache(
    channelId: string,
    firstMessageId: string | null, lastMessageId: string | null,
    cacheMessages: Message[] | null, serverMessages: Message[]
  ): Promise<boolean> {
    function equal(cacheMessage: Message, serverMessage: Message): boolean {
      return cacheMessage.id == serverMessage.id &&
             cacheMessage.channel.id == serverMessage.channel.id &&
             cacheMessage.member.id == serverMessage.member.id &&
             cacheMessage.sent.getTime() == serverMessage.sent.getTime() &&
             cacheMessage.text == serverMessage.text &&
             cacheMessage.resourceId == serverMessage.resourceId &&
             cacheMessage.resourceName == serverMessage.resourceName &&
             cacheMessage.resourceWidth == serverMessage.resourceWidth &&
             cacheMessage.resourceHeight == serverMessage.resourceHeight &&
             cacheMessage.resourcePreviewId == serverMessage.resourcePreviewId;
    }

    let diffFound = false;

    const updatedMessages: { oldMessage: Message, newMessage: Message }[] = [];
    // keys can be null for "recent" requests, where neither boundary id is known
    const addedAfter = new Map<string | null, Message>(); // messageId -> message added after this message
    const addedBefore = new Map<string | null, Message>(); // messageId -> message added before this message
    for (let i = 0; i < serverMessages.length; ++i) {
      const serverMessage = serverMessages[i];
      const cacheMessage = cacheMessages?.find(m => serverMessage.id == m.id);
      if (cacheMessage) {
        if (!equal(cacheMessage, serverMessage)) {
          diffFound = true;
          updatedMessages.push({
            oldMessage: cacheMessage,
            newMessage: serverMessage,
          });
        }
      } else {
        // items in server not in cache are added
        diffFound = true;
        const comesAfter = serverMessages[i - 1] || { id: lastMessageId }; // this message comesAfter comesAfter
        const comesBefore = serverMessages[i + 1] || { id: firstMessageId }; // this message comesBefore comesBefore
        addedAfter.set(comesAfter.id, serverMessage);
        addedBefore.set(comesBefore.id, serverMessage);
      }
    }

    const deletedMessages: Message[] = [];
    if (cacheMessages !== null) {
      for (const cacheMessage of cacheMessages) {
        const serverMessage = serverMessages.find(m => cacheMessage.id == m.id);
        if (serverMessage == null) {
          // items in cache not in server are deleted
          diffFound = true;
          deletedMessages.push(cacheMessage);
        }
      }
    }

    // Send out the events and update the cache
    if (cacheMessages !== null) {
      if (cacheMessages.length > 0 && deletedMessages.length === cacheMessages.length) {
        // if all of the cache data was invalid, it is likely that it needs to be cleared
        // this typically happens when the server got a lot of new messages since the cache
        // was last updated
        await this.dbCache.clearServerMessages(this.id, deletedMessages[0].channel.id);
      } else if (deletedMessages.length > 0) {
        // Messages from the cache that come on the far side of the request are marked as deleted
        // so they are deleted from the UI. However, they should not be removed from the cache
        // yet since they most likely have not been pulled from the server yet. (and will be
        // very likely pulled from the server on the next fetch).
        const cacheDeletedMessages = deletedMessages.slice();
        if (firstMessageId || (firstMessageId == null && lastMessageId == null)) {
          // before & recent: skip the run of deleted messages at the start of the cached page
          let skipped = 0;
          while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[0].id == cacheMessages[skipped].id) {
            cacheDeletedMessages.shift();
            skipped++;
          }
        }
        if (lastMessageId) {
          // after: skip the run of deleted messages at the end of the cached page
          let skipped = 0;
          while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[cacheDeletedMessages.length - 1].id == cacheMessages[cacheMessages.length - 1 - skipped].id) {
            cacheDeletedMessages.pop();
            skipped++;
          }
        }
        //LOG.debug('skipping ' + i + ' deleted messages on the cache side -> deleting ' + cacheDeletedMessages.length + ' cache messages instead of ' + deletedMessages.length);
        if (cacheDeletedMessages.length > 0) {
          await this.dbCache.deleteServerMessages(this.id, cacheDeletedMessages.map(m => m.id));
        }
      }
    }

    if (deletedMessages.length > 0) {
      // make sure to do deleted before added
      this.emit('deleted-messages', this.channels.get(channelId), deletedMessages);
    }
    if (updatedMessages.length > 0) {
      this.emit('updated-messages', this.channels.get(channelId), updatedMessages);
    }
    if (addedAfter.size > 0 || addedBefore.size > 0) {
      this.emit('added-messages', this.channels.get(channelId), addedAfter, addedBefore);
    }

    return diffFound;
  }

  /**
   * Fetches the most recent `number` messages of a channel (cache first, verified against the
   * server) and stores the result in the recent-message RAM cache.
   */
  async fetchMessagesRecent(channelId: string, number: number): Promise<Message[]> {
    await this.ensureMembers();
    await this.ensureChannels();
    const messages = await this._fetchCachedAndVerify<Message[], Message[]>({
      serverFunc: async () => {
        const dataMessages = await this._queryServer('fetch-messages-recent', channelId, number) as any[];
        return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
      },
      cacheFunc: async () => {
        return await this.dbCache.getMessagesRecent(this.id, channelId, number, this.members, this.channels);
      },
      cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
        return await this.updateMessageCache(channelId, null, null, cacheData, serverData);
      },
      updateEventName: null // all events are handled in updateMessageCache
    });
    this._recentMessages.putRecentMessages(this.id, channelId, messages);
    return messages;
  }

  /** Fetches `number` messages before `messageId` in a channel (cache first, verified against the server). */
  async fetchMessagesBefore(channelId: string, messageId: string, number: number): Promise<Message[]> {
    await this.ensureMembers();
    await this.ensureChannels();
    const messages = await this._fetchCachedAndVerify<Message[], Message[]>({
      serverFunc: async () => {
        const dataMessages = await this._queryServer('fetch-messages-before', channelId, messageId, number) as any[];
        return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
      },
      cacheFunc: async () => {
        return await this.dbCache.getMessagesBefore(this.id, channelId, messageId, number, this.members, this.channels);
      },
      cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
        return await this.updateMessageCache(channelId, messageId, null, cacheData, serverData);
      },
      updateEventName: null // all events are handled in updateMessageCache
    });
    return messages;
  }

  /** Fetches `number` messages after `messageId` in a channel (cache first, verified against the server). */
  async fetchMessagesAfter(channelId: string, messageId: string, number: number): Promise<Message[]> {
    await this.ensureMembers();
    await this.ensureChannels();
    const messages = await this._fetchCachedAndVerify<Message[], Message[]>({
      serverFunc: async () => {
        const dataMessages = await this._queryServer('fetch-messages-after', channelId, messageId, number) as any[];
        return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
      },
      cacheFunc: async () => {
        return await this.dbCache.getMessagesAfter(this.id, channelId, messageId, number, this.members, this.channels);
      },
      cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
        return await this.updateMessageCache(channelId, null, messageId, cacheData, serverData);
      },
      updateEventName: null // all events are handled in updateMessageCache
    });
    return messages;
  }

  /**
   * Looks a resource up in the RAM cache, then the DB cache, then on the server, filling the
   * faster caches on the way back.
   */
  async _fetchResourceInternal(resourceId: string): Promise<Buffer> {
    // not using standard _fetchCached here because server-side resources never change.
    // rather, the resource_id would be updated if it changes for a message, server icon, avatar, etc.
    // this provides for a much simpler cache system (3 stages, client-memory, client-db, server-side)

    // since all serverId / resourceId pairs will never update their resource buffers, this cache becomes exceedingly simple

    const resourceCacheDataBuff = ResourceRAMCache.getResource(this.id, resourceId);
    if (resourceCacheDataBuff != null) {
      return resourceCacheDataBuff;
    }

    const cacheData = await this.dbCache.getResource(this.id, resourceId);
    if (cacheData !== null) {
      ResourceRAMCache.putResource(this.id, resourceId, cacheData.data);
      return cacheData.data;
    }

    // Note: Not pre-requesting from server asynchronously to reduce fetch-resource requests
    const serverData = await this._queryServer('fetch-resource', resourceId);
    ResourceRAMCache.putResource(this.id, resourceId, serverData.data);
    await this.dbCache.upsertServerResources(this.id, [ serverData ]);
    return serverData.data;
  }

  /** Hands the outcome of one resource fetch to every caller waiting on that resource id. */
  _settleResourceCallbacks(resourceId: string, err: any, resourceBuff: Buffer | null): void {
    const callbacks = this.resourceCallbacks.get(resourceId) ?? [];
    // remove the entry first so a misbehaving callback cannot leave a stale waiting list behind
    this.resourceCallbacks.delete(resourceId);
    for (const callbackFunc of callbacks) {
      callbackFunc(err, resourceBuff);
    }
  }

  /**
   * Deduplicates client-side resource requests. Useful for when the client wants to fetch an avatar
   * multiple times for the channel feed. Or if there is a duplicate image in the feed.
   * The first caller for a resource id starts the fetch; callers arriving while it is in flight
   * are queued and settled with the same result.
   * @param resourceId The id of the resource to fetch
   * @throws whatever the underlying fetch threw, or ShouldNeverHappenError when it produced neither an error nor a buffer
   */
  async fetchResource(resourceId: string): Promise<Buffer> {
    return await new Promise<Buffer>((resolve, reject) => {
      const resultFunc = (err: any, resourceBuff: Buffer | null): void => {
        if (err) {
          reject(err);
        } else if (resourceBuff) {
          resolve(resourceBuff);
        } else {
          reject(new ShouldNeverHappenError('no buffer or error!'));
        }
      };

      const waiting = this.resourceCallbacks.get(resourceId);
      if (waiting) {
        // a fetch for this resource is already running; just wait for its result
        waiting.push(resultFunc);
        return;
      }

      this.resourceCallbacks.set(resourceId, [ resultFunc ]);
      void this._fetchResourceInternal(resourceId).then(
        (resourceBuff) => this._settleResourceCallbacks(resourceId, null, resourceBuff),
        (e) => this._settleResourceCallbacks(resourceId, e, null),
      );
    });
  }

  /** Sends a text message to a channel, returning the sent Message. */
  async sendMessage(channelId: string, text: string): Promise<Message> {
    const dataMessage = await this._queryServer('send-message', channelId, text);
    await this.ensureMembers();
    await this.ensureChannels();
    return Message.fromDBData(dataMessage, this.members, this.channels);
  }

  /** Sends a message with an attached resource to a channel, returning the sent Message. */
  async sendMessageWithResource(channelId: string, text: string | null, resourceBuff: Buffer, resourceName: string): Promise<Message> {
    const dataMessage = await this._queryServer('send-message-with-resource', channelId, text, resourceBuff, resourceName);
    await this.ensureMembers();
    await this.ensureChannels();
    return Message.fromDBData(dataMessage, this.members, this.channels);
  }

  /** Sends 'set-status' with the given status. */
  async setStatus(status: string): Promise<void> {
    // wow, that's simple for a change
    await this._queryServer('set-status', status);
  }

  /** Sends 'set-display-name' with the given display name. */
  async setDisplayName(displayName: string): Promise<void> {
    await this._queryServer('set-display-name', displayName);
  }

  /** Sends 'set-avatar' with the given image buffer. */
  async setAvatar(avatarBuff: Buffer): Promise<void> {
    await this._queryServer('set-avatar', avatarBuff);
  }

  /** Sends 'set-name' and returns the server's answer as ServerMetaData. */
  async setName(name: string): Promise<ServerMetaData> {
    return ServerMetaData.fromServerDBData(await this._queryServer('set-name', name));
  }

  /** Sends 'set-icon' and returns the server's answer as ServerMetaData. */
  async setIcon(iconBuff: Buffer): Promise<ServerMetaData> {
    return ServerMetaData.fromServerDBData(await this._queryServer('set-icon', iconBuff));
  }

  /** Sends 'update-channel' and returns the server's answer as a Channel. */
  async updateChannel(channelId: string, name: string, flavorText: string | null): Promise<Channel> {
    return Channel.fromDBData(await this._queryServer('update-channel', channelId, name, flavorText));
  }

  /** Sends 'create-text-channel' and returns the server's answer as a Channel. */
  async createChannel(name: string, flavorText: string | null): Promise<Channel> {
    return Channel.fromDBData(await this._queryServer('create-text-channel', name, flavorText));
  }

  /** Queries the server for the login tokens. */
  async queryTokens(): Promise<Token[]> {
    // No cacheing for now, this is a relatively small request and comes
    // after a context-menu click so it's not as important to cache as the
    // channels, members, and messages
    const dataTokens = await this._queryServer('fetch-tokens');
    await this.ensureMembers();
    return dataTokens.map((dataToken: any) => {
      return Token.fromDBData(dataToken, this.members);
    });
  }

  /** Sends 'revoke-token' for the given token. */
  async revokeToken(token: string): Promise<void> {
    return await this._queryServer('revoke-token', token);
  }

  /** Closes the socket. */
  close(): void {
    this.socket.close();
  }
}