cordis/client/webapp/client-controller.ts
2021-11-07 15:57:09 -06:00

969 lines
38 KiB
TypeScript

import * as electronRemote from '@electron/remote';
const electronConsole = electronRemote.getGlobal('console') as Console;
import Logger from '../../logger/logger';
const LOG = Logger.create(__filename, electronConsole);
import * as crypto from 'crypto';
import { EventEmitter } from 'events';
import * as socketio from 'socket.io-client';
import ConcurrentQueue from '../../concurrent-queue/concurrent-queue';
import { Message, Member, Channel, Changes, ConnectionInfo, CacheServerData, ServerMetaData, ServerConfig, ShouldNeverHappenError, Token } from './data-types';
import DBCache from './db-cache';
import ResourceRAMCache from './resource-ram-cache';
import RecentMessageRAMCache from './message-ram-cache';
import { channel } from 'diagnostics_channel';
import Q from './q-module';
// Events:
// 'connected' function() called when connected to the server
// 'disconnected' function() called when connection to the server is lost
// 'verified' function() called when verification handshake is completed
// 'update-server' function(serverData) called when the server metadata updates
// 'deleted-members' function(members) called when members were deleted on the server-side
// 'updated-members' function(data: Array [ { oldMember, newMember } ]) called when a member was updated on the server-side
// 'added-members' function(members) called when members were added on the server-side
// 'deleted-channels' function(channels) called when channels were deleted on the server-side
// 'updated-channels' function(data: Array [ { oldChannel, newChannel } ]) called when a channel was updated on the server-side
// 'added-channels' function(channels) called when channels are added on the server-side
// 'new-message' function(message) called when a message is received in a channel
// 'deleted-messages' function(messages) called when messages were deleted on the server-side
// 'updated-messages' function(data: Array [ { oldMessage, newMessage } ]) called when messages were updated on the server-side
// 'added-messages' function(addedAfter : Map<messageId -> addedMessage>, addedBefore : <messageId -> addedMessage>) called when messages were added on the server-side (that were not in our client-side db).
// Functions:
// these three have grab counterparts
// async fetchMetadata() Fetches the server information as { name, icon_resource_id }
// async fetchMembers() Fetches the server members as [ { id, display_name, avatar_resource_id }, ... ]
// async fetchChannels() Fetches the available channels as [ { id, name }, ... ]
// async fetchMessagesRecent(channelId, number) Fetches the most recent number messages in a channel [ Message, ... ]
// async fetchMessagesBefore(channelId, messageId, number) Fetches number messages before a specified message [ Message, ... ]
// async fetchMessagesAfter(channelId, messageId, number) Fetches number messages after a specified message [ Message, ... ]
// TODO: change this to grab
// async fetchResource(resourceId) Fetches the resource associated with a specified resrouceId
// async sendMessage(channelId, text) Sends a text message to a channel, returning the sent Message
// async sendMessageWithResource(channelId, text, resource, resourceName) Sends a text message with a resource to a channel, returning the sent Message. text is optional
// async setStatus(status) Set the current logged in user's status
// async setDisplayName(displayName) Sets the current logged in user's display name
// async setAvatar(avatarBuff)
// async updateChannel(channelId, name, flavorText) Updates a channel's name and flavor text
// async queryTokens() Queries for the login tokens as [ { token, member_id, created, expires }, ... ]
// async revokeToken(token) Revokes an outstanding token
interface FetchCachedAndVerifyProps<ClientType, ServerType> {
lock?: ConcurrentQueue<void>,
serverFunc: (() => Promise<ServerType>)
cacheFunc: (() => Promise<ClientType | null>),
cacheUpdateFunc: ((cacheData: ClientType | null, serverData: ServerType) => Promise<boolean>), // returns true if changes were made to the cache
updateEventName?: string | null,
}
export default class ClientController extends EventEmitter {
public id: string;
public memberId: string;
public url: string;
public publicKey: crypto.KeyObject;
public privateKey: crypto.KeyObject;
public serverCert: string
public socket: socketio.Socket;
private _metadata: ServerMetaData | CacheServerData | null;
private _recentMessages: RecentMessageRAMCache;
public channels: Map<string, Channel>;
public members: Map<string, Member>;
public channelsLock: ConcurrentQueue<void>;
public membersLock: ConcurrentQueue<void>;
public isVerified: boolean;
public resourceCallbacks: Map<string, ((err: any, resourceBuff: Buffer | null) => Promise<void> | void)[]>;
public dedupedCallbacks: Map<string, (() => Promise<void> | void)[]>;
constructor(config: ServerConfig) {
super();
// TODO: fetch these from the cache when they are needed rather than storing them in memory (especially private key)
let publicKey = typeof config.publicKey === 'string' ? crypto.createPublicKey(config.publicKey) : config.publicKey;
let privateKey = typeof config.privateKey === 'string' ? crypto.createPrivateKey(config.privateKey) : config.privateKey;
this.id = config.serverId + ''; // this is the client-side server id (from Cache)
this.memberId = config.memberId;
this.url = config.url;
this.publicKey = publicKey;
this.privateKey = privateKey;
this.serverCert = config.serverCert;
this.socket = socketio.connect(this.url, {
forceNew: true,
ca: this.serverCert, // this provides identity verification for the server
});
this._metadata = null; // use grabMetadata();
this._recentMessages = new RecentMessageRAMCache(); // use grabRecentMessages();
// These are used to make message objects more useful
// TODO: make these private
this.channels = new Map<string, Channel>(); // use grabChannels();
this.members = new Map<string, Member>(); // use grabMembers();
this.channelsLock = new ConcurrentQueue<void>(1);
this.membersLock = new ConcurrentQueue<void>(1);
this.isVerified = false;
this.resourceCallbacks = new Map<string, ((err: any, resourceBuff: Buffer | null) => Promise<void> | void)[]>(); // resourceId -> [ callbackFunc, ... ];
this.dedupedCallbacks = new Map<string, (() => Promise<void> | void)[]>(); // dedupeId -> [ callbackFunc, ... ];
this._bindSocket();
this._bindInternalEvents();
}
_bindSocket(): void {
this.socket.on('connect', async () => {
LOG.info('connected to server#' + this.id);
this.emit('connected');
// TODO: re-verify on verification failure?
await this.verify();
});
this.socket.on('disconnect', () => {
LOG.info('disconnected from server#' + this.id);
this.isVerified = false;
this._metadata = null;
this._recentMessages.clear();
this.channels.clear();
this.members.clear();
this.emit('disconnected');
});
this.socket.on('new-message', async (dataMessage) => {
await this.ensureMembers();
await this.ensureChannels();
let message = Message.fromDBData(dataMessage, this.members, this.channels);
await DBCache.upsertServerMessages(this.id, message.channel.id, [ message ]);
LOG.info(message.toString());
this._recentMessages.addNewMessage(this.id, message);
this.emit('new-message', message);
});
this.socket.on('update-member', async (member) => {
await this.ensureMembers();
let oldMember = this.members.get(member.id);
if (oldMember) {
this.emit('updated-members', [ { oldMember: oldMember, newMember: member } ]);
} else {
this.emit('added-members', [ member ]);
}
});
this.socket.on('update-channel', async (channel) => {
await this.ensureChannels();
let oldChannel = this.channels.get(channel.id);
if (oldChannel) {
this.emit('updated-channels', [ { oldChannel: oldChannel, newChannel: channel } ]);
} else {
this.emit('added-channels', [ channel ]);
}
});
this.socket.on('new-channel', async (channel) => {
await this.ensureChannels();
this.emit('added-channels', [ channel ]);
});
this.socket.on('update-server', async (serverMeta) => {
await DBCache.updateServer(this.id, serverMeta);
this.emit('update-server', serverMeta);
});
}
_bindInternalEvents(): void {
this.on('added-members', async (members: Member[]) => {
for (let member of members) {
this.members.set(member.id, member);
}
await DBCache.updateServerMembers(this.id, Array.from(this.members.values()));
});
this.on('updated-members', async (data: { oldMember: Member, newMember: Member }[]) => {
for (const { oldMember, newMember } of data) {
this.members.set(newMember.id, newMember);
}
await DBCache.updateServerMembers(this.id, Array.from(this.members.values()));
});
this.on('deleted-members', async (members: Member[]) => {
for (let member of members) {
this.members.delete(member.id);
}
await DBCache.updateServerMembers(this.id, Array.from(this.members.values()));
});
this.on('added-channels', async (channels: Channel[]) => {
for (let channel of channels) {
this.channels.set(channel.id, channel);
}
await DBCache.updateServerChannels(this.id, Array.from(this.channels.values()));
});
this.on('updated-channels', async (data: { oldChannel: Channel, newChannel: Channel }[]) => {
for (const { oldChannel, newChannel } of data) {
this.channels.set(newChannel.id, newChannel);
}
await DBCache.updateServerChannels(this.id, Array.from(this.channels.values()));
});
this.on('deleted-channels', async (channels: Channel[]) => {
for (let channel of channels) {
this.channels.delete(channel.id);
}
await DBCache.updateServerChannels(this.id, Array.from(this.channels.values()));
});
this.on('added-messages', async (channel: Channel, addedAfter: Map<string, Message>, addedBefore: Map<string, Message>) => {
// Adding messages is surprisingly complicated (see script.js's added-messages function)
// so we can just drop the channel and refresh it once if any messages got added while
// we were gone. Further, it is probably best to make 100% sure that the script.js
// implementation is correct before copying it elsewhere. (I'm 90% sure it is correct)
// Getting this correct and actually implementing the added-messages feature in the
// RAM cache would be useful for first-time joiners to a server, getting the channel
// messages for the first time
// Alternatively, just store the date in the message and use order-by
this._recentMessages.dropChannel(this.id, channel.id);
await DBCache.upsertServerMessages(this.id, channel.id, Array.from(addedAfter.values()));
});
this.on('updated-messages', async (channel: Channel, data: { oldMessage: Message, newMessage: Message }[]) => {
for (let { oldMessage, newMessage } of data) {
this._recentMessages.updateMessage(this.id, oldMessage, newMessage);
}
await DBCache.upsertServerMessages(this.id, channel.id, data.map(change => change.newMessage));
});
this.on('deleted-messages', async (_channel: Channel, messages: Message[]) => {
for (let message of messages) {
this._recentMessages.deleteMessage(this.id, message);
}
await DBCache.deleteServerMessages(this.id, messages.map(message => message.id));
});
}
_updateCachedMembers(members: Member[]): void {
this.members.clear();
for (let member of members) {
this.members.set(member.id, member);
}
}
_updateCachedChannels(channels: Channel[]): void {
this.channels.clear();
for (let channel of channels) {
this.channels.set(channel.id, channel);
}
}
/**
* Fetches data from the server but returns cached data if it already exists. If the server data comes back different
* from the cached data, emits an event with the server data
* @param lock A locking enqueue function to make sure that the cache data does not change while waiting for the server data to be requested.
* @param serverFunc A function() that returns the data from the server
* @param cacheFunc A function() that returns the data from the cache (returns null if no data)
* @param cacheUpdateFunc A function(cacheData, serverData) is called after the server data is fetched. It should be used to update the data in the cache
* @param updateEventName The name of the event to emit when the server data is different from the cache data (and the cache data is not null), null for no event
*/
async _fetchCachedAndVerify<T, U>(props: FetchCachedAndVerifyProps<T, U>): Promise<T | U> {
const { lock, serverFunc, cacheFunc, cacheUpdateFunc, updateEventName } = props;
return await new Promise(async (resolve, reject) => {
let func = async () => {
try {
let serverPromise = serverFunc();
let cacheData = await cacheFunc();
if (cacheData !== null) {
resolve(cacheData);
}
let serverData: U;
try {
serverData = await serverPromise;
} catch (e) {
if (cacheData !== null) {
// Print an error if this was already resolved
LOG.warn('Error fetching server data:', e);
return;
} else {
throw e;
}
}
// make sure the 'added/deleted/updated' events get run before returning the server data
try {
let changesMade = await cacheUpdateFunc(cacheData, serverData);
if (updateEventName && cacheData != null && changesMade) {
this.emit(updateEventName, serverData);
}
} catch (e) {
LOG.error('error handling cache update', e);
}
if (cacheData == null) {
resolve(serverData);
}
} catch (e) {
reject(e);
}
};
if (lock) {
await lock.push(func);
} else {
await func();
}
});
}
// This function is a promise for error stack tracking purposes.
async _socketEmitTimeout(ms: number, name: string, ...args: any[]): Promise<void> {
return new Promise<void>((resolve) => {
let socketArgs = args.slice(0, args.length - 1);
let respond = args[args.length - 1];
let cutoff = false;
let timeout = setTimeout(() => {
cutoff = true;
respond('emit timeout');
resolve();
}, ms);
this.socket.emit(name, ...socketArgs, (...respondArgs: any[]) => {
if (cutoff) {
return;
}
clearTimeout(timeout);
respond(...respondArgs);
resolve();
});
});
}
/**
* Queries data from the server
* @param endpoint The server-side socket endpoint
* @param args The server socket arguments
*/
async _queryServer(endpoint: string, ...args: any[]): Promise<any> {
// NOTE: socket.io may cause client-side memory leaks if the ack function is never called
await this.ensureVerified(5000);
LOG.silly('querying s#' + this.id + ' @' + endpoint + ' / [' + args.map(arg => LOG.inspect(arg)).join(', ') + ']');
return await new Promise((resolve, reject) => {
this._socketEmitTimeout(5000, endpoint, ...args, async (errMsg: string, serverData: any) => {
if (errMsg) {
reject(new Error('error fetching server data @' + endpoint + ' / [' + args.map(arg => LOG.inspect(arg)).join(', ') + ']: ' + errMsg));
} else {
resolve(serverData);
}
});
});
}
// TODO: Make this "T extends something with an id"
static _getChanges<T>(cacheData: T[] | null, serverData: T[], equal: ((a: T, b: T) => boolean)): Changes<T> {
if (cacheData === null) {
return { updated: [], added: serverData, deleted: [] };
}
let updated: { oldDataPoint: T, newDataPoint: T }[] = [];
let added: T[] = [];
let deleted: T[] = [];
for (let serverDataPoint of serverData) {
let cacheDataPoint = cacheData.find((m: T) => (m as any).id == (serverDataPoint as any).id);
if (cacheDataPoint) {
if (!equal(cacheDataPoint, serverDataPoint)) {
updated.push({ oldDataPoint: cacheDataPoint, newDataPoint: serverDataPoint });
}
} else {
added.push(serverDataPoint);
}
}
for (let cacheDataPoint of cacheData) {
let serverDataPoint = serverData.find((s: T) => (s as any).id == (cacheDataPoint as any).id);
if (serverDataPoint == null) {
deleted.push(cacheDataPoint);
}
}
return { updated, added, deleted };
}
disconnect(): void {
this.socket.disconnect();
}
async verifyWithServer(): Promise<void> {
await new Promise<void>(async (resolve, reject) => {
// Solve the server's challenge
let publicKeyBuff = this.publicKey.export({ type: 'spki', format: 'der' });
this._socketEmitTimeout(5000, 'challenge', publicKeyBuff, (errMsg, algo, type, challenge) => {
if (errMsg) {
reject(new Error('challenge request failed: ' + errMsg));
return;
}
const sign = crypto.createSign(algo);
sign.write(challenge);
sign.end();
let signature = sign.sign(this.privateKey, type);
this._socketEmitTimeout(5000, 'verify', signature, (errMsg, memberId) => {
if (errMsg) {
reject(new Error('verification request failed: ' + errMsg));
return;
}
this.memberId = memberId;
DBCache.updateServerMemberId(this.id, this.memberId);
this.isVerified = true;
LOG.info(`s#${this.id} client verified as u#${this.memberId}`);
resolve();
this.emit('verified');
});
});
});
}
async verify(): Promise<void> {
await new Promise<void>(async (resolve, reject) => {
// Solve the server's challenge
let publicKeyBuff = this.publicKey.export({ type: 'spki', format: 'der' });
this._socketEmitTimeout(5000, 'challenge', publicKeyBuff, (errMsg, algo, type, challenge) => {
if (errMsg) {
reject(new Error('challenge request failed: ' + errMsg));
return;
}
const sign = crypto.createSign(algo);
sign.write(challenge);
sign.end();
let signature = sign.sign(this.privateKey, type);
this._socketEmitTimeout(5000, 'verify', signature, (errMsg, memberId) => {
if (errMsg) {
reject(new Error('verification request failed: ' + errMsg));
return;
}
this.memberId = memberId;
DBCache.updateServerMemberId(this.id, this.memberId);
this.isVerified = true;
LOG.info(`verified at server#${this.id} as u#${this.memberId}`);
resolve();
this.emit('verified');
});
});
});
}
// timeout is in ms
async ensureVerified(timeout: number): Promise<void> {
if (this.isVerified) {
return;
}
await new Promise<void>((resolve, reject) => {
let timeoutId: any = null;
let listener = () => {
clearTimeout(timeoutId);
resolve();
}
this.once('verified', listener);
timeoutId = setTimeout(() => {
this.off('verified', listener);
reject(new Error('verification timeout'));
}, timeout);
});
}
async ensureMetadata(): Promise<void> {
if (this._metadata !== null) return;
await this.fetchMetadata();
}
async ensureMembers(): Promise<void> {
if (this.members.size > 0) return;
await this.fetchMembers();
}
async ensureChannels(): Promise<void> {
if (this.channels.size > 0) return;
await this.fetchChannels();
}
async getMyMember(): Promise<Member> {
await this.ensureMembers();
return this.members.get(this.memberId) as Member;
}
async grabMetadata(): Promise<ServerMetaData | CacheServerData> {
await this.ensureMetadata();
return this._metadata as ServerMetaData | CacheServerData;
}
async grabMembers(): Promise<Member[]> {
await this.ensureMembers();
return Array.from(this.members.values());
}
async grabChannels(): Promise<Channel[]> {
await this.ensureChannels();
return Array.from(this.channels.values());
}
async grabRecentMessages(channelId: string, number: number): Promise<Message[]> {
let cached = this._recentMessages.getRecentMessages(this.id, channelId, number);
if (cached !== null) return cached;
return await this.fetchMessagesRecent(channelId, number);
}
async fetchMetadata(): Promise<ServerMetaData | CacheServerData> {
function isDifferent(cacheData: CacheServerData | null, serverData: ServerMetaData) {
if (cacheData === null) return true;
return !!(cacheData.name != serverData.name || cacheData.iconResourceId != serverData.iconResourceId)
}
let metadata = await this._fetchCachedAndVerify<CacheServerData, ServerMetaData>({
serverFunc: async () => { return ServerMetaData.fromServerDBData(await this._queryServer('fetch-server')); },
cacheFunc: async () => { return await DBCache.getServer(this.id); },
cacheUpdateFunc: async (cacheData: CacheServerData | null, serverData: ServerMetaData) => {
if (!isDifferent(cacheData, serverData)) return false;
await DBCache.updateServer(this.id, serverData);
return true;
},
updateEventName: 'update-server',
});
this._metadata = metadata as ServerMetaData | CacheServerData;
return metadata;
}
// if not verified, will attempt to load from cache rather than waiting for verification
// returns { avatar_resource_id, display_name, status }
async fetchConnectionInfo(): Promise<ConnectionInfo> {
let connection: ConnectionInfo = {
id: null,
avatarResourceId: null,
displayName: 'Connecting...',
status: '',
privileges: [],
roleName: null,
roleColor: null,
rolePriority: null
}
if (this.isVerified) {
await this.ensureMembers();
let member = this.members.get(this.memberId);
if (member) {
connection.id = member.id;
connection.avatarResourceId = member.avatarResourceId;
connection.displayName = member.displayName;
connection.status = member.status;
connection.roleName = member.roleName;
connection.roleColor = member.roleColor;
connection.rolePriority = member.rolePriority;
connection.privileges = member.privileges;
} else {
LOG.warn('Unable to find self in members', this.members);
}
} else {
let cacheMembers = await DBCache.getMembers(this.id);
if (cacheMembers) {
let member = cacheMembers.find(m => m.id == this.memberId);
if (member) {
connection.id = member.id;
connection.avatarResourceId = member.avatarResourceId;
connection.displayName = member.displayName;
connection.status = 'connecting';
connection.privileges = [];
} else {
LOG.warn('Unable to find self in cached members');
}
}
}
return connection;
}
async fetchMembers(): Promise<Member[]> {
let members = await this._fetchCachedAndVerify<Member[], Member[]>({
lock: this.membersLock,
serverFunc: async () => {
let dataMembers = (await this._queryServer('fetch-members')) as any[];
return dataMembers.map((dataMember: any) => Member.fromDBData(dataMember));
},
cacheFunc: async () => { return await DBCache.getMembers(this.id); },
cacheUpdateFunc: async (cacheData: Member[] | null, serverData: Member[]) => {
function equal(cacheMember: Member, serverMember: Member) {
return (
cacheMember.id === serverMember.id &&
cacheMember.displayName === serverMember.displayName &&
cacheMember.status === serverMember.status &&
cacheMember.avatarResourceId === serverMember.avatarResourceId &&
cacheMember.roleName === serverMember.roleName &&
cacheMember.roleColor === serverMember.roleColor &&
cacheMember.rolePriority === serverMember.rolePriority &&
cacheMember.privileges.join(',') === serverMember.privileges.join(',')
);
}
let changes = ClientController._getChanges<Member>(cacheData, serverData, equal);
if (changes.updated.length == 0 && changes.added.length == 0 && changes.deleted.length == 0) {
return false;
}
await DBCache.updateServerMembers(this.id, serverData);
this._updateCachedMembers(serverData);
if (changes.deleted.length > 0) {
this.emit('deleted-members', changes.deleted);
}
if (changes.added.length > 0) {
this.emit('added-members', changes.added);
}
if (changes.updated.length > 0) {
this.emit('updated-members', changes.updated.map(change => ({ oldMember: change.oldDataPoint, newMember: change.newDataPoint })));
}
return true;
},
updateEventName: null,
});
this._updateCachedMembers(members);
return members;
}
async fetchChannels(): Promise<Channel[]> {
let changes: Changes<Channel> | null = null;
let channels = await this._fetchCachedAndVerify<Channel[], Channel[]>({
lock: this.channelsLock,
serverFunc: async () => {
let dataChannels = (await this._queryServer('fetch-channels')) as any[];
return dataChannels.map((dataChannel: any) => Channel.fromDBData(dataChannel));
},
cacheFunc: async () => { return await DBCache.getChannels(this.id); },
cacheUpdateFunc: async (cacheData: Channel[] | null, serverData: Channel[]) => {
function equal(cacheChannel: Channel, serverChannel: Channel) {
return cacheChannel.id == serverChannel.id &&
cacheChannel.index == serverChannel.index &&
cacheChannel.name == serverChannel.name &&
cacheChannel.flavorText == serverChannel.flavorText;
}
let changes = ClientController._getChanges(cacheData, serverData, equal);
if (changes.updated.length == 0 && changes.added.length == 0 && changes.deleted.length == 0) {
return false;
}
// All cache updates are handled by internal event handlers
if (changes.deleted.length > 0) {
this.emit('deleted-channels', changes.deleted);
}
if (changes.added.length > 0) {
this.emit('added-channels', changes.added);
}
if (changes.updated.length > 0) {
this.emit('updated-channels', changes.updated.map(change => ({ oldChannel: change.oldDataPoint, newChannel: change.newDataPoint })));
}
return true;
},
updateEventName: null,
});
this._updateCachedChannels(channels);
return channels;
}
// channelId: the id of the channel the messages were fetched from (used in the events)
// firstMessageId: the first message in the current list of messages
// lastMessageId: the last element in the current list of messages
private async updateMessageCache(
channelId: string,
firstMessageId: string | null,
lastMessageId: string | null,
cacheMessages: Message[] | null,
serverMessages: Message[]
): Promise<boolean> {
function equal(cacheMessage: Message, serverMessage: Message) {
return cacheMessage.id == serverMessage.id &&
cacheMessage.channel.id == serverMessage.channel.id &&
cacheMessage.member.id == serverMessage.member.id &&
cacheMessage.sent.getTime() == serverMessage.sent.getTime() &&
cacheMessage.text == serverMessage.text &&
cacheMessage.resourceId == serverMessage.resourceId &&
cacheMessage.resourceName == serverMessage.resourceName &&
cacheMessage.resourceWidth == serverMessage.resourceWidth &&
cacheMessage.resourceHeight == serverMessage.resourceHeight &&
cacheMessage.resourcePreviewId == serverMessage.resourcePreviewId
}
let diffFound = false;
let updatedMessages: { oldMessage: Message, newMessage: Message }[] = [];
let addedAfter = new Map<string, Message>(); // messageId -> message added after this message
let addedBefore = new Map<string, Message>(); // messageId -> message added before this message
for (let i = 0; i < serverMessages.length; ++i) {
let serverMessage = serverMessages[i];
let cacheMessage = cacheMessages?.find(m => serverMessage.id == m.id);
if (cacheMessage) {
if (!equal(cacheMessage, serverMessage)) {
diffFound = true;
updatedMessages.push({
oldMessage: cacheMessage,
newMessage: serverMessage,
});
}
} else {
// items in server not in cache are added
diffFound = true;
let comesAfter = serverMessages[i - 1] || { id: lastMessageId }; // this message comesAfter comesAfter
let comesBefore = serverMessages[i + 1] || { id: firstMessageId }; // this message comesBefore comesBefore
addedAfter.set(comesAfter.id, serverMessage);
addedBefore.set(comesBefore.id, serverMessage);
}
}
let deletedMessages: Message[] = [];
if (cacheMessages !== null) {
for (let cacheMessage of cacheMessages) {
let serverMessage = serverMessages.find(m => cacheMessage.id == m.id);
if (serverMessage == null) {
// items in cache not in server are deleted
diffFound = true;
deletedMessages.push(cacheMessage);
}
}
}
// Send out the events and update the cache
if (cacheMessages !== null) {
if (cacheMessages.length > 0 && deletedMessages.length === cacheMessages.length) {
// if all of the cache data was invalid, it is likely that it needs to be cleared
// this typically happens when the server got a lot of new messages since the cache
// was last updated
await DBCache.clearServerMessages(this.id, deletedMessages[0].channel.id);
} else if (deletedMessages.length > 0) {
// Messages from the cache that come on the far side of the request are marked as deleted
// so they are deleted from the UI. However, they should not be removed from the cache
// yet since they most likely have not been pulled from the server yet. (and will be
// very likely pulled from the server on the next fetch).
let cacheDeletedMessages = deletedMessages.slice();
let i: number | null = null;
if (firstMessageId || (firstMessageId == null && lastMessageId == null)) { // before & recent
i = 0;
while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[0].id == cacheMessages[i].id) {
cacheDeletedMessages.shift();
i++;
}
}
if (lastMessageId) { // after
i = 0;
while (cacheDeletedMessages.length > 0 && cacheDeletedMessages[cacheDeletedMessages.length - 1].id == cacheMessages[cacheMessages.length - 1 - i].id) {
cacheDeletedMessages.pop();
i++;
}
}
//LOG.debug('skipping ' + i + ' deleted messages on the cache side -> deleting ' + cacheDeletedMessages.length + ' cache messages instead of ' + deletedMessages.length);
if (cacheDeletedMessages.length > 0) {
await DBCache.deleteServerMessages(this.id, cacheDeletedMessages.map(m => m.id));
}
}
}
if (deletedMessages.length > 0) { // make sure to do deleted before added
this.emit('deleted-messages', this.channels.get(channelId), deletedMessages);
}
if (updatedMessages.length > 0) {
this.emit('updated-messages', this.channels.get(channelId), updatedMessages);
}
if (addedAfter.size > 0 || addedBefore.size > 0) {
this.emit('added-messages', this.channels.get(channelId), addedAfter, addedBefore);
}
return diffFound;
}
async fetchMessagesRecent(channelId: string, number: number): Promise<Message[]> {
await this.ensureMembers();
await this.ensureChannels();
let messages = await this._fetchCachedAndVerify<Message[], Message[]>({
serverFunc: async () => {
let dataMessages = await this._queryServer('fetch-messages-recent', channelId, number) as any[];
return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
},
cacheFunc: async () => {
return await DBCache.getMessagesRecent(this.id, channelId, number, this.members, this.channels);
},
cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
return await this.updateMessageCache(channelId, null, null, cacheData, serverData);
},
updateEventName: null // all events are handled in diffFunc
});
this._recentMessages.putRecentMessages(this.id, channelId, messages);
return messages;
}
async fetchMessagesBefore(channelId: string, messageId: string, number: number): Promise<Message[] | null> {
await this.ensureMembers();
await this.ensureChannels();
let messages = await this._fetchCachedAndVerify<Message[], Message[] | null>({
serverFunc: async () => {
let dataMessages = await this._queryServer('fetch-messages-before', channelId, messageId, number) as any[];
return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
},
cacheFunc: async () => {
return await DBCache.getMessagesBefore(this.id, channelId, messageId, number, this.members, this.channels);
},
cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
return await this.updateMessageCache(channelId, messageId, null, cacheData, serverData);
},
updateEventName: null // all events are handled in diffFunc
});
return messages;
}
async fetchMessagesAfter(channelId: string, messageId: string, number: number): Promise<Message[] | null> {
await this.ensureMembers();
await this.ensureChannels();
let messages = await this._fetchCachedAndVerify<Message[], Message[] | null>({
serverFunc: async () => {
let dataMessages = await this._queryServer('fetch-messages-after', channelId, messageId, number) as any[];
return dataMessages.map((dataMessage: any) => Message.fromDBData(dataMessage, this.members, this.channels));
},
cacheFunc: async () => { return await DBCache.getMessagesAfter(this.id, channelId, messageId, number, this.members, this.channels); },
cacheUpdateFunc: async (cacheData: Message[] | null, serverData: Message[]) => {
return await this.updateMessageCache(channelId, null, messageId, cacheData, serverData);
},
updateEventName: null // all events are handled in diffFunc
});
return messages;
}
async _fetchResourceInternal(resourceId: string): Promise<Buffer> {
// not using standard _fetchCached here because server-side resources never change.
// rather, the resource_id would be updated if it changes for a message, server icon, avatar, etc.
// this provides for a much simpler cache system (3 stages, client-memory, client-db, server-side)
// since all serverId / resourceId pairs will never update their resource buffers, this cache becomes exceedingly simple
let resourceCacheDataBuff = ResourceRAMCache.getResource(this.id, resourceId);
if (resourceCacheDataBuff != null) {
return resourceCacheDataBuff;
}
let cacheData = await DBCache.getResource(this.id, resourceId);
if (cacheData !== null) {
ResourceRAMCache.putResource(this.id, resourceId, cacheData.data);
return cacheData.data;
}
// Note: Not pre-requesting from server asynchronously to reduce fetch-resource requests
let serverData = await this._queryServer('fetch-resource', resourceId);
ResourceRAMCache.putResource(this.id, resourceId, serverData.data);
await DBCache.upsertServerResources(this.id, [ serverData ]);
return serverData.data;
}
/**
* Deduplicates client-side resource requests. Useful for when the client wants to fetch an avatar
* multiple times for the channel feed. Or if there is a duplicate image in the feed.
* @param resourceId
*/
async fetchResource(resourceId: string): Promise<Buffer> {
return await new Promise(async (resolve, reject) => {
let resultFunc = (err: any, resourceBuff: Buffer | null) => {
if (err) {
reject(err);
} else if (resourceBuff) {
resolve(resourceBuff);
} else {
reject(new ShouldNeverHappenError('no buffer or error!'));
}
}
if (this.resourceCallbacks.has(resourceId)) {
this.resourceCallbacks.get(resourceId)?.push(resultFunc);
} else {
this.resourceCallbacks.set(resourceId, [ resultFunc ]);
let result: Buffer | null = null;
let err: any = null;
try {
result = await this._fetchResourceInternal(resourceId);
} catch (e) {
err = e;
}
for (let callbackFunc of this.resourceCallbacks.get(resourceId) ?? []) {
callbackFunc(err, result);
}
this.resourceCallbacks.delete(resourceId);
}
});
}
async sendMessage(channelId: string, text: string) {
let dataMessage = await this._queryServer('send-message', channelId, text);
await this.ensureMembers();
await this.ensureChannels();
return Message.fromDBData(dataMessage, this.members, this.channels);
}
async sendMessageWithResource(channelId: string, text: string | null, resourceBuff: Buffer, resourceName: string) {
let dataMessage = await this._queryServer('send-message-with-resource', channelId, text, resourceBuff, resourceName);
await this.ensureMembers();
await this.ensureChannels();
return Message.fromDBData(dataMessage, this.members, this.channels);
}
async setStatus(status: string): Promise<void> {
// wow, that's simple for a change
await this._queryServer('set-status', status);
}
async setDisplayName(displayName: string): Promise<void> {
await this._queryServer('set-display-name', displayName);
}
async setAvatar(avatarBuff: Buffer): Promise<void> {
await this._queryServer('set-avatar', avatarBuff);
}
async setName(name: string): Promise<ServerMetaData> {
return ServerMetaData.fromServerDBData(await this._queryServer('set-name', name));
}
async setIcon(iconBuff: Buffer): Promise<ServerMetaData> {
return ServerMetaData.fromServerDBData(await this._queryServer('set-icon', iconBuff));
}
async updateChannel(channelId: string, name: string, flavorText: string | null): Promise<Channel> {
return Channel.fromDBData(await this._queryServer('update-channel', channelId, name, flavorText));
}
async createChannel(name: string, flavorText: string | null): Promise<Channel> {
return Channel.fromDBData(await this._queryServer('create-text-channel', name, flavorText));
}
async queryTokens(): Promise<Token[]> {
// No cacheing for now, this is a relatively small request and comes
// after a context-menu click so it's not as important to cache as the
// channels, members, and messages
let dataTokens = await this._queryServer('fetch-tokens');
await this.ensureMembers();
return dataTokens.map(dataToken => {
return Token.fromDBData(dataToken, this.members);
});
}
async revokeToken(token: string): Promise<void> {
return await this._queryServer('revoke-token', token);
}
close(): void {
this.socket.close();
}
}