not enough timestamp precision for true chads, added an order column to messages to deal with it

This commit is contained in:
Michael Peters 2021-12-04 05:03:35 -06:00
parent f00127f486
commit 445fc17edc
12 changed files with 153 additions and 54 deletions

View File

@ -1,8 +1,11 @@
// Intended to be used with message lists
import * as electronRemote from '@electron/remote';
const electronConsole = electronRemote.getGlobal('console') as Console;
import Logger from '../../logger/logger';
const LOG = Logger.create(__filename, electronConsole);
import { stringify } from "querystring";
import { AutoVerifier, AutoVerifierChangesType } from "./auto-verifier";
import { Changes, WithEquals } from "./data-types";
import Q from './q-module';
export interface PartialMessageListQuery {
channelId: string;
@ -37,7 +40,17 @@ export class AutoVerifierWithArg<T, K> {
query => primaryFunc(query),
query => trustedFunc(query),
async (query: PartialMessageListQuery, primaryResult: T[] | null, trustedResult: T[] | null) => {
LOG.debug('messages verify: ', {
query,
// primaryResult: primaryResult?.map((e: any) => e.sent).sort(),
// trustedResult: trustedResult?.map((e: any) => e.sent).sort(),
zipped: primaryResult && trustedResult && primaryResult.length === trustedResult.length && Q.zip(
primaryResult?.sort((a: any, b: any) => a.sent.getTime() - b.sent.getTime()).map((e: any) => e.text),
trustedResult?.sort((a: any, b: any) => a.sent.getTime() - b.sent.getTime()).map((e: any) => e.text)
)
});
let changes = AutoVerifier.getChanges<T>(primaryResult, trustedResult);
LOG.debug('changes:', { changes });
let changesType = AutoVerifier.getListChangesType<T>(primaryResult, trustedResult, changes);
await changesFunc(query, changesType, changes);
}

View File

@ -6,6 +6,7 @@ const LOG = Logger.create(__filename, electronConsole);
import * as moment from 'moment';
import * as crypto from 'crypto';
import strcmp from '../../strcmp/strcmp';
function formatDate(date: Date) {
return moment(date).format('YYYY-MM-DD HH:mm:ss');
@ -108,6 +109,7 @@ export class Message implements WithEquals<Message> {
public channel: Channel | { id: string },
public member: Member | { id: string },
public readonly sent: Date,
public readonly _order: string, // access through order functions/methods
public readonly text: string | null,
public readonly resourceId: string | null,
public readonly resourceName: string | null,
@ -154,6 +156,7 @@ export class Message implements WithEquals<Message> {
{ id: dataMessage.channel_id },
{ id: dataMessage.member_id },
new Date(dataMessage.sent_dtg),
dataMessage.order,
dataMessage.text ?? null,
dataMessage.resource_id ?? null,
dataMessage.resource_name ?? null,
@ -174,12 +177,25 @@ export class Message implements WithEquals<Message> {
}
}
static sortOrder(a: Message, b: Message) {
return strcmp(a._order, b._order);
}
sortsBefore(other: Message) {
return strcmp(this._order, other._order) < 0;
}
sortsAfter(other: Message) {
return strcmp(this._order, other._order) > 0;
}
equals(other: Message) {
return (
this.id === other.id &&
this.member.id === other.member.id &&
this.channel.id === other.channel.id &&
this.sent.getTime() === other.sent.getTime() &&
this._order === other._order &&
this.resourceId === other.resourceId &&
this.resourceName === other.resourceName &&
this.resourceWidth === other.resourceWidth &&

View File

@ -8,18 +8,15 @@ export default class DedupAwaiter<T> {
public async call(): Promise<T> {
if (!this.promise) {
let result: T | null = null;
let promise = new Promise<T>(async (resolve) => {
this.promise = new Promise<T>(async (resolve) => {
resolve(await this.func());
});
// This if statement could trigger if func is not async
// typescript is missing some fun stuff going on here :)
if (result) {
return result;
} else {
this.promise = promise;
}
}
return await this.promise;
let promise = this.promise;
let result = await promise;
if (promise === this.promise) {
this.promise = null;
}
return result;
}
}

View File

@ -140,7 +140,7 @@ export default class PairVerifierFetchable extends EventEmitter<Conflictable> im
await this.primary.handleResourceAdded(trustedResource as Resource);
} else if (changesType === AutoVerifierChangesType.CONFLICT) {
await this.primary.handleResourceChanged(trustedResource as Resource);
this.emit('conflict-resource', changesType, primaryResource as Resource, trustedResource as Resource);
this.emit('conflict-resource', changesType, query, primaryResource as Resource, trustedResource as Resource);
}
}
@ -150,7 +150,7 @@ export default class PairVerifierFetchable extends EventEmitter<Conflictable> im
if (changes.deleted.length > 0) await this.primary.handleMessagesDeleted(changes.deleted);
if (changesType === AutoVerifierChangesType.CONFLICT) {
this.emit('conflict-messages', changesType, changes);
this.emit('conflict-messages', changesType, query, changes);
}
}

View File

@ -18,6 +18,7 @@ import PairVerifierFetchable from './fetchable-pair-verifier';
import EnsuredFetchable from './fetchable-ensured';
import { EventEmitter } from 'tsee';
import { AutoVerifierChangesType } from './auto-verifier';
import { IDQuery, PartialMessageListQuery } from './auto-verifier-with-args';
export default class CombinedGuild extends EventEmitter<Connectable & Conflictable> implements AsyncGuaranteedFetchable, AsyncRequestable {
private readonly ramGuild: RAMGuild;
@ -138,21 +139,21 @@ export default class CombinedGuild extends EventEmitter<Connectable & Conflictab
LOG.info(`g#${this.id} channels conflict`, { changes });
this.emit('conflict-channels', changesType, changes);
});
ramDiskSocket.on('conflict-messages', async (changesType: AutoVerifierChangesType, changes: Changes<Message>) => {
ramDiskSocket.on('conflict-messages', async (changesType: AutoVerifierChangesType, query: PartialMessageListQuery, changes: Changes<Message>) => {
let members = await this.grabRAMMembersMap();
let channels = await this.grabRAMChannelsMap();
for (let message of changes.added) message.fill(members, channels);
for (let dataPoint of changes.updated) dataPoint.newDataPoint.fill(members, channels);
for (let message of changes.deleted) message.fill(members, channels);
this.emit('conflict-messages', changesType, changes);
this.emit('conflict-messages', changesType, query, changes);
});
ramDiskSocket.on('conflict-tokens', (changesType: AutoVerifierChangesType, changes: Changes<Token>) => {
LOG.info(`g#${this.id} tokens conflict`, { changes });
this.emit('conflict-tokens', changesType, changes);
});
ramDiskSocket.on('conflict-resource', (changesType: AutoVerifierChangesType, oldResource: Resource, newResource: Resource) => {
ramDiskSocket.on('conflict-resource', (changesType: AutoVerifierChangesType, query: IDQuery, oldResource: Resource, newResource: Resource) => {
LOG.warn(`g#${this.id} resource conflict`, { oldResource, newResource });
this.emit('conflict-resource', changesType, oldResource, newResource);
this.emit('conflict-resource', changesType, query, oldResource, newResource);
});
this.fetchable = new EnsuredFetchable(ramDiskSocket);

View File

@ -1,6 +1,7 @@
import { Changes, Channel, GuildMetadata, Member, Message, Resource, Token } from './data-types';
import { DefaultEventMap, EventEmitter } from 'tsee';
import { AutoVerifierChangesType } from './auto-verifier';
import { IDQuery, PartialMessageListQuery } from './auto-verifier-with-args';
// Fetchable
@ -135,9 +136,9 @@ export type Conflictable = {
'conflict-metadata': (changesType: AutoVerifierChangesType, oldGuildMeta: GuildMetadata, newGuildMeta: GuildMetadata) => void;
'conflict-channels': (changesType: AutoVerifierChangesType, changes: Changes<Channel>) => void;
'conflict-members': (changesType: AutoVerifierChangesType, changes: Changes<Member>) => void;
'conflict-messages': (changesType: AutoVerifierChangesType, changes: Changes<Message>) => void;
'conflict-tokens': (changesType: AutoVerifierChangesType, changes: Changes<Token>) => void;
'conflict-resource': (changesType: AutoVerifierChangesType, oldResource: Resource, newResource: Resource) => void;
'conflict-resource': (changesType: AutoVerifierChangesType, query: IDQuery, oldResource: Resource, newResource: Resource) => void;
'conflict-messages': (changesType: AutoVerifierChangesType, query: PartialMessageListQuery, changes: Changes<Message>) => void;
}
export const GuildEventNames = [

View File

@ -1,3 +1,4 @@
import strcmp from "../../strcmp/strcmp";
import { Message, ShouldNeverHappenError } from "./data-types";
import Globals from "./globals";
@ -34,8 +35,7 @@ export default class MessageRAMCache {
if (!value) return;
if (value.totalCharacters > Globals.MAX_RAM_CACHED_MESSAGES_CHANNEL_CHARACTERS) {
let messages = Array.from(value.messages.values())
.sort((a, b) => a.sent.getTime() - b.sent.getTime());
let messages = Array.from(value.messages.values()).sort(Message.sortOrder);
while (value.totalCharacters > Globals.MAX_RAM_CACHED_MESSAGES_CHANNEL_CHARACTERS) {
let message = messages.shift();
if (!message) throw new ShouldNeverHappenError('could not find a message to clear');
@ -87,7 +87,7 @@ export default class MessageRAMCache {
let value = this.data.get(id);
if (!value) throw new ShouldNeverHappenError('unable to get message map');
value.lastUsed = new Date();
let allRecentMessages = Array.from(value.messages.values()).sort((a, b) => a.sent.getTime() - b.sent.getTime());
let allRecentMessages = Array.from(value.messages.values()).sort(Message.sortOrder);
let start = Math.min(allRecentMessages.length - number, 0);
let result = allRecentMessages.slice(start);
if (result.length === 0) {

View File

@ -1,3 +1,8 @@
import * as electronRemote from '@electron/remote';
const electronConsole = electronRemote.getGlobal('console') as Console;
import Logger from '../../logger/logger';
const LOG = Logger.create(__filename, electronConsole);
import * as crypto from 'crypto';
import ConcurrentQueue from "../../concurrent-queue/concurrent-queue";
@ -130,6 +135,7 @@ export default class PersonalDB {
, resource_width INTEGER
, resource_height INTEGER
, resource_preview_id TEXT
, "order" TEXT
, CONSTRAINT messages_id_guild_id_con UNIQUE (id, guild_id)
)
`);
@ -433,10 +439,19 @@ export default class PersonalDB {
`INSERT INTO messages (
id, guild_id, channel_id, member_id, sent_dtg, text
, resource_id, resource_name, resource_width, resource_height, resource_preview_id
, "order"
) VALUES (
:id, :guild_id, :channel_id, :member_id, :sent_dtg, :text
, :resource_id, :resource_name, :resource_width, :resource_height, :resource_preview_id
)`
, :order
)
ON CONFLICT (id, guild_id) DO
UPDATE SET
id=:id, guild_id=:guild_id, channel_id=:channel_id, member_id=:member_id, sent_dtg=:sent_dtg,
text=:text, resource_id=:resource_id, resource_name=:resource_name,
resource_width=:resource_width, resource_height=:resource_height, resource_preview_id=:resource_preview_id,
"order"=:order
`
);
for (let message of messages) {
await stmt.run({
@ -445,7 +460,8 @@ export default class PersonalDB {
':sent_dtg': message.sent.getTime(), ':text': message.text,
':resource_id': message.resourceId, ':resource_name': message.resourceName,
':resource_width': message.resourceWidth, ':resource_height': message.resourceHeight,
':resource_preview_id': message.resourcePreviewId
':resource_preview_id': message.resourcePreviewId,
':order': message._order
});
}
await stmt.finalize();
@ -457,6 +473,10 @@ export default class PersonalDB {
let stmt = await this.db.prepare(
`UPDATE resources SET
hash=:hash, data=:data, data_size=:data_size, last_used=:last_used
channel_id=:channel_id, member_id=:member_id, sent_id=:sent_dtg, text=:text
, resource_id=:resource_id, :resource_name=resource_name, resource_width=:resource_width
, resource_height=:resource_height, resource_preview_id=:resource_preview_id
, "order"=:order
WHERE
id=:id AND guild_id=:guild_id`
);
@ -467,7 +487,8 @@ export default class PersonalDB {
':sent_dtg': message.sent.getTime(), ':text': message.text,
':resource_id': message.resourceId, ':resource_name': message.resourceName,
':resource_width': message.resourceWidth, ':resource_height': message.resourceHeight,
':resource_preview_id': message.resourcePreviewId
':resource_preview_id': message.resourcePreviewId,
':order': message._order
});
}
await stmt.finalize();
@ -492,11 +513,12 @@ export default class PersonalDB {
SELECT *
FROM "messages"
WHERE "guild_id"=:guild_id AND "channel_id"=:channel_id
ORDER BY "sent_dtg" DESC
ORDER BY "order" DESC
LIMIT :number
) AS "r" ORDER BY "r"."sent_dtg" ASC
) AS "r" ORDER BY "r"."order"
`, { ':guild_id': guildId, ':channel_id': channelId, ':number': number });
if (messages.length === 0) return null;
LOG.debug(`return ${messages.length}/${number} recent messages`);
return messages.map(dataMessage => Message.fromDBData(dataMessage));
}
@ -508,10 +530,10 @@ export default class PersonalDB {
WHERE
"guild_id"=:guild_id
AND "channel_id"=:channel_id
AND "sent_dtg" < (SELECT "sent_dtg" FROM "messages" WHERE "id"=:message_id)
ORDER BY "sent_dtg" DESC
AND "order" < (SELECT "order" FROM "messages" WHERE "id"=:message_id)
ORDER BY "order" DESC, "id" DESC
LIMIT :number
) AS "r" ORDER BY "r"."sent_dtg" ASC
) AS "r" ORDER BY "r"."order" ASC
`, { ':guild_id': guildId, ':channel_id': channelId, ':message_id': messageId, ':number': number });
if (messages.length === 0) return null;
return messages.map(dataMessage => Message.fromDBData(dataMessage));
@ -524,8 +546,8 @@ export default class PersonalDB {
WHERE
"guild_id"=:guild_id
AND "channel_id"=:channel_id
AND "sent_dtg" > (SELECT "sent_dtg" FROM "messages" WHERE "id"=:message_id)
ORDER BY "sent_dtg" ASC
AND "order" > (SELECT "order" FROM "messages" WHERE "id"=:message_id)
ORDER BY "order" ASC
LIMIT :number
`, { ':guild_id': guildId, ':channel_id': channelId, ':message_id': messageId, ':number': number });
if (messages.length === 0) return null;

View File

@ -17,6 +17,7 @@ import createChannel from './elements/channel';
import createMember from './elements/member';
import GuildsManager from './guilds-manager';
import createMessage from './elements/message';
import strcmp from '../../strcmp/strcmp';
interface SetMessageProps {
atTop: boolean;
@ -414,7 +415,7 @@ export default class UI {
let channelIds = new Set(messages.map(message => message.channel.id));
for (let channelId of channelIds) {
let channelMessages = messages.filter(message => message.channel.id === channelId);
channelMessages = channelMessages.sort((a, b) => a.sent.getTime() - b.sent.getTime());
channelMessages = channelMessages.sort(Message.sortOrder);
// No Previous Messages is an easy case
if (this.messagePairs.size === 0) {
@ -425,9 +426,9 @@ export default class UI {
let topMessagePair = this.getTopMessagePair() as { message: Message, element: HTMLElement };
let bottomMessagePair = this.getBottomMessagePair() as { message: Message, element: HTMLElement };
let aboveMessages = messages.filter(message => message.sent.getTime() < topMessagePair.message.sent.getTime());
let belowMessages = messages.filter(message => message.sent.getTime() > bottomMessagePair.message.sent.getTime());
let betweenMessages = messages.filter(message => message.sent.getTime() >= topMessagePair.message.sent.getTime() && message.sent.getTime() <= bottomMessagePair.message.sent.getTime());
let aboveMessages = messages.filter(message => message.sortsBefore(topMessagePair.message));
let belowMessages = messages.filter(message => message.sortsAfter(bottomMessagePair.message));
let betweenMessages = messages.filter(message => !message.sortsBefore(topMessagePair.message) && !message.sortsAfter(bottomMessagePair.message));
if (aboveMessages.length > 0) await this.addMessagesBefore(guild, { id: channelId }, aboveMessages, topMessagePair.message);
if (belowMessages.length > 0) await this.addMessagesAfter(guild, { id: channelId }, belowMessages, bottomMessagePair.message);

View File

@ -1,7 +1,8 @@
import * as crypto from 'crypto';
import * as pg from 'pg';
import { stringify } from 'querystring';
import * as uuid from 'uuid';
import ConcurrentQueue from '../concurrent-queue/concurrent-queue';
import Logger from '../logger/logger';
@ -162,11 +163,12 @@ export default class DB {
, "resource_width"
, "resource_height"
, "resource_preview_id"
, "order"
FROM "messages"
WHERE "guild_id"=$1 AND "channel_id"=$2
ORDER BY "sent_dtg" DESC
ORDER BY "order" DESC
LIMIT $3
) AS "r" ORDER BY "r"."sent_dtg" ASC`, [ guildId, channelId, number ]);
) AS "r" ORDER BY "r"."order" ASC`, [ guildId, channelId, number ]);
return result.rows;
}
@ -181,14 +183,15 @@ export default class DB {
, "resource_width"
, "resource_height"
, "resource_preview_id"
, "order"
FROM "messages"
WHERE
"guild_id"=$1
AND "channel_id"=$2
AND "sent_dtg" < (SELECT "sent_dtg" FROM "messages" WHERE "id"=$3)
ORDER BY "sent_dtg" DESC
AND "order" < (SELECT "order" FROM "messages" WHERE "id"=$3)
ORDER BY "order" DESC
LIMIT $4
) AS "r" ORDER BY "r"."sent_dtg" ASC`, [ guildId, channelId, messageId, number ]);
) AS "r" ORDER BY "r"."order" ASC`, [ guildId, channelId, messageId, number ]);
return result.rows;
}
@ -202,12 +205,13 @@ export default class DB {
, "resource_width"
, "resource_height"
, "resource_preview_id"
, "order"
FROM "messages"
WHERE
"guild_id"=$1
AND "channel_id"=$2
AND "sent_dtg" > (SELECT "sent_dtg" FROM "messages" WHERE "id"=$3)
ORDER BY "sent_dtg" ASC
AND "order" > (SELECT "order" FROM "messages" WHERE "id"=$3)
ORDER BY "order" ASC, "id" ASC
LIMIT $4`, [ guildId, channelId, messageId, number ]);
return result.rows;
}
@ -230,10 +234,11 @@ export default class DB {
}
static async insertMessage(guildId: string, channelId: string, memberId: string, text: string): Promise<any> {
let id = uuid.v4();
let result = await db.query(
`INSERT INTO "messages" (
"guild_id", "channel_id", "member_id", "text", "sent_dtg"
) VALUES ($1, $2, $3, $4, NOW())
"id", "guild_id", "channel_id", "member_id", "text", "sent_dtg", "order"
) VALUES ($1::UUID, $2, $3, $4, $5, NOW(), LPAD(FLOOR(EXTRACT(epoch FROM NOW())::decimal * 100000)::text, 20, '0') || $1::text)
RETURNING
"id", "channel_id", "member_id"
, "text", "sent_dtg"
@ -241,7 +246,9 @@ export default class DB {
, "resource_name"
, "resource_width"
, "resource_height"
, "resource_preview_id"`, [ guildId, channelId, memberId, text ]);
, "resource_preview_id"
, "order"
`, [ id, guildId, channelId, memberId, text ]);
if (result.rows.length != 1) {
throw new Error('unable to properly insert message');
@ -254,17 +261,20 @@ export default class DB {
guildId: string,
channelId: string,
memberId: string,
text: string,
text: string | null,
resourceId: string,
resourceName: string,
resourceWidth: number | null,
resourceHeight: number | null,
resourcePreviewId: string | null
): Promise<any> {
let id = uuid.v4();
let result = await db.query(
`INSERT INTO "messages" (
"guild_id", "channel_id", "member_id", "text", "resource_id", "resource_name", "resource_width", "resource_height", "resource_preview_id", "sent_dtg"
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
"id", "guild_id", "channel_id", "member_id", "text",
"resource_id", "resource_name", "resource_width", "resource_height",
"resource_preview_id", "sent_dtg", "order"
) VALUES ($1:UUID, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), LPAD(FLOOR(EXTRACT(epoch FROM NOW())::decimal * 100000)::text, 20, '0') || $1::text)
RETURNING
"id", "channel_id", "member_id"
, "text", "sent_dtg"
@ -272,7 +282,9 @@ export default class DB {
, "resource_name"
, "resource_width"
, "resource_height"
, "resource_preview_id"`, [ guildId, channelId, memberId, text, resourceId, resourceName, resourceWidth, resourceHeight, resourcePreviewId ]);
, "resource_preview_id"
, "order"
`, [ id, guildId, channelId, memberId, text, resourceId, resourceName, resourceWidth, resourceHeight, resourcePreviewId ]);
if (result.rows.length != 1) {
throw new Error('unable to properly insert message (with resource)');
@ -296,6 +308,31 @@ export default class DB {
return resourceResult.rows[0].id;
}
// static async updateMessage(
// guildId: string,
// messageId: string,
// channelId: string,
// memberId: string,
// text: string | null,
// resourceId: string | null,
// resourceName: string | null,
// resourceWidth: number | null,
// resourceHeight: number | null,
// resourcePreviewId: string | null
// ) {
// let updateResult = await db.query(
// `INSERT INTO "messages" (
// "guild_id", "channel_id", "member_id", "text",
// "resource_id", "resource_name", "resource_width", "resource_height",
// "resource_preview_id", "sent_dtg"
// ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
// WHERE
// "guild_id"=$1
// AND "message_id"=$2
// `
// )
// }
// Resets all members status to Offline for specified guildId
static async clearAllMemberStatus(guildId: string): Promise<void> {
await db.query(`UPDATE "members" SET "status"='offline' WHERE "guild_id"=$1`, [ guildId ]);

View File

@ -100,17 +100,18 @@ ALTER TABLE "channels" OWNER TO "cordis";
-- TODO: Maybe use varchar for message text to prevent oversized messages?
-- NOTE: this will probably all be handled in node.js code instead (for more configurability).
CREATE TABLE "messages" (
"id" UUID NOT NULL DEFAULT uuid_generate_v4()
"id" UUID NOT NULL-- DEFAULT uuid_generate_v4() -- Removed since this is manually generated for ordering purposes
, "guild_id" UUID NOT NULL REFERENCES "guilds"("id") ON DELETE CASCADE
, "channel_id" UUID NOT NULL REFERENCES "channels"("id") ON DELETE CASCADE
, "member_id" UUID NOT NULL REFERENCES "members"("id") -- probably want set null
, "sent_dtg" TIMESTAMP WITH TIME ZONE NOT NULL
, "sent_dtg" TIMESTAMP(3) WITH TIME ZONE NOT NULL
, "text" TEXT
, "resource_id" UUID REFERENCES "resources"("id")
, "resource_name" VARCHAR(256)
, "resource_width" INT
, "resource_height" INT
, "resource_preview_id" UUID REFERENCES "resources"("id")
, "order" TEXT
, PRIMARY KEY ("id")
);
ALTER TABLE "messages" OWNER TO "cordis";

10
strcmp/strcmp.ts Normal file
View File

@ -0,0 +1,10 @@
/**
* Basic string compare (good for sorting)
* strcmp('abc', 'abc') -> 0
* strcmp('def', 'abc') -> 1
* strcmp('abc', 'def') -> -1
* See also: https://stackoverflow.com/q/1179366
*/
export default function strcmp(a: string, b: string): 0 | 1 | -1 {
return ((a === b) ? 0 : ((a > b) ? 1 : -1));
}