diff --git a/src/bot.ts b/src/bot.ts index 8051c0b3c4190955ef6012f4e39ec5613f8ede6f..22791a07b812ff8983d86de5863642d1b3def0ff 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -471,8 +471,13 @@ export class DiscordBot { opts.embed = embed; msg = await chan.send("", opts); } - await this.StoreMessagesSent(msg, chan, event); - this.unlockChannel(chan); + // Don't block on this. + this.StoreMessagesSent(msg, chan, event).then(() => { + this.unlockChannel(chan); + }).catch(() => { + log.warn("Failed to store sent message for ", event.event_id); + }); + } catch (err) { log.error("Couldn't send message. ", err); } diff --git a/src/db/roomstore.ts b/src/db/roomstore.ts index 2cb9d9f0747244c069a3538a9480d966601233f8..ed311a8ca450258f1aeb83fd2d7fb33ba37cc293 100644 --- a/src/db/roomstore.ts +++ b/src/db/roomstore.ts @@ -18,8 +18,8 @@ import { IDatabaseConnector } from "./connector"; import { Util } from "../util"; import * as uuid from "uuid/v4"; -import { Postgres } from "./postgres"; import { MetricPeg } from "../metrics"; +import { TimedCache } from "../structures/timedcache"; const log = new Log("DbRoomStore"); @@ -93,9 +93,9 @@ const ENTRY_CACHE_LIMETIME = 30000; // XXX: This only implements functions used in the bridge at the moment. export class DbRoomStore { - private entriesMatrixIdCache: Map<string, {e: IRoomStoreEntry[], ts: number}>; + private entriesMatrixIdCache: TimedCache<string, IRoomStoreEntry[]>; constructor(private db: IDatabaseConnector) { - this.entriesMatrixIdCache = new Map(); + this.entriesMatrixIdCache = new TimedCache(ENTRY_CACHE_LIMETIME); } public async upsertEntry(entry: IRoomStoreEntry) { @@ -155,9 +155,9 @@ export class DbRoomStore { public async getEntriesByMatrixId(matrixId: string): Promise<IRoomStoreEntry[]> { const cached = this.entriesMatrixIdCache.get(matrixId); - if (cached && cached.ts + ENTRY_CACHE_LIMETIME > Date.now()) { + if (cached) { MetricPeg.get.storeCall("RoomStore.getEntriesByMatrixId", true); - return cached.e; + return cached; } MetricPeg.get.storeCall("RoomStore.getEntriesByMatrixId", false); const entries = await this.db.All( @@ -187,7 +187,7 @@ export class DbRoomStore { } } if (res.length > 0) { - this.entriesMatrixIdCache.set(matrixId, {e: res, ts: Date.now()}); + this.entriesMatrixIdCache.set(matrixId, res); } return res; } diff --git a/src/db/userstore.ts b/src/db/userstore.ts index 0f307d6158e527ea79dde544fd9dea8e2771add6..7e84dc0a84faf4767a188349e11236ef34586d9c 100644 --- a/src/db/userstore.ts +++ b/src/db/userstore.ts @@ -17,6 +17,7 @@ limitations under the License. import { IDatabaseConnector } from "./connector"; import { Log } from "../log"; import { MetricPeg } from "../metrics"; +import { TimedCache } from "../structures/timedcache"; /** * A UserStore compatible with @@ -45,17 +46,17 @@ export interface IUserStoreEntry { } export class DbUserStore { - private remoteUserCache: Map<string, {e: RemoteUser, ts: number}>; + private remoteUserCache: TimedCache<string, RemoteUser>; constructor(private db: IDatabaseConnector) { - this.remoteUserCache = new Map(); + this.remoteUserCache = new TimedCache(ENTRY_CACHE_LIMETIME); } public async getRemoteUser(remoteId: string): Promise<RemoteUser|null> { const cached = this.remoteUserCache.get(remoteId); - if (cached && cached.ts + ENTRY_CACHE_LIMETIME > Date.now()) { + if (cached) { MetricPeg.get.storeCall("UserStore.getRemoteUser", true); - return cached.e; + return cached; } MetricPeg.get.storeCall("UserStore.getRemoteUser", false); @@ -84,7 +85,7 @@ export class DbUserStore { remoteUser.guildNicks.set(guild_id as string, nick as string); }); } - this.remoteUserCache.set(remoteId, {e: remoteUser, ts: Date.now()}); + this.remoteUserCache.set(remoteId, remoteUser); return remoteUser; } diff --git a/src/matrixeventprocessor.ts b/src/matrixeventprocessor.ts index 5751c8d962edca82431fd663075cfe55a41dd18f..ee6dd3d20457255fe998ea27b372031c82ca4c81 100644 --- a/src/matrixeventprocessor.ts +++ b/src/matrixeventprocessor.ts @@ -28,6 +28,8 @@ import { MatrixMessageProcessor, IMatrixMessageProcessorParams } from "./matrixm import { MatrixCommandHandler } from "./matrixcommandhandler"; import { Log } from "./log"; +import { TimedCache } from "./structures/timedcache"; +import { MetricPeg } from "./metrics"; const log = new Log("MatrixEventProcessor"); const MaxFileSize = 8000000; @@ -37,6 +39,7 @@ const DISCORD_AVATAR_WIDTH = 128; const DISCORD_AVATAR_HEIGHT = 128; const ROOM_NAME_PARTS = 2; const AGE_LIMIT = 900000; // 15 * 60 * 1000 +const PROFILE_CACHE_LIFETIME = 900000; export class MatrixEventProcessorOpts { constructor( @@ -59,12 +62,14 @@ export class MatrixEventProcessor { private discord: DiscordBot; private matrixMsgProcessor: MatrixMessageProcessor; private mxCommandHandler: MatrixCommandHandler; + private mxUserProfileCache: TimedCache<string, {displayname: string, avatar_url: string|undefined}>; constructor(opts: MatrixEventProcessorOpts, cm?: MatrixCommandHandler) { this.config = opts.config; this.bridge = opts.bridge; this.discord = opts.discord; this.matrixMsgProcessor = new MatrixMessageProcessor(this.discord); + this.mxUserProfileCache = new TimedCache(PROFILE_CACHE_LIFETIME); if (cm) { this.mxCommandHandler = cm; } else { @@ -76,6 +81,7 @@ export class MatrixEventProcessor { const event = request.getData() as IMatrixEvent; if (event.unsigned.age > AGE_LIMIT) { log.warn(`Skipping event due to age ${event.unsigned.age} > ${AGE_LIMIT}`); + MetricPeg.get.requestOutcome(event.event_id, false, "dropped"); return; } if ( @@ -116,17 +122,15 @@ export class MatrixEventProcessor { event.content!.body!.startsWith("!discord"); if (isBotCommand) { await this.mxCommandHandler.Process(event, context); - return; } else if (context.rooms.remote) { const srvChanPair = context.rooms.remote.roomId.substr("_discord".length).split("_", ROOM_NAME_PARTS); try { await this.ProcessMsgEvent(event, srvChanPair[0], srvChanPair[1]); - return; } catch (err) { log.warn("There was an error sending a matrix event", err); - return; } } + return; } else if (event.type === "m.room.encryption" && context.rooms.remote) { try { await this.HandleEncryptionWarning(event.room_id); @@ -134,10 +138,9 @@ export class MatrixEventProcessor { } catch (err) { throw new Error(`Failed to handle encrypted room, ${err}`); } - } else { - log.verbose("Got non m.room.message event"); } log.verbose("Event not processed by bridge"); + MetricPeg.get.requestOutcome(event.event_id, false, "dropped"); } public async HandleEncryptionWarning(roomId: string): Promise<void> { @@ -164,7 +167,6 @@ export class MatrixEventProcessor { log.verbose(`Looking up ${guildId}_${channelId}`); const roomLookup = await this.discord.LookupRoom(guildId, channelId, event.sender); const chan = roomLookup.channel; - const botUser = roomLookup.botUser; const embedSet = await this.EventToEmbed(event, chan); const opts: Discord.MessageOptions = {}; @@ -176,7 +178,10 @@ export class MatrixEventProcessor { } await this.discord.send(embedSet, opts, roomLookup, event); - await this.sendReadReceipt(event); + // Don't await this. + this.sendReadReceipt(event).catch((ex) => { + log.verbose("Failed to send read reciept for ", event.event_id, ex); + }); } public async ProcessStateEvent(event: IMatrixEvent) { @@ -196,7 +201,6 @@ export class MatrixEventProcessor { let msg = `\`${event.sender}\` `; - const isNew = event.unsigned === undefined || event.unsigned.prev_content === undefined; const allowJoinLeave = !this.config.bridge.disableJoinLeaveNotifications; if (event.type === "m.room.name") { @@ -205,7 +209,22 @@ export class MatrixEventProcessor { msg += `set the topic to \`${event.content!.topic}\``; } else if (event.type === "m.room.member") { const membership = event.content!.membership; - if (membership === "join" && isNew && allowJoinLeave) { + const intent = this.bridge.getIntent(); + const isNewJoin = event.unsigned.replaces_state === undefined ? true : ( + await intent.getEvent(event.room_id, event.unsigned.replaces_state)).content.membership !== "join"; + if (membership === "join") { + this.mxUserProfileCache.delete(`${event.room_id}:${event.sender}`); + this.mxUserProfileCache.delete(event.sender); + if (event.content!.displayname) { + this.mxUserProfileCache.set(`${event.room_id}:${event.sender}`, { + avatar_url: event.content!.avatar_url, + displayname: event.content!.displayname!, + }); + } + // We don't know if the user also updated their profile, but to be safe.. + this.mxUserProfileCache.delete(event.sender); + } + if (membership === "join" && isNewJoin && allowJoinLeave) { msg += "joined the room"; } else if (membership === "invite") { msg += `invited \`${event.state_key}\` to the room`; @@ -230,19 +249,7 @@ export class MatrixEventProcessor { event: IMatrixEvent, channel: Discord.TextChannel, getReply: boolean = true, ): Promise<IMatrixEventProcessorResult> { const mxClient = this.bridge.getClientFactory().getClientAs(); - let profile: IMatrixEvent | null = null; - try { - profile = await mxClient.getStateEvent(event.room_id, "m.room.member", event.sender); - if (!profile) { - profile = await mxClient.getProfileInfo(event.sender); - } - if (!profile) { - log.warn(`User ${event.sender} has no member state and no profile. That's odd.`); - } - } catch (err) { - log.warn(`Trying to fetch member state or profile for ${event.sender} failed`, err); - } - + const profile = await this.GetUserProfileForRoom(event.room_id, event.sender); const params = { mxClient, roomId: event.room_id, @@ -367,6 +374,45 @@ export class MatrixEventProcessor { return embed; } + private async GetUserProfileForRoom(roomId: string, userId: string) { + const mxClient = this.bridge.getClientFactory().getClientAs(); + const intent = this.bridge.getIntent(); + let profile: {displayname: string, avatar_url: string|undefined} | undefined; + try { + // First try to pull out the room-specific profile from the cache. + profile = this.mxUserProfileCache.get(`${roomId}:${userId}`); + if (profile) { + return profile; + } + log.verbose(`Profile ${userId}:${roomId} not cached`); + + // Failing that, try fetching the state. + profile = await mxClient.getStateEvent(roomId, "m.room.member", userId); + if (profile) { + this.mxUserProfileCache.set(`${roomId}:${userId}`, profile); + return profile; + } + + // Try fetching the users profile from the cache + profile = this.mxUserProfileCache.get(userId); + if (profile) { + return profile; + } + + // Failing that, try fetching the profile. + log.verbose(`Profile ${userId} not cached`); + profile = await intent.getProfileInfo(userId); + if (profile) { + this.mxUserProfileCache.set(userId, profile); + return profile; + } + log.warn(`User ${userId} has no member state and no profile. That's odd.`); + } catch (err) { + log.warn(`Trying to fetch member state or profile for ${userId} failed`, err); + } + return undefined; + } + private async sendReadReceipt(event: IMatrixEvent) { if (!this.config.bridge.disableReadReceipts) { try { @@ -394,8 +440,9 @@ export class MatrixEventProcessor { return hasAttachment; } - private async SetEmbedAuthor(embed: Discord.RichEmbed, sender: string, profile?: IMatrixEvent | null) { - const intent = this.bridge.getIntent(); + private async SetEmbedAuthor(embed: Discord.RichEmbed, sender: string, profile?: { + displayname: string, + avatar_url: string|undefined }) { let displayName = sender; let avatarUrl; @@ -418,13 +465,6 @@ export class MatrixEventProcessor { } // Let it fall through. } - if (!profile) { - try { - profile = await intent.getProfileInfo(sender); - } catch (ex) { - log.warn(`Failed to fetch profile for ${sender}`, ex); - } - } if (profile) { if (profile.displayname && diff --git a/src/matrixtypes.ts b/src/matrixtypes.ts index 7535b59a89063a02e1f1946321550287f483dd94..f08ae1301c571fdbc866e7aafff94eed21e8229f 100644 --- a/src/matrixtypes.ts +++ b/src/matrixtypes.ts @@ -23,6 +23,7 @@ export interface IMatrixEventContent { msgtype?: string; url?: string; displayname?: string; + avatar_url?: string; reason?: string; "m.relates_to"?: any; // tslint:disable-line no-any } diff --git a/src/metrics.ts b/src/metrics.ts index db1556f3482c7526ec613e7fac7fe9490ef02e06..fce2a989ccb28d8e96d33b2c44d9abdad79a750b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -133,11 +133,10 @@ export class PrometheusBridgeMetrics implements IBridgeMetrics { public requestOutcome(id: string, isRemote: boolean, outcome: string) { const startTime = this.requestsInFlight.get(id); - this.requestsInFlight.delete(id); if (!startTime) { - log.verbose(`Got "requestOutcome" for ${id}, but this request was never started`); return; } + this.requestsInFlight.delete(id); const duration = Date.now() - startTime; (isRemote ? this.remoteRequest : this.matrixRequest).observe({outcome}, duration / 1000); } diff --git a/src/structures/timedcache.ts b/src/structures/timedcache.ts new file mode 100644 index 0000000000000000000000000000000000000000..93424af9d488d493439bd4294dc969297f79eeee --- /dev/null +++ b/src/structures/timedcache.ts @@ -0,0 +1,106 @@ +interface ITimedValue<V> { + value: V; + ts: number; +} + +export class TimedCache<K, V> implements Map<K, V> { + private readonly map: Map<K, ITimedValue<V>>; + + public constructor(private readonly liveFor: number) { + this.map = new Map(); + } + + public clear(): void { + this.map.clear(); + } + + public delete(key: K): boolean { + return this.map.delete(key); + } + + public forEach(callbackfn: (value: V, key: K, map: Map<K, V>) => void|Promise<void>): void { + for (const item of this) { + callbackfn(item[1], item[0], this); + } + } + + public get(key: K): V | undefined { + const v = this.map.get(key); + if (v === undefined) { + return; + } + const val = this.filterV(v); + if (val !== undefined) { + return val; + } + // Cleanup expired key + this.map.delete(key); + } + + public has(key: K): boolean { + return this.get(key) !== undefined; + } + + public set(key: K, value: V): this { + this.map.set(key, { + ts: Date.now(), + value, + }); + return this; + } + + public get size(): number { + return this.map.size; + } + + public [Symbol.iterator](): IterableIterator<[K, V]> { + let iterator: IterableIterator<[K, ITimedValue<V>]>; + return { + next: () => { + if (!iterator) { + iterator = this.map.entries(); + } + let item: IteratorResult<[K, ITimedValue<V>]>|undefined; + let filteredValue: V|undefined; + // Loop if we have no item, or the item has expired. + while (!item || filteredValue === undefined) { + item = iterator.next(); + // No more items in map. Bye bye. + if (item.done) { + break; + } + filteredValue = this.filterV(item.value[1]); + } + if (item.done) { + // Typscript doesn't like us returning undefined for value, which is dumb. + // tslint:disable-next-line: no-any + return {done: true, value: undefined} as any as IteratorResult<[K, V]>; + } + return {done: false, value: [item.value[0], filteredValue]} as IteratorResult<[K, V]>; + }, + [Symbol.iterator]: () => this[Symbol.iterator](), + }; + } + + public entries(): IterableIterator<[K, V]> { + return this[Symbol.iterator](); + } + + public keys(): IterableIterator<K> { + throw new Error("Method not implemented."); + } + + public values(): IterableIterator<V> { + throw new Error("Method not implemented."); + } + + get [Symbol.toStringTag](): "Map" { + return "Map"; + } + + private filterV(v: ITimedValue<V>): V|undefined { + if (Date.now() - v.ts < this.liveFor) { + return v.value; + } + } +} diff --git a/test/config.ts b/test/config.ts index b6d7f447d8cf09471281abc3d3c84107f5fe1b2b..13b2ea444cf91e349167587d68f836afcf5252c1 100644 --- a/test/config.ts +++ b/test/config.ts @@ -18,11 +18,6 @@ import { argv } from "process"; import { Log } from "../src/log"; import * as WhyRunning from "why-is-node-running"; -const logger = new Log("MessageProcessor"); - -// we are a test file and thus need those -/* tslint:disable:no-unused-expression max-file-line-count */ - if (!argv.includes("--noisy")) { Log.ForceSilent(); } diff --git a/test/structures/test_timedcache.ts b/test/structures/test_timedcache.ts new file mode 100644 index 0000000000000000000000000000000000000000..f002fc77ee182cb14f839c52a39f3228ce02ef67 --- /dev/null +++ b/test/structures/test_timedcache.ts @@ -0,0 +1,124 @@ +/* +Copyright 2019 matrix-appservice-discord + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { expect } from "chai"; +import { TimedCache } from "../../src/structures/timedcache"; +import { Util } from "../../src/util"; + +// we are a test file and thus need those +/* tslint:disable:no-unused-expression max-file-line-count no-any */ + +describe("TimedCache", () => { + it("should construct", () => { + const timedCache = new TimedCache<string, number>(1000); + expect(timedCache.size).to.equal(0); + }); + + it("should add and get values", () => { + const timedCache = new TimedCache<string, number>(1000); + timedCache.set("foo", 1); + timedCache.set("bar", -1); + timedCache.set("baz", 0); + expect(timedCache.get("foo")).to.equal(1); + expect(timedCache.get("bar")).to.equal(-1); + expect(timedCache.get("baz")).to.equal(0); + }); + + it("should be able to overwrite values", () => { + const timedCache = new TimedCache<string, number>(1000); + timedCache.set("foo", 1); + expect(timedCache.get("foo")).to.equal(1); + timedCache.set("bar", 0); + timedCache.set("foo", -1); + expect(timedCache.get("bar")).to.equal(0); + expect(timedCache.get("foo")).to.equal(-1); + }); + + it("should be able to check if a value exists", () => { + const timedCache = new TimedCache<string, number>(1000); + expect(timedCache.has("foo")).to.be.false; + timedCache.set("foo", 1); + expect(timedCache.has("foo")).to.be.true; + timedCache.set("bar", 1); + expect(timedCache.has("bar")).to.be.true; + }); + + it("should be able to delete a value", () => { + const timedCache = new TimedCache<string, number>(1000); + timedCache.set("foo", 1); + expect(timedCache.has("foo")).to.be.true; + timedCache.delete("foo"); + expect(timedCache.has("foo")).to.be.false; + expect(timedCache.get("foo")).to.be.undefined; + }); + + it("should expire a value", async () => { + const LIVE_FOR = 50; + const timedCache = new TimedCache<string, number>(LIVE_FOR); + timedCache.set("foo", 1); + expect(timedCache.has("foo")).to.be.true; + expect(timedCache.get("foo")).to.equal(1); + await Util.DelayedPromise(LIVE_FOR); + expect(timedCache.has("foo")).to.be.false; + expect(timedCache.get("foo")).to.be.undefined; + }); + + it("should be able to iterate around a long-lasting collection", () => { + const timedCache = new TimedCache<string, number>(1000); + timedCache.set("foo", 1); + timedCache.set("bar", -1); + timedCache.set("baz", 0); + let i = 0; + for (const iterator of timedCache) { + if (i === 0) { + expect(iterator[0]).to.equal("foo"); + expect(iterator[1]).to.equal(1); + } else if (i === 1) { + expect(iterator[0]).to.equal("bar"); + expect(iterator[1]).to.equal(-1); + } else { + expect(iterator[0]).to.equal("baz"); + expect(iterator[1]).to.equal(0); + } + i++; + } + }); + + it("should be able to iterate around a short-term collection", async () => { + const LIVE_FOR = 100; + const timedCache = new TimedCache<string, number>(LIVE_FOR); + timedCache.set("foo", 1); + timedCache.set("bar", -1); + timedCache.set("baz", 0); + let i = 0; + for (const iterator of timedCache) { + if (i === 0) { + expect(iterator[0]).to.equal("foo"); + expect(iterator[1]).to.equal(1); + } else if (i === 1) { + expect(iterator[0]).to.equal("bar"); + expect(iterator[1]).to.equal(-1); + } else { + expect(iterator[0]).to.equal("baz"); + expect(iterator[1]).to.equal(0); + } + i++; + } + await Util.DelayedPromise(LIVE_FOR); + const vals = [...timedCache.entries()]; + expect(vals).to.be.empty; + }); +}); diff --git a/test/test_matrixeventprocessor.ts b/test/test_matrixeventprocessor.ts index e5dbf0c9f14582bd47369391de7938d4f568a628..43eb0c8a6cfc6b98eea212ef6b21193eb416a787 100644 --- a/test/test_matrixeventprocessor.ts +++ b/test/test_matrixeventprocessor.ts @@ -642,6 +642,7 @@ describe("MatrixEventProcessor", () => { }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -663,6 +664,7 @@ This is where the reply goes`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -684,6 +686,7 @@ This is where the reply goes`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -705,6 +708,7 @@ This is the second reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -726,6 +730,7 @@ This is the reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -745,6 +750,7 @@ This is the reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -762,6 +768,7 @@ This is the reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -786,6 +793,7 @@ This is the reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any); @@ -803,6 +811,7 @@ This is the reply`, }, }, }, + room_id: "!fakeroom:localhost", sender: "@test:localhost", type: "m.room.message", } as IMatrixEvent, mockChannel as any);