569 lines
22 KiB
TypeScript
569 lines
22 KiB
TypeScript
import * as crypto from 'crypto';
|
|
|
|
import * as pg from 'pg';
|
|
import { stringify } from 'querystring';
|
|
|
|
import ConcurrentQueue from '../concurrent-queue/concurrent-queue';
|
|
import Logger from '../logger/logger';
|
|
|
|
const LOG = Logger.create('db');

// Single PostgreSQL client shared by every DB method below.
// Connection settings are taken from the standard PG* environment variables
// (PGHOST, PGUSER, PGPASSWORD, PGDATABASE) when they are set, so credentials
// do not have to be committed to source control. The previous hard-coded
// values are kept as fallbacks so existing setups keep working unchanged.
const db = new pg.Client({
    host: process.env.PGHOST || 'localhost',
    user: process.env.PGUSER || 'cordis',
    password: process.env.PGPASSWORD || 'cordis_pass',
    database: process.env.PGDATABASE || 'cordis',
});
|
|
|
|
export default class DB {
|
|
// Connects the shared `db` client; resolves once db.connect() has completed.
static async connect(): Promise<void> {
    await db.connect();
}
|
|
|
|
// Ends the shared `db` client; resolves once db.end() has completed.
static async end(): Promise<void> {
    await db.end();
}
|
|
|
|
// Queue that every queueTransaction() callback is pushed through.
// NOTE(review): the constructor argument 1 is presumably the concurrency
// limit (one transaction at a time) - confirm against ConcurrentQueue.
private static readonly TRANSACTION_QUEUE = new ConcurrentQueue<void>(1);
|
|
|
|
// Sends BEGIN on the shared `db` client.
static async beginTransaction(): Promise<void> {
    await db.query('BEGIN');
}
|
|
|
|
// Sends ROLLBACK on the shared `db` client.
static async rollbackTransaction(): Promise<void> {
    await db.query('ROLLBACK');
}
|
|
|
|
// Sends COMMIT on the shared `db` client.
static async commitTransaction(): Promise<void> {
    await db.query('COMMIT');
}
|
|
|
|
// Runs `func` between BEGIN and COMMIT, pushed through TRANSACTION_QUEUE.
//
// func: callback that performs the queries belonging to the transaction; it is
//       called with no arguments and awaited.
//
// If BEGIN, `func` or COMMIT throws, a ROLLBACK is attempted and the original
// error is re-thrown to the caller. A failure of the ROLLBACK itself is only
// logged, so that it cannot replace the error that actually caused it.
static async queueTransaction(func: () => unknown): Promise<void> {
    await DB.TRANSACTION_QUEUE.push(async () => {
        try {
            await DB.beginTransaction();
            await func();
            await DB.commitTransaction();
        } catch (e) {
            try {
                await DB.rollbackTransaction();
            } catch (rollbackError) {
                LOG.warn(`unable to roll back transaction: ${rollbackError}`);
            }
            throw e;
        }
    });
}
|
|
|
|
// Returns every row of "servers" left-joined to "servers_meta" on matching "id".
static async getAllServers(): Promise<any[]> {
    const { rows } = await db.query('SELECT * FROM "servers" LEFT JOIN "servers_meta" ON "servers"."id"="servers_meta"."id"');
    return rows;
}
|
|
|
|
// Looks up the member whose stored "public_key" equals the DER (SPKI) export
// of `publicKey`. Resolves to a row holding `member_id` and `server_id`.
// Throws unless exactly one member matches.
static async getMemberInfo(publicKey: crypto.KeyObject): Promise<any> {
    const der = publicKey.export({ type: 'spki', format: 'der' });

    // Only the first 10 bytes of the DER encoding are written to the log.
    LOG.silly(`searching for public key (der starts with ${LOG.inspect(der.subarray(0, 10))})`);

    const { rows } = await db.query('SELECT "id" AS member_id, "server_id" FROM "members" WHERE "public_key"=$1', [ der ]);
    if (rows.length !== 1) {
        throw new Error('unable to find member with specified public key');
    }

    return rows[0];
}
|
|
|
|
// Returns the "name" and "icon_resource_id" of the "servers_meta" row with the
// given id. Throws unless exactly one row matches.
static async getServer(serverId: string): Promise<any> {
    const { rows } = await db.query('SELECT "name", "icon_resource_id" FROM "servers_meta" WHERE "id"=$1', [ serverId ]);
    if (rows.length !== 1) {
        throw new Error('unable to find server with specified id');
    }
    return rows[0];
}
|
|
|
|
// Updates the server's name in "servers_meta" and returns the updated
// "name" / "icon_resource_id" pair. Throws unless exactly one row was updated.
static async setName(serverId: string, name: string): Promise<any> {
    const { rows } = await db.query(`
        UPDATE "servers_meta" SET "name"=$1 WHERE "id"=$2
        RETURNING "name", "icon_resource_id"
    `, [ name, serverId ]);
    if (rows.length !== 1) {
        throw new Error('unable to update server name');
    }
    return rows[0];
}
|
|
|
|
// Updates the server's icon resource id in "servers_meta" and returns the
// updated "name" / "icon_resource_id" pair. Throws unless exactly one row was
// updated.
static async setIcon(serverId: string, iconResourceId: string): Promise<any> {
    const { rows } = await db.query(`
        UPDATE "servers_meta" SET "icon_resource_id"=$1 WHERE "id"=$2
        RETURNING "name", "icon_resource_id"
    `, [ iconResourceId, serverId ]);
    if (rows.length !== 1) {
        throw new Error('unable to update server icon');
    }
    return rows[0];
}
|
|
|
|
// Returns all rows of "members_with_roles" belonging to the given server.
static async getMembers(serverId: string): Promise<any[]> {
    const { rows } = await db.query('SELECT * FROM "members_with_roles" WHERE "server_id"=$1', [ serverId ]);
    return rows;
}
|
|
|
|
// Returns the single "members_with_roles" row for the given server and member.
// Throws unless exactly one row matches.
static async getMember(serverId: string, memberId: string): Promise<any> {
    const { rows } = await db.query('SELECT * FROM "members_with_roles" WHERE "server_id"=$1 AND "id"=$2', [ serverId, memberId ]);
    if (rows.length !== 1) {
        throw new Error('unable to get member');
    }
    return rows[0];
}
|
|
|
|
// Returns id, index, name and flavor_text of the server's channels, ordered by
// "index".
static async getChannels(serverId: string): Promise<any[]> {
    const { rows } = await db.query('SELECT "id", "index", "name", "flavor_text" FROM "channels" WHERE "server_id"=$1 ORDER BY "index"', [ serverId ]);
    return rows;
}
|
|
|
|
// Appends a channel to the server and returns the inserted row
// (id, index, name, flavor_text).
//
// The new channel's index is the server's current MAX("index") plus one (1 if
// the server has no channels yet). Reading the maximum and inserting happen
// inside one queued transaction.
static async createChannel(serverId: string, name: string, flavorText: string): Promise<any> {
    let created = null;
    await DB.queueTransaction(async () => {
        const maxResult = await db.query('SELECT MAX("index") AS max_index FROM "channels" WHERE "server_id"=$1', [ serverId ]);
        if (maxResult.rows.length !== 1) {
            throw new Error('invalid index result');
        }

        // MAX() yields NULL when there are no channels; treat that as 0.
        const nextIndex = (maxResult.rows[0].max_index || 0) + 1;

        const inserted = await db.query(`
            INSERT INTO "channels" ("server_id", "index", "name", "flavor_text")
            VALUES ($1, $2, $3, $4)
            RETURNING "id", "index", "name", "flavor_text"
        `, [ serverId, nextIndex, name, flavorText ]);
        if (inserted.rows.length !== 1) {
            throw new Error('unable to insert channel');
        }
        created = inserted.rows[0];
    });
    return created;
}
|
|
|
|
// Sets the name and flavor text of one channel of a server and returns the
// full updated row. Throws unless exactly one row was updated.
static async updateChannel(serverId: string, channelId: string, name: string, flavorText: string): Promise<any> {
    const { rows } = await db.query(`
        UPDATE "channels" SET "name"=$1, "flavor_text"=$2 WHERE "server_id"=$3 AND "id"=$4
        RETURNING *
    `, [ name, flavorText, serverId, channelId ]);
    if (rows.length !== 1) {
        throw new Error('unable to update channel');
    }
    return rows[0];
}
|
|
|
|
// Returns (as an array of rows) the most recent messages of a channel, oldest
// first.
//
// The inner query picks the newest `number` messages (ORDER BY "sent_dtg" DESC
// LIMIT $3); the outer query re-sorts that page in ascending "sent_dtg" order.
//
// number: maximum number of messages to return. Accepts a number, like
//         getMessagesBefore / getMessagesAfter do, as well as the string form
//         that this method originally required.
static async getMessagesRecent(serverId: string, channelId: string, number: number | string): Promise<any> {
    const result = await db.query(`
        SELECT * FROM (
            SELECT
                "id", "channel_id", "member_id"
                ,"sent_dtg", "text"
                , "resource_id"
                , "resource_name"
                , "resource_width"
                , "resource_height"
                , "resource_preview_id"
            FROM "messages"
            WHERE "server_id"=$1 AND "channel_id"=$2
            ORDER BY "sent_dtg" DESC
            LIMIT $3
        ) AS "r" ORDER BY "r"."sent_dtg" ASC`, [ serverId, channelId, number ]);
    return result.rows;
}
|
|
|
|
// Returns up to `number` messages of a channel whose "sent_dtg" is strictly
// earlier than that of the message with id `messageId`, oldest first.
//
// The inner query takes the closest preceding messages (sent_dtg DESC, LIMIT);
// the outer query re-sorts that page in ascending order. The reference
// message is looked up by id only.
static async getMessagesBefore(serverId: string, channelId: string, messageId: string, number: number): Promise<any[]> {
    const { rows } = await db.query(`
        SELECT * FROM (
            SELECT
                "id", "channel_id", "member_id"
                , "sent_dtg", "text"
                , "resource_id"
                , "resource_name"
                , "resource_width"
                , "resource_height"
                , "resource_preview_id"
            FROM "messages"
            WHERE
                "server_id"=$1
                AND "channel_id"=$2
                AND "sent_dtg" < (SELECT "sent_dtg" FROM "messages" WHERE "id"=$3)
            ORDER BY "sent_dtg" DESC
            LIMIT $4
        ) AS "r" ORDER BY "r"."sent_dtg" ASC`, [ serverId, channelId, messageId, number ]);
    return rows;
}
|
|
|
|
// Returns up to `number` messages of a channel whose "sent_dtg" is strictly
// later than that of the message with id `messageId`, oldest first.
// The reference message is looked up by id only.
static async getMessagesAfter(serverId: string, channelId: string, messageId: string, number: number): Promise<any[]> {
    const { rows } = await db.query(`
        SELECT
            "id", "channel_id", "member_id"
            , "sent_dtg", "text"
            , "resource_id"
            , "resource_name"
            , "resource_width"
            , "resource_height"
            , "resource_preview_id"
        FROM "messages"
        WHERE
            "server_id"=$1
            AND "channel_id"=$2
            AND "sent_dtg" > (SELECT "sent_dtg" FROM "messages" WHERE "id"=$3)
        ORDER BY "sent_dtg" ASC
        LIMIT $4`, [ serverId, channelId, messageId, number ]);
    return rows;
}
|
|
|
|
// Returns the "resources" row (id, server_id, hash, data) with the given id on
// the given server. Throws unless exactly one row matches.
static async getResource(serverId: string, resourceId: string): Promise<any> {
    const { rows } = await db.query(`
        SELECT
            "id", "server_id", "hash", "data"
        FROM
            "resources"
        WHERE
            "server_id"=$1
            AND "id"=$2
    `, [ serverId, resourceId ]);
    if (rows.length !== 1) {
        throw new Error(`unable to find specified resource s#${serverId}, r#${resourceId}`);
    }
    return rows[0];
}
|
|
|
|
// Inserts a text-only message stamped with the database's NOW() and returns
// the inserted row (including the resource_* columns, which this insert does
// not set). Throws unless exactly one row comes back.
static async insertMessage(serverId: string, channelId: string, memberId: string, text: string): Promise<any> {
    const { rows } = await db.query(
        `INSERT INTO "messages" (
            "server_id", "channel_id", "member_id", "text", "sent_dtg"
        ) VALUES ($1, $2, $3, $4, NOW())
        RETURNING
            "id", "channel_id", "member_id"
            , "text", "sent_dtg"
            , "resource_id"
            , "resource_name"
            , "resource_width"
            , "resource_height"
            , "resource_preview_id"`, [ serverId, channelId, memberId, text ]);

    if (rows.length !== 1) {
        throw new Error('unable to properly insert message');
    }

    return rows[0];
}
|
|
|
|
// Inserts a message that references a resource, stamped with the database's
// NOW(), and returns the inserted row. Width, height and preview id may be
// null. Throws unless exactly one row comes back.
static async insertMessageWithResource(
    serverId: string,
    channelId: string,
    memberId: string,
    text: string,
    resourceId: string,
    resourceName: string,
    resourceWidth: number | null,
    resourceHeight: number | null,
    resourcePreviewId: string | null
): Promise<any> {
    const { rows } = await db.query(
        `INSERT INTO "messages" (
            "server_id", "channel_id", "member_id", "text", "resource_id", "resource_name", "resource_width", "resource_height", "resource_preview_id", "sent_dtg"
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
        RETURNING
            "id", "channel_id", "member_id"
            , "text", "sent_dtg"
            , "resource_id"
            , "resource_name"
            , "resource_width"
            , "resource_height"
            , "resource_preview_id"`, [ serverId, channelId, memberId, text, resourceId, resourceName, resourceWidth, resourceHeight, resourcePreviewId ]);

    if (rows.length !== 1) {
        throw new Error('unable to properly insert message (with resource)');
    }

    return rows[0];
}
|
|
|
|
// Stores `resourceBuff` for the server and returns the resource's id.
// Resources are deduped on ("server_id", "hash"), where the hash is the
// SHA-256 digest the database computes over the data.
static async insertResource(serverId: string, resourceBuff: Buffer): Promise<string> {
    // HACK: the ON CONFLICT branch performs a no-op update (server_id set to
    // itself) so that RETURNING also yields the id of an already existing
    // resource and no second SELECT is needed.
    const { rows } = await db.query(
        `INSERT INTO "resources" ("server_id", "hash", "data") VALUES ($1, digest($2::bytea, 'sha256'), $2::bytea)
        ON CONFLICT ("server_id", "hash") DO UPDATE
        SET "server_id"=EXCLUDED."server_id"
        RETURNING "id"`, [ serverId, resourceBuff ]);

    if (rows.length !== 1) {
        throw new Error('unable to insert resource');
    }
    return rows[0].id;
}
|
|
|
|
// Sets the status of every member of the given server to 'offline'.
static async clearAllMemberStatus(serverId: string): Promise<void> {
    const serverParams = [ serverId ];
    await db.query(`UPDATE "members" SET "status"='offline' WHERE "server_id"=$1`, serverParams);
}
|
|
|
|
// Sets the status of one member of a server.
// Throws unless exactly one row was updated.
static async setMemberStatus(serverId: string, memberId: string, status: string): Promise<void> {
    const { rowCount } = await db.query(`
        UPDATE "members" SET "status"=$1 WHERE "server_id"=$2 AND "id"=$3
    `, [ status, serverId, memberId ]);
    if (rowCount !== 1) {
        throw new Error('unable to update status');
    }
}
|
|
|
|
// Sets the display name of one member of a server.
// Throws unless exactly one row was updated.
static async setMemberDisplayName(serverId: string, memberId: string, displayName: string): Promise<void> {
    const result = await db.query(`
        UPDATE "members" SET "display_name"=$1 WHERE "server_id"=$2 AND "id"=$3
    `, [ displayName, serverId, memberId ]);
    if (result.rowCount !== 1) {
        // The message used to read 'unable to update status', copied from
        // setMemberStatus, which pointed at the wrong operation.
        throw new Error('unable to update display name');
    }
}
|
|
|
|
// Inserts a role for the server and returns the id of the new role.
// Throws unless exactly one row comes back.
static async createRole(serverId: string, name: string, color: string, priority: number): Promise<string> {
    const { rows } = await db.query(`
        INSERT INTO "roles"
            ("server_id", "name", "color", "priority")
        VALUES
            ($1, $2, $3, $4)
        RETURNING
            "id"
    `, [ serverId, name, color, priority ]);
    if (rows.length !== 1) {
        throw new Error('unable to create role');
    }
    return rows[0].id;
}
|
|
|
|
// Deletes the role with the given id from the given server.
// Throws unless exactly one row was deleted.
static async removeRole(serverId: string, roleId: string): Promise<void> {
    const { rowCount } = await db.query(
        `DELETE FROM "roles" WHERE "id"=$1 AND "server_id"=$2`,
        [ roleId, serverId ]
    );
    if (rowCount !== 1) {
        throw new Error('unable to remove role');
    }
}
|
|
|
|
// Assigns a role to a member on a server.
// If the assignment already exists, a warning is logged and nothing is
// inserted. The existence check and the insert are two separate queries.
static async assignRoleToMember(serverId: string, roleId: string, memberId: string): Promise<void> {
    const existing = await db.query(`
        SELECT COUNT(*) AS c FROM "member_roles" WHERE
            "role_id" = $1
            AND "server_id" = $2
            AND "member_id" = $3
    `, [ roleId, serverId, memberId ]);
    if (existing.rows.length !== 1) {
        throw new Error('unable to check for existing privilege');
    }

    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose comparison is presumably deliberate - confirm
    // before switching to a strict one.
    const alreadyAssigned = existing.rows[0].c != 0;
    if (alreadyAssigned) {
        LOG.warn(`r#${roleId} already assigned to u#${memberId} in s#${serverId}`);
        return;
    }

    const inserted = await db.query(`
        INSERT INTO "member_roles"
            ("role_id", "server_id", "member_id")
        VALUES
            ($1, $2, $3)
        RETURNING
            "role_id"
    `, [ roleId, serverId, memberId ]);
    if (inserted.rows.length !== 1) {
        throw new Error('unable to assign user to role');
    }
}
|
|
|
|
// Removes a role from a member on a server.
// If the member does not have the role, a warning is logged and nothing is
// deleted. Throws if the delete does not remove exactly one row.
static async revokeRoleFromMember(serverId: string, roleId: string, memberId: string): Promise<void> {
    const existing = await db.query(`
        SELECT COUNT(*) AS c FROM "member_roles" WHERE
            "role_id" = $1
            AND "server_id" = $2
            AND "member_id" = $3
    `, [ roleId, serverId, memberId ]);
    if (existing.rows.length !== 1) {
        throw new Error('unable to check for existing privilege');
    }

    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose comparison is presumably deliberate - confirm
    // before switching to a strict one.
    const neverAssigned = existing.rows[0].c == 0;
    if (neverAssigned) {
        LOG.warn(`r#${roleId} was never assigned to u#${memberId} in s#${serverId}`);
        return;
    }

    const deleted = await db.query(`
        DELETE FROM "member_roles"
        WHERE
            "role_id"=$1
            AND "server_id"=$2
            AND "member_id"=$3
    `, [ roleId, serverId, memberId ]);
    if (deleted.rowCount !== 1) {
        throw new Error('unable to revoke role from user');
    }
}
|
|
|
|
// Grants a privilege to a role on a server.
// If the role already has the privilege, a warning is logged and nothing is
// inserted. The existence check and the insert are two separate queries.
static async assignRolePrivilege(serverId: string, roleId: string, privilege: string): Promise<void> {
    const existing = await db.query(`
        SELECT COUNT(*) AS c FROM "role_privileges" WHERE
            "role_id" = $1
            AND "server_id" = $2
            AND "privilege" = $3
    `, [ roleId, serverId, privilege ]);
    if (existing.rows.length !== 1) {
        throw new Error('unable to check for existing privilege');
    }

    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose comparison is presumably deliberate - confirm
    // before switching to a strict one.
    const alreadyGranted = existing.rows[0].c != 0;
    if (alreadyGranted) {
        LOG.warn(`privilege ${privilege} already exists for r#${roleId} on s#${serverId}`);
        return;
    }

    const inserted = await db.query(`
        INSERT INTO "role_privileges"
            ("role_id", "server_id", "privilege")
        VALUES
            ($1, $2, $3)
        RETURNING
            "role_id"
    `, [ roleId, serverId, privilege ]);
    if (inserted.rows.length !== 1) {
        throw new Error('unable to add privilege to role privileges');
    }
}
|
|
|
|
// Removes a privilege from a role on a server.
// If the role does not have the privilege, a warning is logged and nothing is
// deleted. Throws if the delete does not remove exactly one row.
//
// NOTE: unlike assignRolePrivilege(serverId, roleId, privilege), this method
// takes roleId BEFORE serverId. The order is kept for existing callers.
static async revokeRolePrivilege(roleId: string, serverId: string, privilege: string): Promise<void> {
    const existsResult = await db.query(`
        SELECT COUNT(*) AS c FROM "role_privileges" WHERE
            "role_id" = $1
            AND "server_id" = $2
            AND "privilege" = $3
    `, [ roleId, serverId, privilege ]);
    if (existsResult.rows.length !== 1) {
        throw new Error('unable to check for existing privilege');
    }
    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose comparison is presumably deliberate - confirm
    // before switching to a strict one.
    if (existsResult.rows[0].c == 0) {
        LOG.warn(`privilege ${privilege} did not exist for r#${roleId} on s#${serverId}`);
        return;
    }

    // The WHERE conditions must be combined with AND. They were previously
    // separated by commas, which is not valid SQL, so the DELETE always failed.
    const result = await db.query(`
        DELETE FROM "role_privileges"
        WHERE
            "role_id"=$1
            AND "server_id"=$2
            AND "privilege"=$3
    `, [ roleId, serverId, privilege ]);
    if (result.rowCount !== 1) {
        throw new Error('unable to revoke privilege to role privileges');
    }
}
|
|
|
|
// Resolves to true when at least one role held by the member on the given
// server carries `privilege`.
//
// Counts the rows of member_roles joined to role_privileges on role_id,
// filtered by member_roles.server_id, member_roles.member_id and
// role_privileges.privilege.
static async hasPrivilege(serverId: string, memberId: string, privilege: string): Promise<boolean> {
    const { rows } = await db.query(`
        SELECT COUNT(*) AS c FROM
            member_roles
            , role_privileges
        WHERE
            member_roles.role_id=role_privileges.role_id
            AND member_roles.server_id=$1
            AND member_roles.member_id=$2
            AND role_privileges.privilege=$3
    `, [ serverId, memberId, privilege ]);
    if (rows.length !== 1) {
        throw new Error('unable to check for privilege');
    }
    const matches = rows[0].c;
    return matches > 0;
}
|
|
|
|
// Sets the avatar resource id of one member of a server.
// Throws unless exactly one row was updated.
static async setMemberAvatarResourceId(serverId: string, memberId: string, avatarResourceId: string): Promise<void> {
    const result = await db.query(`
        UPDATE "members" SET "avatar_resource_id"=$1 WHERE "server_id"=$2 AND "id"=$3
    `, [ avatarResourceId, serverId, memberId ]);
    if (result.rowCount !== 1) {
        // The message used to read 'unable to update status', copied from
        // setMemberStatus, which pointed at the wrong operation.
        throw new Error('unable to update avatar');
    }
}
|
|
|
|
// Returns token, member_id, created and expires for every token of the server.
static async getTokens(serverId: string): Promise<any[]> {
    const { rows } = await db.query(`
        SELECT "token", "member_id", "created", "expires" FROM tokens
        WHERE "server_id"=$1
    `, [ serverId ]);
    return rows;
}
|
|
|
|
// Creates a token for the server that expires `expiresAfter` from now and
// returns its "token" and "expires" columns. `expiresAfter` is cast to a
// PostgreSQL interval, e.g. '7 days'.
// Example of the equivalent SQL:
//   insert into tokens (server_id, expires) VALUES ('226b3e9e-5220-4205-bf5b-6738b9b3bb39', NOW() + '7 days'::interval) RETURNING "token", "expires";
static async createToken(serverId: string, expiresAfter: string): Promise<any> {
    const { rows } = await db.query(`
        INSERT INTO "tokens" ("server_id", "expires")
        VALUES ($1, NOW() + $2::interval)
        RETURNING "token", "expires"
    `, [ serverId, expiresAfter ]);
    if (rows.length !== 1) {
        throw new Error('unable to insert a token');
    }
    return rows[0];
}
|
|
|
|
// Resolves to true when exactly one "tokens" row has the given token value.
static async isTokenReal(token: string): Promise<boolean> {
    const { rows } = await db.query(`SELECT COUNT(*) AS c FROM "tokens" WHERE "token"=$1`, [ token ]);
    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose == is presumably deliberate - confirm before
    // switching to ===.
    return rows[0].c == 1;
}
|
|
|
|
// Resolves to true when exactly one "tokens" row has the given token value
// and belongs to the given server.
static async isTokenForServer(token: string, serverId: string): Promise<boolean> {
    const { rows } = await db.query(`SELECT COUNT(*) AS c FROM "tokens" WHERE "token"=$1 AND "server_id"=$2`, [ token, serverId ]);
    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose == is presumably deliberate - confirm before
    // switching to ===.
    return rows[0].c == 1;
}
|
|
|
|
// Resolves to true when the token exists and already has a member_id set.
static async isTokenTaken(token: string): Promise<boolean> {
    const { rows } = await db.query(`SELECT COUNT(*) AS c FROM "tokens" WHERE "token"=$1 AND "member_id" IS NOT NULL`, [ token ]);
    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose == is presumably deliberate - confirm before
    // switching to ===.
    return rows[0].c == 1;
}
|
|
|
|
// Resolves to true when the token exists, has no member_id yet and its
// "expires" timestamp is still in the future (compared against NOW()).
static async isTokenActive(token: string): Promise<boolean> {
    const { rows } = await db.query(`SELECT COUNT(*) AS c FROM "tokens" WHERE "token"=$1 AND "member_id" IS NULL AND "expires">NOW()`, [ token ]);
    // NOTE(review): node-postgres normally returns COUNT(*) (bigint) as a
    // string, so the loose == is presumably deliberate - confirm before
    // switching to ===.
    return rows[0].c == 1;
}
|
|
|
|
// Registers a new member using a token.
// NOTE: Tokens are unique across servers so they can be used to get the serverId
//
// Inside one queued transaction this method:
//   1. reads the server_id the token belongs to,
//   2. stores `avatarBuff` as a resource of that server (DB.insertResource),
//   3. inserts the member row (public key, display name, avatar resource id),
//   4. claims the token by setting its member_id to the new member.
// Any failure rolls the whole transaction back (see queueTransaction).
//
// derBuff:    the member's public key; stored as-is in "public_key".
// avatarBuff: raw avatar data; stored as a resource.
// Returns the serverId taken from the token and the id of the new member.
// Throws if the token is unknown, has already been claimed by a member, or if
// any of the inserts/updates does not affect exactly one row.
static async registerWithToken(token: string, derBuff: Buffer, displayName: string, avatarBuff: Buffer): Promise<{ serverId: string, memberId: string }> {
    let result: { memberId: string, serverId: string } | null = null;
    await DB.queueTransaction(async () => {
        const resultServerId = await db.query(`
            SELECT "server_id" FROM "tokens" WHERE "token"=$1
        `, [ token ]);
        if (resultServerId.rows.length !== 1) {
            throw new Error('unable to get token server id');
        }
        const serverId = resultServerId.rows[0].server_id as string;

        // insert avatar as resource
        const avatarResourceId = await DB.insertResource(serverId, avatarBuff);

        const resultInsertMember = await db.query(`
            INSERT INTO "members" (
                "server_id", "public_key", "display_name", "avatar_resource_id"
            ) VALUES ($1, $2, $3, $4)
            RETURNING "id"
        `, [ serverId, derBuff, displayName, avatarResourceId ]);
        if (resultInsertMember.rows.length !== 1) {
            throw new Error('unable to insert member');
        }
        const memberId = resultInsertMember.rows[0].id;

        // Only claim a token nobody has claimed yet. Without the
        // "member_id" IS NULL guard a token that was already taken could be
        // used again and its existing member link silently overwritten.
        // When the guard does not match, rowCount is 0 and the error below
        // rolls back the member insert as well.
        const resultUpdate = await db.query(`
            UPDATE "tokens" SET "member_id"=$1 WHERE "token"=$2 AND "member_id" IS NULL
        `, [ memberId, token ]);
        if (resultUpdate.rowCount !== 1) {
            throw new Error('unable to update token with new member');
        }
        result = { serverId, memberId };
    });
    if (!result) {
        throw new Error('result was not set');
    }
    return result;
}
|
|
|
|
// Deletes the token row with the given token value.
// Throws unless exactly one row was deleted.
static async revokeToken(token: string): Promise<void> {
    const { rowCount } = await db.query('DELETE FROM "tokens" WHERE "token"=$1', [ token ]);
    if (rowCount !== 1) {
        throw new Error('unable to remove token');
    }
}
|
|
}
|