import * as electronRemote from '@electron/remote';
const electronConsole = electronRemote.getGlobal('console') as Console;
import Logger from '../../logger/logger';
const LOG = Logger.create(__filename, electronConsole);

import * as fs from 'fs/promises';

import ConcurrentQueue from '../../concurrent-queue/concurrent-queue';

import Globals from './globals';

import * as sqlite from 'sqlite';
import * as sqlite3 from 'sqlite3';

import { Message, Member, Channel, Resource, NotInitializedError, ServerMetaData, ServerConfig, CacheServerData } from './data-types';

/**
 * A cache implemented using an sqlite database.
 * Also stores configuration for server connections.
 *
 * Multi-statement writes go through {@link DBCache.queueTransaction}, which
 * pushes them onto a queue constructed with a concurrency of 1 and wraps each
 * one in BEGIN / COMMIT (or ROLLBACK on failure).
 */
export default class DBCache {
  private TRANSACTION_QUEUE = new ConcurrentQueue(1);

  private constructor(
    private readonly db: sqlite.Database
  ) {}

  /** Issues `BEGIN TRANSACTION` on the underlying connection. */
  async beginTransaction(): Promise<void> {
    await this.db.run('BEGIN TRANSACTION');
  }

  /** Issues `ROLLBACK` on the underlying connection. */
  async rollbackTransaction(): Promise<void> {
    await this.db.run('ROLLBACK');
  }

  /** Issues `COMMIT` on the underlying connection. */
  async commitTransaction(): Promise<void> {
    await this.db.run('COMMIT');
  }

  /**
   * Runs `func` inside a transaction, queued behind any other queued transaction.
   *
   * @param func the statements to run between BEGIN and COMMIT
   * @throws whatever `func`, BEGIN or COMMIT threw; a failure of the ROLLBACK
   *         itself is logged and never replaces the original error
   */
  async queueTransaction(func: (() => Promise<void>)): Promise<void> {
    await this.TRANSACTION_QUEUE.push(async () => {
      try {
        await this.beginTransaction();
        await func();
        await this.commitTransaction();
      } catch (e: unknown) {
        try {
          await this.rollbackTransaction();
        } catch (rollbackError: unknown) {
          // e.g. BEGIN itself failed so there is nothing to roll back;
          // the caller cares about the original failure, not this one
          console.error('unable to roll back transaction', rollbackError);
        }
        throw e;
      }
    });
  }

  /**
   * Opens (creating if necessary) the cache database at `./db/cache.db`.
   *
   * @returns a connected cache; call {@link DBCache.init} to create the tables
   */
  static async connect(): Promise<DBCache> {
    // recursive mkdir is a no-op when the directory already exists, so no
    // separate existence check (and no check-then-create race) is needed
    await fs.mkdir('./db', { recursive: true });
    return new DBCache(await sqlite.open({
      driver: sqlite3.Database,
      filename: './db/cache.db'
    }));
  }

  /** Creates all tables and indexes that do not exist yet, in one transaction. */
  async init(): Promise<void> {
    await this.queueTransaction(async () => {
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS identities (
            id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
          , public_key TEXT NOT NULL
          , private_key TEXT NOT NULL
        )
      `);
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS servers (
            id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
          , url TEXT NOT NULL
          , cert TEXT NOT NULL
          , name TEXT
          , icon_resource_id TEXT
          , member_id TEXT
        )
      `);
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS server_identities (
            server_id INTEGER NOT NULL
          , identity_id INTEGER NOT NULL
          , FOREIGN KEY (server_id) REFERENCES servers(id)
          , FOREIGN KEY (identity_id) REFERENCES identities(id)
        )
      `);
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS members (
            id TEXT NOT NULL
          , server_id INTEGER NOT NULL REFERENCES servers(id)
          , display_name TEXT NOT NULL
          , status TEXT NOT NULL
          , avatar_resource_id TEXT NOT NULL
          , role_name TEXT
          , role_color TEXT
          , role_priority INTEGER
          , privileges TEXT
          , CONSTRAINT members_id_server_id_con UNIQUE (id, server_id)
        )
      `);
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS channels (
            id TEXT NOT NULL
          , server_id INTEGER NOT NULL REFERENCES servers(id)
          , "index" INTEGER NOT NULL
          , name TEXT NOT NULL
          , flavor_text TEXT
          , CONSTRAINT channels_id_server_id_con UNIQUE (id, server_id)
        )
      `);
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS resources (
            id TEXT NOT NULL
          , server_id INTEGER NOT NULL REFERENCES servers(id)
          , hash BLOB NOT NULL
          , data BLOB NOT NULL
          , data_size INTEGER NOT NULL
          , last_used INTEGER NOT NULL
          , CONSTRAINT resources_id_server_id_con UNIQUE (id, server_id)
        )
      `);
      await this.db.run('CREATE INDEX IF NOT EXISTS resources_data_size_idx ON resources (data_size)');
      // note: no foreign key on resource_id since we may not have cached the resource yet
      await this.db.run(`
        CREATE TABLE IF NOT EXISTS messages (
            id TEXT NOT NULL
          , server_id INTEGER NOT NULL REFERENCES servers(id)
          , channel_id TEXT NOT NULL REFERENCES channels(id)
          , member_id TEXT NOT NULL REFERENCES members(id)
          , sent_dtg INTEGER NOT NULL
          , text TEXT
          , resource_id TEXT
          , resource_name TEXT
          , resource_width INTEGER
          , resource_height INTEGER
          , resource_preview_id TEXT
          , CONSTRAINT messages_id_server_id_con UNIQUE (id, server_id)
        )
      `);
      await this.db.run('CREATE INDEX IF NOT EXISTS messages_id_idx ON messages (id)');
      await this.db.run('CREATE INDEX IF NOT EXISTS messages_sent_dtg_idx ON messages (sent_dtg)');
    });
  }

  /** Closes the underlying database connection. */
  async close(): Promise<void> {
    await this.db.close();
  }

  /**
   * Dangerous! Drops every table created by {@link DBCache.init}, in one transaction.
   */
  async reset(): Promise<void> {
    await this.queueTransaction(async () => {
      // referencing tables first, referenced tables last, so the drops also
      // succeed if foreign key enforcement is ever switched on
      await this.db.run('DROP TABLE IF EXISTS messages');
      await this.db.run('DROP TABLE IF EXISTS resources');
      await this.db.run('DROP TABLE IF EXISTS channels');
      await this.db.run('DROP TABLE IF EXISTS members');
      await this.db.run('DROP TABLE IF EXISTS server_identities');
      await this.db.run('DROP TABLE IF EXISTS servers');
      await this.db.run('DROP TABLE IF EXISTS identities');
    });
  }

  /**
   * Stores a key pair.
   *
   * @returns the id of the identity inserted
   * @throws Error if no row was inserted
   */
  async addIdentity(publicKeyPem: string, privateKeyPem: string): Promise<number> {
    const result = await this.db.run(`
      INSERT INTO identities (public_key, private_key) VALUES (?, ?)
    `, [ publicKeyPem, privateKeyPem ]);
    if (!result || result.changes !== 1) {
      throw new Error('unable to insert identity');
    }
    return result.lastID as number;
  }

  /**
   * Stores a server connection.
   *
   * NOTE(review): `cert` is optional here but the `servers.cert` column is
   * declared NOT NULL, so omitting it looks like it will fail the insert — confirm
   * whether the parameter or the schema is the intended contract.
   *
   * @returns the id (client-side) of the server inserted
   * @throws Error if no row was inserted
   */
  async addServer(url: string, cert?: string, name?: string): Promise<number> {
    const result = await this.db.run(`
      INSERT INTO servers (url, cert, name, icon_resource_id) VALUES (?, ?, ?, NULL)
    `, [ url, cert, name ]);
    if (!result || result.changes !== 1) {
      throw new Error('unable to insert server');
    }
    return result.lastID as number;
  }

  /**
   * Deletes the row of one server from the `servers` table.
   *
   * @throws Error unless exactly one row was deleted
   */
  async removeServer(serverId: string): Promise<void> {
    const result = await this.db.run('DELETE FROM servers WHERE id=?', [ serverId ]);
    if (result.changes !== 1) {
      throw new Error('unable to remove server');
    }
  }

  /**
   * Links an identity to a server.
   *
   * @throws Error unless exactly one row was inserted
   */
  async addServerIdentity(serverId: number, identityId: number): Promise<void> {
    const result = await this.db.run(`
      INSERT INTO server_identities (server_id, identity_id) VALUES (?, ?)
    `, [ serverId, identityId ]);
    if (result.changes !== 1) {
      throw new Error('unable to insert server identity');
    }
  }

  /**
   * Overwrites the cached name and icon of a server.
   *
   * @throws Error unless exactly one row was updated
   */
  async updateServer(serverId: string, serverMeta: ServerMetaData): Promise<void> {
    const result = await this.db.run(
      'UPDATE servers SET name=?, icon_resource_id=? WHERE id=?',
      [ serverMeta.name, serverMeta.iconResourceId, serverId ]
    );
    if (result.changes !== 1) {
      throw new Error('unable to update server');
    }
  }

  /**
   * Records which member id this client has on the given server.
   *
   * @throws Error unless exactly one row was updated
   */
  async updateServerMemberId(serverId: string, memberId: string): Promise<void> {
    const result = await this.db.run('UPDATE servers SET member_id=? WHERE id=?', [ memberId, serverId ]);
    if (result.changes !== 1) {
      throw new Error(`unable to update member id, s#${serverId}, mem#${memberId}`);
    }
  }

  /**
   * Replaces the cached member list of a server (delete all, then insert), in one transaction.
   *
   * @throws Error if any member could not be inserted; the transaction is rolled back
   */
  async updateServerMembers(serverId: string, members: Member[]): Promise<void> {
    await this.queueTransaction(async () => {
      await this.db.run('DELETE FROM members WHERE server_id=?', [ serverId ]);
      const stmt = await this.db.prepare('INSERT INTO members (id, server_id, display_name, status, avatar_resource_id, role_name, role_color, role_priority, privileges) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)');
      try {
        for (const member of members) {
          const result = await stmt.run([
            member.id, serverId, member.displayName, member.status, member.avatarResourceId,
            member.roleName, member.roleColor, member.rolePriority,
            member.privileges?.join(',') // stored as one comma-separated string
          ]);
          if (result.changes !== 1) {
            // note: probably want to warn and continue
            throw new Error('failed to insert member');
          }
        }
      } finally {
        // release the prepared statement even when an insert throws
        await stmt.finalize();
      }
    });
  }

  /** Sets the status of every cached member of a server to `'unknown'`. */
  async clearAllMemberStatus(serverId: string): Promise<void> {
    await this.db.run(`UPDATE members SET status='unknown' WHERE server_id=?`, [ serverId ]);
  }

  /**
   * Replaces the cached channel list of a server (delete all, then insert), in one transaction.
   *
   * @throws Error if any channel could not be inserted; the transaction is rolled back
   */
  async updateServerChannels(serverId: string, channels: Channel[]): Promise<void> {
    console.log('setting to ' + channels.length + ' channels');
    await this.queueTransaction(async () => {
      await this.db.run('DELETE FROM channels WHERE server_id=?', [ serverId ]);
      const stmt = await this.db.prepare('INSERT INTO channels (id, server_id, "index", name, flavor_text) VALUES (?, ?, ?, ?, ?)');
      try {
        for (const channel of channels) {
          const result = await stmt.run([ channel.id, serverId, channel.index, channel.name, channel.flavorText ]);
          if (result.changes !== 1) {
            // note: probably want to warn and continue
            throw new Error('failed to insert channel');
          }
        }
      } finally {
        // release the prepared statement even when an insert throws
        await stmt.finalize();
      }
    });
  }

  // TODO: make this singular and a non-transaction based function?
  /**
   * Inserts or updates resources in the cache of one server, in one transaction.
   *
   * Resources larger than `Globals.MAX_CACHED_RESOURCE_SIZE` are skipped. Before
   * each insert, the least recently used resources of the same server are evicted
   * until the server's total stays within `Globals.MAX_SERVER_RESOURCE_CACHE_SIZE`.
   * A resource that cannot fit even after evicting everything else is skipped.
   *
   * @throws Error if an eviction or an insert did not affect exactly one row
   */
  async upsertServerResources(serverId: string, resources: Resource[]): Promise<void> {
    await this.queueTransaction(async () => {
      const currentSizeResult = await this.db.get('SELECT SUM(data_size) AS current_size FROM resources WHERE server_id=?', [ serverId ]);
      // SUM() yields NULL when the server has no cached resources
      let currentSize = Number(currentSizeResult?.current_size ?? 0);

      const stmt = await this.db.prepare(`
        INSERT INTO resources (id, server_id, hash, data, data_size, last_used)
        VALUES (?1, ?2, ?3, ?4, ?5, ?6)
        ON CONFLICT (id, server_id) DO UPDATE SET hash=?3, data=?4, data_size=?5, last_used=?6
      `);
      try {
        for (const resource of resources) {
          const size = resource.data.length;
          if (size > Globals.MAX_CACHED_RESOURCE_SIZE) {
            continue;
          }

          // when overwriting an existing row only the size difference counts
          const existing = await this.db.get('SELECT data_size FROM resources WHERE server_id=? AND id=?', [ serverId, resource.id ]);
          const sizeDelta = size - Number(existing?.data_size ?? 0);

          let fits = true;
          while (currentSize + sizeDelta > Globals.MAX_SERVER_RESOURCE_CACHE_SIZE) {
            // evict within this server only (the budget is per server) and
            // never the row that is about to be overwritten
            const target = await this.db.get(
              'SELECT id, data_size FROM resources WHERE server_id=? AND id<>? ORDER BY last_used ASC LIMIT 1',
              [ serverId, resource.id ]
            );
            if (!target) {
              fits = false; // nothing left to evict
              break;
            }
            const deleteResult = await this.db.run('DELETE FROM resources WHERE server_id=? AND id=?', [ serverId, target.id ]);
            if (deleteResult.changes !== 1) {
              throw new Error('failed to delete excess resource');
            }
            currentSize -= target.data_size;
          }
          if (!fits) {
            continue;
          }

          const result = await stmt.run([ resource.id, serverId, resource.hash, resource.data, size, new Date().getTime() ]);
          if (result.changes !== 1) {
            throw new Error('failed to insert resource');
          }
          currentSize += sizeDelta;
        }
      } finally {
        // release the prepared statement even when an insert throws
        await stmt.finalize();
      }
    });
  }

  // Note: messages are directly from the server response, not parsed
  /**
   * Inserts or updates messages, then trims the channel's cache down to the
   * `Globals.MAX_CACHED_CHANNEL_MESSAGES` newest messages, in one transaction.
   *
   * Each row is stored under `message.channel.id`; `channelId` selects the
   * channel that gets trimmed.
   *
   * @throws Error if any message could not be inserted; the transaction is rolled back
   */
  async upsertServerMessages(serverId: string, channelId: string, messages: Message[]): Promise<void> {
    await this.queueTransaction(async () => {
      const stmt = await this.db.prepare(`
        INSERT INTO messages (
            id, server_id, channel_id, member_id, sent_dtg, text
          , resource_id, resource_name, resource_width, resource_height, resource_preview_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
        ON CONFLICT (id, server_id) DO UPDATE SET
            server_id=?2, channel_id=?3, member_id=?4, sent_dtg=?5, text=?6
          , resource_id=?7, resource_name=?8, resource_width=?9, resource_height=?10, resource_preview_id=?11
      `);
      try {
        for (const message of messages) {
          const result = await stmt.run([
            message.id, serverId, message.channel.id, message.member.id, message.sent.getTime(), message.text,
            message.resourceId, message.resourceName, message.resourceWidth, message.resourceHeight, message.resourcePreviewId
          ]);
          if (result.changes !== 1) {
            // note: probably want to warn and continue
            throw new Error('failed to insert message');
          }
        }
      } finally {
        // release the prepared statement even when an insert throws
        await stmt.finalize();
      }

      // delete the oldest messages if the cache is too big
      // (outer server_id filter: message ids are only unique per server)
      await this.db.run(`
        DELETE FROM messages
        WHERE server_id=?1 AND id IN (
          SELECT id FROM messages
          WHERE server_id=?1 AND channel_id=?2
          ORDER BY sent_dtg
          LIMIT max(0, (SELECT COUNT(*) FROM messages WHERE server_id=?1 AND channel_id=?2) - ?3)
        )
      `, [ serverId, channelId, Globals.MAX_CACHED_CHANNEL_MESSAGES ]);
    });
  }

  /** Deletes every cached message of one channel. */
  async clearServerMessages(serverId: string, channelId: string): Promise<void> {
    await this.db.run('DELETE FROM messages WHERE server_id=? AND channel_id=?', [ serverId, channelId ]);
  }

  /**
   * Deletes the given messages of one server, in one transaction.
   *
   * @throws Error if any id did not delete exactly one row; the transaction is rolled back
   */
  async deleteServerMessages(serverId: string, messageIds: string[]): Promise<void> {
    await this.queueTransaction(async () => {
      const stmt = await this.db.prepare('DELETE FROM messages WHERE id=? AND server_id=?'); // include server_id for security purposes
      try {
        for (const messageId of messageIds) {
          const result = await stmt.run([ messageId, serverId ]);
          if (result.changes !== 1) {
            // note: probably want to warn and continue
            throw new Error('failed to delete message');
          }
        }
      } finally {
        // release the prepared statement on success and on failure
        await stmt.finalize();
      }
    });
  }

  /** Loads the connection config of every (server, identity) pair that is linked. */
  async getServerConfigs(): Promise<ServerConfig[]> {
    const result = await this.db.all(`
      SELECT
          servers.id AS server_id
        , servers.url AS url
        , servers.cert AS server_cert
        , servers.member_id AS member_id
        , identities.public_key AS public_key
        , identities.private_key AS private_key
      FROM
          server_identities
        , servers
        , identities
      WHERE
        server_identities.identity_id = identities.id
        AND server_identities.server_id = servers.id
    `);
    return result.map((dataServerConfig: any) => ServerConfig.fromDBData(dataServerConfig));
  }

  /**
   * Loads the connection config of one linked (server, identity) pair.
   *
   * NOTE(review): when the pair is not linked the query yields no row and
   * `ServerConfig.fromDBData` receives `undefined` — confirm how it handles that.
   */
  async getServerConfig(serverId: number, identityId: number): Promise<ServerConfig> {
    const result = await this.db.get(`
      SELECT
          servers.id AS server_id
        , servers.url AS url
        , servers.cert AS server_cert
        , servers.member_id AS member_id
        , identities.public_key AS public_key
        , identities.private_key AS private_key
      FROM
          server_identities
        , servers
        , identities
      WHERE
        server_identities.identity_id = identities.id
        AND server_identities.server_id = servers.id
        AND servers.id=?
        AND identities.id=?
    `, [ serverId, identityId ]);
    return ServerConfig.fromDBData(result);
  }

  /**
   * Loads the cached data of one server
   * (columns: id, url, cert, name, icon_resource_id, member_id).
   *
   * @returns null if the server is unknown or its name / icon have not been stored yet
   */
  async getServer(serverId: string): Promise<CacheServerData | null> {
    const result = await this.db.get(`
      SELECT id, url, cert, name, icon_resource_id, member_id FROM servers WHERE id=?
    `, [ serverId ]);
    if (!result || result.name == null || result.icon_resource_id == null) {
      // server is unknown or not set up yet.
      return null;
    }
    return CacheServerData.fromDBData(result);
  }

  /**
   * Loads the member id recorded for this client on a server.
   * The column is nullable, so the value is null at runtime until it has been set.
   *
   * @throws Error if the server does not exist
   */
  async getServerMemberId(serverId: string): Promise<string> {
    const server = await this.db.get('SELECT member_id FROM servers WHERE id=?', [ serverId ]);
    if (!server) {
      throw new Error(`unable to find server, s#${serverId}`);
    }
    return server.member_id;
  }

  /** @returns the cached members of a server, or null if no members */
  async getMembers(serverId: string): Promise<Member[] | null> {
    const members = await this.db.all('SELECT * FROM members WHERE server_id=?', [ serverId ]);
    if (members.length === 0) {
      return null;
    }
    return members.map((dataMember: any) => Member.fromDBData(dataMember));
  }

  /** @returns the cached channels of a server, or null if no channels */
  async getChannels(serverId: string): Promise<Channel[] | null> {
    const channels = await this.db.all('SELECT * FROM channels WHERE server_id=?', [ serverId ]);
    if (channels.length === 0) {
      return null;
    }
    return channels.map((dataChannel: any) => Channel.fromDBData(dataChannel));
  }

  /**
   * Loads the `number` newest cached messages of a channel, oldest first.
   *
   * NOTE(review): `members` / `channels` are assumed to map id -> entity; they are
   * only forwarded to `Message.fromDBData` — confirm against its signature.
   *
   * @returns [] if no messages found
   */
  async getMessagesRecent(
    serverId: string,
    channelId: string,
    number: number,
    members: Map<string, Member>,
    channels: Map<string, Channel>
  ): Promise<Message[]> {
    // inner query picks the newest rows, outer query puts them in ascending order
    const messages = await this.db.all(`
      SELECT * FROM (
        SELECT
            "id", "channel_id", "member_id"
          , "sent_dtg", "text"
          , "resource_id"
          , "resource_name"
          , "resource_width"
          , "resource_height"
          , "resource_preview_id"
        FROM "messages"
        WHERE "server_id"=? AND "channel_id"=?
        ORDER BY "sent_dtg" DESC
        LIMIT ?
      ) AS "r" ORDER BY "r"."sent_dtg" ASC
    `, [ serverId, channelId, number ]);
    return messages.map((dataMessage: any) => Message.fromDBData(dataMessage, members, channels));
  }

  /**
   * Loads up to `number` cached messages of a channel sent before the message
   * `messageId`, oldest first.
   *
   * @returns null if no messages found
   */
  async getMessagesBefore(
    serverId: string,
    channelId: string,
    messageId: string,
    number: number,
    members: Map<string, Member>,
    channels: Map<string, Channel>
  ): Promise<Message[] | null> {
    // Note: this query succeeds returning no results if the message with specified id is not found
    // The anchor lookup includes server_id because message ids are only unique per server
    const messages = await this.db.all(`
      SELECT * FROM (
        SELECT
            "id", "channel_id", "member_id"
          , "sent_dtg", "text"
          , "resource_id"
          , "resource_name"
          , "resource_width"
          , "resource_height"
          , "resource_preview_id"
        FROM "messages"
        WHERE
          "server_id"=?1
          AND "channel_id"=?2
          AND "sent_dtg" < (SELECT "sent_dtg" FROM "messages" WHERE "id"=?3 AND "server_id"=?1)
        ORDER BY "sent_dtg" DESC
        LIMIT ?4
      ) AS "r" ORDER BY "r"."sent_dtg" ASC
    `, [ serverId, channelId, messageId, number ]);
    if (messages.length === 0) {
      return null;
    }
    return messages.map((messageData: any) => Message.fromDBData(messageData, members, channels));
  }

  /**
   * Loads up to `number` cached messages of a channel sent after the message
   * `messageId`, oldest first.
   *
   * @returns null if no messages found
   */
  async getMessagesAfter(
    serverId: string,
    channelId: string,
    messageId: string,
    number: number,
    members: Map<string, Member>,
    channels: Map<string, Channel>
  ): Promise<Message[] | null> {
    // Note: this query succeeds returning no results if the message with specified id is not found
    // The anchor lookup includes server_id because message ids are only unique per server
    const messages = await this.db.all(`
      SELECT
          "id", "channel_id", "member_id"
        , "sent_dtg", "text"
        , "resource_id"
        , "resource_name"
        , "resource_width"
        , "resource_height"
        , "resource_preview_id"
      FROM "messages"
      WHERE
        "server_id"=?1
        AND "channel_id"=?2
        AND "sent_dtg" > (SELECT "sent_dtg" FROM "messages" WHERE "id"=?3 AND "server_id"=?1)
      ORDER BY "sent_dtg" ASC
      LIMIT ?4
    `, [ serverId, channelId, messageId, number ]);
    if (messages.length === 0) {
      return null;
    }
    return messages.map((messageData: any) => Message.fromDBData(messageData, members, channels));
  }

  /**
   * Loads one cached resource and marks it as just used, which pushes it to the
   * back of the eviction order used by {@link DBCache.upsertServerResources}.
   *
   * @returns null if the resource is not cached
   */
  async getResource(serverId: string, resourceId: string): Promise<Resource | null> {
    const row = await this.db.get('SELECT id, data, hash FROM resources WHERE server_id=? AND id=?', [ serverId, resourceId ]);
    if (!row) {
      // nothing cached, so there is no last_used timestamp to refresh
      return null;
    }
    await this.db.run('UPDATE resources SET last_used=?1 WHERE server_id=?2 AND id=?3', [ new Date().getTime(), serverId, resourceId ]);
    return Resource.fromDBData(row);
  }
}