import DiscordApi, { GuildTextBasedChannel, TextBasedChannel } from "discord.js"; import {APIError as OpenAIError} from "openai"; import { ChatCompletion, ChatCompletionMessage, ChatCompletionMessageParam } from "openai/resources/chat"; import { database, openai, config } from "./index"; import Moderation from "./moderation"; import toOpenAIMessages from "./toOpenAIMessages"; import FunctionManager from "./funcitonManager"; type NonNullableInObject = { [k in keyof T]: k extends V ? NonNullable : T[k] }; export type apiRequest = DiscordApi.Message | DiscordApi.RepliableInteraction; export type RequestMessage = apiRequest & NonNullableInObject; class ChannelsRunningValue extends Array { tries = 0; channel: TextBasedChannel; private typingWorker = setInterval(() => this.sendTyping(), 5000).unref(); private typingStopper = setTimeout(() => this.stopTyping(), 60000).unref(); constructor(channel: TextBasedChannel) { super(); this.channel = channel; clearInterval(this.typingStopper); clearTimeout(this.typingStopper); } startTyping() { this.sendTyping(); this.typingWorker.refresh(); this.typingStopper.refresh(); } stopTyping() { clearInterval(this.typingWorker); clearTimeout(this.typingStopper); } sendTyping() { this.channel.sendTyping().catch(() => {/* GRACEFAIL: we fail to send the typing then */}); } shift() { this.tries = 0; return super.shift(); } } /** Stores the queue requests on the channels. */ const channelsRunning: DiscordApi.Collection = new DiscordApi.Collection(); /** * Gets the user that requested the execution * @param request The request to get the user from * @returns the user or guild member */ export function getAuthor(request: apiRequest) { if (request instanceof DiscordApi.Message) return request.author; return request.user; } /** * Replies to a request * @param request the request to reply to * @param message the message * @param replyOptions additional options if the request is a message * @param interactionOptions additional options if the request is an interaction * @returns Promise of the done action */ function requestReply( request: RequestMessage, message: DiscordApi.MessageReplyOptions & DiscordApi.InteractionReplyOptions, // TODO: add support for these below replyOptions: DiscordApi.MessageReplyOptions = {}, interactionOptions: DiscordApi.InteractionReplyOptions = {}, ) { if (request instanceof DiscordApi.Message) { return request.reply(Object.assign(message, replyOptions)); } else { if (!request.deferred) return request.reply(Object.assign(message, interactionOptions)); else if (request.replied) return request.followUp(Object.assign(message, interactionOptions)); else return request.editReply(Object.assign(message, interactionOptions)); } } /** * Checks if the request can be replied in a channel * @param request the request to check * @returns if the request can be replied to */ function canReplyToRequest(request: apiRequest) { return (request.guild?.members.me?.permissionsIn(request.channel as GuildTextBasedChannel).has("SendMessages") ?? true); } /** * Check and queues up the request and runs it if there is nothing in queue. * @param request the message to check and queue */ export async function queueRequest(request: apiRequest) { if (!request.channelId) { if (request instanceof DiscordApi.Message) await request.reply("request does not have channelId"); else if (request.isRepliable()) await request.reply("request does not have channelId"); console.log("There was incoming execution without channelId set, ignoring"); console.log(request); return; } if (!canReplyToRequest(request)) return; if (!(request instanceof DiscordApi.Message) && channelsRunning.get(request.channelId) && channelsRunning.get(request.channelId)?.length !== 0) { requestReply( request as RequestMessage, { embeds: [{ color: 0xb000ff, description: "I'm already typing here, please wait!", }] }, {}, { ephemeral: true, }, ).catch(() => {/*GRACEFAIL: don't do anything*/}); return; } const userLimit = await config.quota.checkUser(getAuthor(request), request); if (userLimit.used >= userLimit.quota) { if (request instanceof DiscordApi.Message) { request.react("🛑").catch(() => { /* NOTE: We send an informaton about limit reached in DM */ }); if (!request.author.dmChannel) await request.author.createDM(); request.author.dmChannel?.send({ embeds: [{ color: 0xff0000, description: "You've used up your quota,\n" + userLimit.toString(), }] }).catch(() => {/* FIXME: What should the bot do in this case to inform of limit reached?*/}); } else if (request.isRepliable()) { request.reply({ content: "You've used up your quota\n" + userLimit.toString(), ephemeral: true, }).catch(() => { /* Impossible to get there unless connection lost*/ }); } return; } const messagesForChannel = channelsRunning.ensure( request.channelId, () => { return new ChannelsRunningValue(request.channel as TextBasedChannel); }, ); const shouldStart = messagesForChannel.length === 0; messagesForChannel.push(request as RequestMessage); if (shouldStart) void executeFromQueue(request.channelId); } /** * Logs used tokens to the terminal and to the database * @param answer the response that OpenAI returned * @param message the message that initiated the execution * @param functionRan counter of how many function have been ran */ function logUsedTokens( answer: ChatCompletion, message: RequestMessage, functionRan: number, ) { const usage = answer.usage; const functionName = answer.choices[0].message?.function_call?.name; if (usage !== undefined) { const channelName: string = !message.channel.isDMBased() ? `${message.channel.name} (${message.guild?.name})` : `@${getAuthor(message).tag}`; console.log(`Used ${usage.total_tokens} (${usage.prompt_tokens} + ${usage.completion_tokens}) tokens for ${getAuthor(message).tag} (${getAuthor(message).id}) in #${channelName}${functionName ? " [Function: " + functionName + "]" : ""}`); database.usage.create({ data: { timestamp: message.createdAt, user: BigInt(getAuthor(message).id), channel: BigInt(message.channelId), guild: message.guildId ? BigInt(message.guildId) : null, usageRequest: usage.prompt_tokens, usageResponse: usage.completion_tokens, functionName: functionName ?? null, functionRan: functionName ? functionRan : 0, } }).catch((e => { console.error("Failed to push to a database"); console.error(e); })); } } /** * Executes the queue for the channel * @param channel the channel to run the queue for */ async function executeFromQueue(channel: string) { const channelQueue = channelsRunning.get(channel) as ChannelsRunningValue; const message = channelQueue.at(0) as RequestMessage; let functionRanCounter = 0; let OpenAImessages: ChatCompletionMessageParam[] = []; // ignore if we can't even send anything to reply if (!canReplyToRequest(message)) return; try { let messages: DiscordApi.Collection = await message.channel.messages.fetch({ limit: config.readLimits.messages, cache: false }); messages = messages.filter(m => message.createdTimestamp - m.createdTimestamp < config.readLimits.time ); messages.forEach(m => { Moderation.checkMessageNoReturn(m); }); const questionMessageId: string = message instanceof DiscordApi.Message ? message.id : ''; if (message instanceof DiscordApi.Message) { channelQueue.startTyping(); } else if (message.isRepliable()) { await message.deferReply(); } messages.sort((a, b) => { if (a.id === questionMessageId) return -1; if (b.id === questionMessageId) return 1; return b.createdTimestamp - a.createdTimestamp; }); OpenAImessages = toOpenAIMessages(messages.values()); let generatedMessage: ChatCompletionMessage | undefined = undefined; let answer: Awaited>; do { answer = await openai.chat.completions.create({ ...config.chatCompletionParams, messages: OpenAImessages, // FIXME: don't use new instance of FunctionManager functions: new FunctionManager().getFunctions(), }); logUsedTokens(answer, message, ++functionRanCounter); generatedMessage = answer.choices[0].message; if (!generatedMessage) throw new Error("Empty message received"); // handle function calls if (generatedMessage.function_call) { OpenAImessages.push(generatedMessage); // FIXME: don't use new instance of FunctionManager OpenAImessages.push( new FunctionManager().handleFunction(generatedMessage.function_call) ); } } while (generatedMessage.function_call); channelQueue.stopTyping(); const answerContent = answer.choices[0].message?.content; if (answerContent === null || answerContent === "") { if (message instanceof DiscordApi.Message) message.react("😶").catch(() => {/* GRACEFAIL: It's okay if the bot won't reply */}); } else { const answerMessagesContent :string[] = [""]; for (const i of answerContent.split(/\n\n/)) { if (answerMessagesContent[answerMessagesContent.length-1].length + i.length < 2000) { answerMessagesContent[answerMessagesContent.length-1] += "\n\n" + i; } else { answerMessagesContent.push(i); } } for (const i of answerMessagesContent) { const response = requestReply(message, {content: i}, {allowedMentions: { repliedUser: false }}); await response.then(rval => Moderation.checkMessageNoReturn(rval)); } } } catch (e) { let errorText: string = ""; channelQueue.stopTyping(); if (typeof e !== "object") { console.error(`Error ocurred while handling chat completion request (${typeof e}):`); console.error(e); } else if (e === null) { console.error ("Error ocurred while handling chat completion request: null"); } else { console.error(`Error ocurred while handling chat completion request (${e.constructor.name}):`); if (e instanceof OpenAIError) { console.error(JSON.stringify(e)); } else { console.error(e); } if (OpenAImessages.length !== 0) { console.error("Messages:"); console.error(OpenAImessages); } if (e instanceof Error) { errorText = e.message; } else errorText = ""; if (e instanceof OpenAIError && e.code?.match(/^5..$/) && channelQueue.tries < 3) { channelQueue.tries++; await new Promise(r => setTimeout(r, 2000)); // pause for 2 seconds before retrying return executeFromQueue(channel); } } requestReply( message, { embeds: [{ color: 0xff0000, description: "Something bad happened! :frowning:" + errorText }], }, {allowedMentions: { repliedUser: false } }, { ephemeral: true }, ).catch (() => {/* GRACEFAIL: It's okay if we don't reply with an error */}); } channelQueue.shift(); if (channelQueue.length === 0) channelsRunning.delete(channel); else return executeFromQueue(channel); }