cordis/client/webapp/db-cache.ts

547 lines
20 KiB
TypeScript

import * as fs from 'fs/promises';
import ConcurrentQueue from '../../concurrent-queue/concurrent-queue';
import Globals from './globals';
import * as sqlite from 'sqlite';
import * as sqlite3 from 'sqlite3';
import { Message, Member, Channel, Resource, NotInitializedError, ServerMetaData, ServerConfig, CacheServerData } from './data-types';
let db: sqlite.Database | null = null;
// A cache implemented using an sqlite database
// Also stores configuration for server connections
export default class DBCache {
static TRANSACTION_QUEUE = new ConcurrentQueue(1);
static async beginTransaction(): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('BEGIN TRANSACTION');
}
static async rollbackTransaction(): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('ROLLBACK');
}
static async commitTransaction(): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('COMMIT');
}
static async queueTransaction(func: (() => Promise<void>)): Promise<void> {
await DBCache.TRANSACTION_QUEUE.push(async () => {
try {
await this.beginTransaction();
await func();
await this.commitTransaction();
} catch (e) {
await this.rollbackTransaction();
throw e;
}
});
}
static async connect(): Promise<void> {
try {
await fs.access('./db');
} catch (e) {
await fs.mkdir('./db');
}
db = await sqlite.open({
driver: sqlite3.Database,
filename: './db/cache.db'
});
}
static async init(): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run(`
CREATE TABLE IF NOT EXISTS identities (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
, public_key TEXT NOT NULL
, private_key TEXT NOT NULL
)
`);
await db.run(`
CREATE TABLE IF NOT EXISTS servers (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
, url TEXT NOT NULL
, cert TEXT NOT NULL
, name TEXT
, icon_resource_id TEXT
, member_id TEXT
)
`);
await db.run(`
CREATE TABLE IF NOT EXISTS server_identities (
server_id INTEGER NOT NULL
, identity_id INTEGER NOT NULL
, FOREIGN KEY (server_id) REFERENCES servers(id)
, FOREIGN KEY (identity_id) REFERENCES identities(id)
)
`);
await db.run(`
CREATE TABLE IF NOT EXISTS members (
id TEXT NOT NULL
, server_id INTEGER NOT NULL REFERENCES servers(id)
, display_name TEXT NOT NULL
, status TEXT NOT NULL
, avatar_resource_id TEXT NOT NULL
, role_name TEXT
, role_color TEXT
, role_priority INTEGER
, privileges TEXT
, CONSTRAINT members_id_server_id_con UNIQUE (id, server_id)
)
`);
await db.run(`
CREATE TABLE IF NOT EXISTS channels (
id TEXT NOT NULL
, server_id INTEGER NOT NULL REFERENCES servers(id)
, "index" INTEGER NOT NULL
, name TEXT NOT NULL
, flavor_text TEXT
, CONSTRAINT channels_id_server_id_con UNIQUE (id, server_id)
)
`);
await db.run(`
CREATE TABLE IF NOT EXISTS resources (
id TEXT NOT NULL
, server_id INTEGER NOT NULL REFERENCES servers(id)
, hash BLOB NOT NULL
, data BLOB NOT NULL
, data_size INTEGER NOT NULL
, last_used INTEGER NOT NULL
, CONSTRAINT resources_id_server_id_con UNIQUE (id, server_id)
)
`);
await db.run('CREATE INDEX IF NOT EXISTS resources_data_size_idx ON resources (data_size)');
// note: no foreign key on resource_id since we may not have cached the resource yet
await db.run(`
CREATE TABLE IF NOT EXISTS messages (
id TEXT NOT NULL
, server_id INTEGER NOT NULL REFERENCES servers(id)
, channel_id TEXT NOT NULL REFERENCES channels(id)
, member_id TEXT NOT NULL REFERENCES members(id)
, sent_dtg INTEGER NOT NULL
, text TEXT
, resource_id TEXT
, resource_name TEXT
, resource_width INTEGER
, resource_height INTEGER
, resource_preview_id TEXT
, CONSTRAINT messages_id_server_id_con UNIQUE (id, server_id)
)
`);
await db.run('CREATE INDEX IF NOT EXISTS messages_id_idx ON messages (id)');
await db.run('CREATE INDEX IF NOT EXISTS messages_sent_dtg_idx ON messages (sent_dtg)');
});
}
static async close(): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.close();
}
// dangerous!
static async reset(): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('DROP TABLE IF EXISTS identities');
await db.run('DROP TABLE IF EXISTS servers');
await db.run('DROP TABLE IF EXISTS server_identities');
await db.run('DROP TABLE IF EXISTS members');
await db.run('DROP TABLE IF EXISTS channels');
await db.run('DROP TABLE IF EXISTS resources');
await db.run('DROP TABLE IF EXISTS messages');
});
}
// returns the id of the identity inserted
static async addIdentity(publicKeyPem: string, privateKeyPem: string): Promise<number> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run(`
INSERT INTO identities (public_key, private_key) VALUES (?, ?)
`, [ publicKeyPem, privateKeyPem ]);
if (!result || result.changes !== 1) {
throw new Error('unable to insert identity');
}
return result.lastID as number;
}
// returns the id (client-side) of the server inserted
static async addServer(url: string, cert?: string, name?: string): Promise<number> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run(`
INSERT INTO servers (url, cert, name, icon_resource_id) VALUES (?, ?, ?, NULL)
`, [ url, cert, name ]);
if (!result || result.changes !== 1) {
throw new Error('unable to insert server');
}
return result.lastID as number;
}
static async removeServer(serverId: string): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run('DELETE FROM servers WHERE id=?', [ serverId ]);
if (result.changes != 1) {
throw new Error('unable to remove server');
}
}
static async addServerIdentity(serverId: number, identityId: number): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run(`
INSERT INTO server_identities (server_id, identity_id) VALUES (?, ?)
`, [ serverId, identityId ]);
if (result.changes != 1) {
throw new Error('unable to insert server identity');
}
}
static async updateServer(serverId: string, serverMeta: ServerMetaData): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run('UPDATE servers SET name=?, icon_resource_id=? WHERE id=?', [ serverMeta.name, serverMeta.iconResourceId, serverId ]);
if (result.changes != 1) {
throw new Error('unable to update server');
}
}
static async updateServerMemberId(serverId: string, memberId: string): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.run('UPDATE servers SET member_id=? WHERE id=?', [ memberId, serverId ]);
if (result.changes != 1) {
throw new Error(`unable to update member id, s#${serverId}, mem#${memberId}`);
}
}
static async updateServerMembers(serverId: string, members: Member[]): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('DELETE FROM members WHERE server_id=?', [ serverId ]);
let stmt = await db.prepare('INSERT INTO members (id, server_id, display_name, status, avatar_resource_id, role_name, role_color, role_priority, privileges) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)');
for (let member of members) {
let result = await stmt.run([ member.id, serverId, member.displayName, member.status, member.avatarResourceId, member.roleName, member.roleColor, member.rolePriority, member.privileges?.join(',') ]);
if (result.changes != 1) {
// note: probably want to warn and continue
throw new Error('failed to insert member');
}
}
await stmt.finalize();
});
}
static async clearAllMemberStatus(serverId: string): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run(`UPDATE members SET status='unknown' WHERE server_id=?`, [ serverId ]);
}
static async updateServerChannels(serverId: string, channels: Channel[]): Promise<void> {
console.log('setting to ' + channels.length + ' channels');
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('DELETE FROM channels WHERE server_id=?', [ serverId ]);
let stmt = await db.prepare('INSERT INTO channels (id, server_id, "index", name, flavor_text) VALUES (?, ?, ?, ?, ?)');
for (let channel of channels) {
let result = await stmt.run([ channel.id, serverId, channel.index, channel.name, channel.flavorText ]);
if (result.changes != 1) {
// note: probably want to warn and continue
throw new Error('failed to insert channel');
}
}
await stmt.finalize();
});
}
// TODO: make this singular and a non-transaction based function?
static async upsertServerResources(serverId: string, resources: Resource[]): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
let currentSizeResult = await db.get('SELECT SUM(data_size) AS current_size FROM resources WHERE server_id=?', [ serverId ]);
let currentSize = parseInt(currentSizeResult.current_size || 0);
let stmt = await db.prepare(`
INSERT INTO resources (id, server_id, hash, data, data_size, last_used) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT (id, server_id) DO UPDATE SET hash=?3, data=?4, last_used=?6
`);
for (let resource of resources) {
if (resource.data.length > Globals.MAX_CACHED_RESOURCE_SIZE) {
continue;
}
while (resource.data.length + currentSize > Globals.MAX_SERVER_RESOURCE_CACHE_SIZE) {
let targetResult = await db.get('SELECT id, data_size FROM resources ORDER BY last_used ASC LIMIT 1');
let deleteResult = await db.run('DELETE FROM resources WHERE id=?', [ targetResult.id ]);
if (deleteResult.changes != 1) {
throw new Error('failed to delete excess resource');
}
currentSize -= targetResult.data_size;
}
let result = await stmt.run([ resource.id, serverId, resource.hash, resource.data, resource.data.length, new Date().getTime() ]);
if (result.changes != 1) {
throw new Error('failed to insert resource');
}
}
await stmt.finalize();
});
}
// Note: messages are directly from the server response, not parsed
static async upsertServerMessages(serverId: string, channelId: string, messages: Message[]): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
let stmt = await db.prepare(`
INSERT INTO messages (
id, server_id, channel_id, member_id, sent_dtg, text
, resource_id, resource_name, resource_width, resource_height, resource_preview_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
ON CONFLICT (id, server_id)
DO UPDATE SET
server_id=?2, channel_id=?3, member_id=?4, sent_dtg=?5, text=?6
, resource_id=?7, resource_name=?8, resource_width=?9, resource_height=?10, resource_preview_id=?11
`);
for (let message of messages) {
let result = await stmt.run([
message.id, serverId, message.channel.id,
message.member.id, message.sent.getTime(), message.text,
message.resourceId, message.resourceName,
message.resourceWidth, message.resourceHeight,
message.resourcePreviewId
]);
if (result.changes != 1) {
// note: probably want to warn and continue
throw new Error('failed to insert message');
}
}
await stmt.finalize();
// delete the oldest messages if the cache is too big
await db.run(`
DELETE FROM messages WHERE id IN (
SELECT id FROM messages WHERE server_id=?1 AND channel_id=?2 ORDER BY sent_dtg
LIMIT max(0, (SELECT COUNT(*) FROM messages WHERE server_id=?1 AND channel_id=?2) - ?3)
)
`, [ serverId, channelId, Globals.MAX_CACHED_CHANNEL_MESSAGES ]);
});
}
static async clearServerMessages(serverId: string, channelId: string): Promise<void> {
if (db === null) throw new NotInitializedError('db not initialized');
await db.run('DELETE FROM messages WHERE server_id=? AND channel_id=?', [ serverId, channelId ]);
}
static async deleteServerMessages(serverId: string, messageIds: string[]): Promise<void> {
await this.queueTransaction(async () => {
if (db === null) throw new NotInitializedError('db not initialized');
let stmt = await db.prepare('DELETE FROM messages WHERE id=? AND server_id=?'); // include server_id for security purposes
for (let messageId of messageIds) {
let result = await stmt.run([ messageId, serverId ]);
if (result.changes != 1) {
// note: probably want to warn and continue
throw new Error('failed to delete message');
}
}
});
}
static async getServerConfigs(): Promise<ServerConfig[]> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.all(`
SELECT
servers.id AS server_id
, servers.url AS url
, servers.cert AS server_cert
, servers.member_id AS member_id
, identities.public_key AS public_key
, identities.private_key AS private_key
FROM
server_identities
, servers
, identities
WHERE
server_identities.identity_id = identities.id
AND server_identities.server_id = servers.id
`);
return result.map((dataServerConfig: any) => ServerConfig.fromDBData(dataServerConfig));
}
static async getServerConfig(serverId: number, identityId: number): Promise<ServerConfig> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.get(`
SELECT
servers.id AS server_id
, servers.url AS url
, servers.cert AS server_cert
, servers.member_id AS member_id
, identities.public_key AS public_key
, identities.private_key AS private_key
FROM
server_identities
, servers
, identities
WHERE
server_identities.identity_id = identities.id
AND server_identities.server_id = servers.id
AND servers.id=?
AND identities.id=?
`, [ serverId, identityId ]);
return ServerConfig.fromDBData(result);
}
/*
CREATE TABLE IF NOT EXISTS servers (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
, url TEXT NOT NULL
, cert TEXT NOT NULL
, name TEXT
, icon_resource_id TEXT
, member_id TEXT
)
*/
static async getServer(serverId: string): Promise<CacheServerData | null> {
if (db === null) throw new NotInitializedError('db not initialized');
let result = await db.get(`
SELECT
id, url, cert, name,
icon_resource_id, member_id
FROM servers
WHERE id=?
`, [ serverId ]);
if (result.name == null || result.icon_resource_id == null) {
// server is not set up yet.
return null;
}
return CacheServerData.fromDBData(result);
}
static async getServerMemberId(serverId: string): Promise<string> {
if (db === null) throw new NotInitializedError('db not initialized');
let server = await db.get('SELECT member_id FROM servers WHERE id=?', [ serverId ]);
return server.member_id;
}
// returns null if no members
static async getMembers(serverId: string): Promise<Member[] | null> {
if (db === null) throw new NotInitializedError('db not initialized');
let members = await db.all('SELECT * FROM members WHERE server_id=?', [ serverId ]);
if (members.length === 0) {
return null;
}
return members.map((dataMember: any) => Member.fromDBData(dataMember));
}
// returns null if no channels
static async getChannels(serverId: string): Promise<Channel[] | null> {
if (db === null) throw new NotInitializedError('db not initialized');
let channels = await db.all('SELECT * FROM channels WHERE server_id=?', [ serverId ]);
if (channels.length === 0) {
return null;
}
return channels.map((dataChannel: any) => Channel.fromDBData(dataChannel));
}
// returns [] if no messages found
static async getMessagesRecent(
serverId: string, channelId: string, number: number,
members: Map<string, Member>, channels: Map<string, Channel>
): Promise<Message[]> {
if (db === null) throw new NotInitializedError('db not initialized');
let messages = await db.all(`
SELECT * FROM (
SELECT
"id", "channel_id", "member_id"
,"sent_dtg", "text"
, "resource_id"
, "resource_name"
, "resource_width"
, "resource_height"
, "resource_preview_id"
FROM "messages"
WHERE "server_id"=? AND "channel_id"=?
ORDER BY "sent_dtg" DESC
LIMIT ?
) AS "r" ORDER BY "r"."sent_dtg" ASC
`, [ serverId, channelId, number ]);
return messages.map((dataMessage: any) => Message.fromDBData(dataMessage, members, channels));
}
// returns null if no messages found
static async getMessagesBefore(
serverId: string, channelId: string, messageId: string, number: number,
members: Map<string, Member>, channels: Map<string, Channel>
): Promise<Message[] | null> {
if (db === null) throw new NotInitializedError('db not initialized');
// Note: this query succeeds returning no results if the message with specified id is not found
let messages = await db.all(`
SELECT * FROM (
SELECT
"id", "channel_id", "member_id"
, "sent_dtg", "text"
, "resource_id"
, "resource_name"
, "resource_width"
, "resource_height"
, "resource_preview_id"
FROM "messages"
WHERE
"server_id"=?
AND "channel_id"=?
AND "sent_dtg" < (SELECT "sent_dtg" FROM "messages" WHERE "id"=?)
ORDER BY "sent_dtg" DESC
LIMIT ?
) AS "r" ORDER BY "r"."sent_dtg" ASC
`, [ serverId, channelId, messageId, number ]);
if (messages.length == 0) {
return null;
}
return messages.map((messageData: any) => Message.fromDBData(messageData, members, channels));
}
// returns null if no messages found
static async getMessagesAfter(
serverId: string, channelId: string, messageId: string, number: number,
members: Map<string, Member>, channels: Map<string, Channel>
): Promise<Message[] | null> {
if (db === null) throw new NotInitializedError('db not initialized');
// Note: this query succeeds returning no results if the message with specified id is not found
let messages = await db.all(`
SELECT
"id", "channel_id", "member_id"
, "sent_dtg", "text"
, "resource_id"
, "resource_name"
, "resource_width"
, "resource_height"
, "resource_preview_id"
FROM "messages"
WHERE
"server_id"=?
AND "channel_id"=?
AND "sent_dtg" > (SELECT "sent_dtg" FROM "messages" WHERE "id"=?)
ORDER BY "sent_dtg" ASC
LIMIT ?
`, [ serverId, channelId, messageId, number ]);
if (messages.length == 0) {
return null;
}
return messages.map((messageData: any) => Message.fromDBData(messageData, members, channels));
}
static async getResource(serverId: string, resourceId: string): Promise<Resource | null> {
if (db === null) throw new NotInitializedError('db not initialized');
let row = await db.get('SELECT id, data, hash FROM resources WHERE server_id=? AND id=?', [ serverId, resourceId ]);
await db.run('UPDATE resources SET last_used=?1 WHERE server_id=?2 AND id=?3', [ new Date().getTime(), serverId, resourceId ]);
if (!row) {
return null;
}
return Resource.fromDBData(row);
}
}