import DiscordApi, { GuildTextBasedChannel, TextBasedChannel } from "discord.js"; import { ChatCompletionRequestMessage, ChatCompletionResponseMessage } from "openai"; import Axios from "axios"; import { database, openai, config } from "./index"; import Moderation from "./moderation"; import toOpenAIMessages from "./toOpenAIMessages"; import FunctionManager from "./funcitonManager"; type NonNullableInObject = { [k in keyof T]: k extends V ? NonNullable : T[k] }; type apiRequest = DiscordApi.Message | DiscordApi.RepliableInteraction; export type RequestMessage = apiRequest & NonNullableInObject; class ChannelsRunningValue extends Array { tries = 0; channel: TextBasedChannel; private typingWorker = setInterval(() => this.sendTyping(), 5000).unref(); private typingStopper = setTimeout(() => this.stopTyping(), 60000).unref(); constructor(channel: TextBasedChannel) { super(); this.channel = channel; clearInterval(this.typingStopper); clearTimeout(this.typingStopper); } startTyping() { this.sendTyping(); this.typingWorker.refresh(); this.typingStopper.refresh(); } stopTyping() { clearInterval(this.typingWorker); clearTimeout(this.typingStopper); } sendTyping() { this.channel.sendTyping().catch(() => {/* GRACEFAIL: we fail to send the typing then */}); } shift() { this.tries = 0; return super.shift(); } } /** Stores the queue requests on the channels. */ const channelsRunning: DiscordApi.Collection = new DiscordApi.Collection(); /** * Gets the user that requested the execution * @param request The request to get the user from * @returns the user or guild member */ export function getAuthor(request: apiRequest) { if (request instanceof DiscordApi.Message) return request.author; return request.user; } /** * gets user remaining limit (or lack of it) * @param user the user to check * @param requestTimestamp the timestamp of the user request * @returns object containing the limit and remaining usage or `false` if there is no limit */ export async function getUserLimit(user: string | { id: string }, requestTimestamp: Date) { const userId: string = typeof user === "string" ? user : user.id; const userLimits = await database.limits.findUnique({ where: { user: BigInt(userId) } }); if (userLimits?.vip) return false; const usedLimit = (await database.usage.count({ select: { _all: true }, where: { user: BigInt(userId), timestamp: { gte: new Date(requestTimestamp.getTime() - 1000 * 60 * 60 * 24 /* 24 hours */) } }, }))._all; if (!userLimits || !userLimits.limit) return {limit: config.userLimits.requests, remaining: config.userLimits.requests - usedLimit}; return {limit: userLimits.limit, remaining: userLimits.limit - usedLimit}; } /** * gets the timestamp of nth use inside time limit * @param user the user or id to check * @param requestTimestamp the timestamp of the request (message/interaction createdAt) * @param nth which timestamp in time limit to get (orderedd from oldest to newest) * @returns `false` if user is vip * @returns `null` if there is no request * @returns `Date` timestamp of the nth request */ export async function getNthUseInLimitTimestamp(user: string | { id: string }, requestTimestamp: Date, nth = 1) { const userId: string = typeof user === "string" ? user : user.id; const userLimits = await database.limits.findUnique({ where: { user: BigInt(userId)} }); if (userLimits?.vip) return false; const nthUseInLimit = await database.usage.findFirst({ where: { user: BigInt(userId), timestamp: { gte: new Date(requestTimestamp.getTime() - 1000 * 60 * 60 * 24 /* 24 hours */) } }, orderBy: { timestamp: "asc" }, skip: nth - 1, }); if (!nthUseInLimit) return null; return nthUseInLimit.timestamp; } /** * Replies to a request * @param request the request to reply to * @param message the message * @param replyOptions additional options if the request is a message * @param interactionOptions additional options if the request is an interaction * @returns Promise of the done action */ function requestReply( request: RequestMessage, message: DiscordApi.MessageReplyOptions & DiscordApi.InteractionReplyOptions, // TODO: add support for these below replyOptions: DiscordApi.MessageReplyOptions = {}, interactionOptions: DiscordApi.InteractionReplyOptions = {}, ) { if (request instanceof DiscordApi.Message) { return request.reply(Object.assign(message, replyOptions)); } else { if (!request.deferred) return request.reply(Object.assign(message, interactionOptions)); else if (request.replied) return request.followUp(Object.assign(message, interactionOptions)); else return request.editReply(Object.assign(message, interactionOptions)); } } /** * Checks if the request can be replied in a channel * @param request the request to check * @returns if the request can be replied to */ function canReplyToRequest(request: apiRequest) { return (request.guild?.members.me?.permissionsIn(request.channel as GuildTextBasedChannel).has("SendMessages") ?? true); } /** * Check and queues up the request and runs it if there is nothing in queue. * @param request the message to check and queue */ export async function queueRequest(request: apiRequest) { if (!request.channelId) { if (request instanceof DiscordApi.Message) await request.reply("request does not have channelId"); else if (request.isRepliable()) await request.reply("request does not have channelId"); console.log("There was incoming execution without channelId set, ignoring"); console.log(request); return; } if (!canReplyToRequest(request)) return; if (!(request instanceof DiscordApi.Message) && channelsRunning.get(request.channelId)?.length !== 0) { requestReply( request as RequestMessage, { embeds: [{ color: 0xb000ff, description: "I'm already typing here, please wait!", }] }, {}, { ephemeral: true, }, ) return; } const userLimit = await getUserLimit(getAuthor(request), request.createdAt); if (userLimit !== false && userLimit.remaining <= 0) { if (request instanceof DiscordApi.Message) { request.react("🛑").catch(() => { /* NOTE: We send an informaton about limit reached in DM */ }); if (!request.author.dmChannel) await request.author.createDM(); request.author.dmChannel?.send({ embeds: [{ color: 0xff0000, description: "You've used up your message limit for today,\n" + `${userLimit.limit} requests in last 24 hours`, }] }).catch(() => {/* FIXME: What should the bot do in this case to inform of limit reached?*/}); } else if (request.isRepliable()) { request.reply({ content: `You've used up your message limit for today, ${userLimit.limit} requests in last 24 hours`, ephemeral: true, }).catch(() => { /* Impossible to get there unless connection lost*/ }); } return; } const messagesForChannel = channelsRunning.ensure( request.channelId, () => { return new ChannelsRunningValue(request.channel as TextBasedChannel); }, ); const shouldStart = messagesForChannel.length === 0; messagesForChannel.push(request as RequestMessage); if (shouldStart) void executeFromQueue(request.channelId); } /** * Logs used tokens to the terminal and to the database * @param answer the response that OpenAI returned * @param message the message that initiated the execution * @param functionRan counter of how many function have been ran */ function logUsedTokens( answer: Awaited>, message: RequestMessage, functionRan: number, ) { const usage = answer.data.usage; const functionName = answer.data.choices[0].message?.function_call?.name; if (usage !== undefined) { const channelName: string = !message.channel.isDMBased() ? `${message.channel.name} (${message.guild?.name})` : `@${getAuthor(message).tag}`; console.log(`Used ${usage.total_tokens} (${usage.prompt_tokens} + ${usage.completion_tokens}) tokens for ${getAuthor(message).tag} (${getAuthor(message).id}) in #${channelName}${functionName ? " [Function: " + functionName + "]" : ""}`); database.usage.create({ data: { timestamp: message.createdAt, user: BigInt(getAuthor(message).id), channel: BigInt(message.channelId), guild: message.guildId ? BigInt(message.guildId) : null, usageRequest: usage.prompt_tokens, usageResponse: usage.completion_tokens, functionName: functionName ?? null, functionRan: functionName ? functionRan : 0, } }).catch((e => { console.error("Failed to push to a database"); console.error(e); })); } } /** * Executes the queue for the channel * @param channel the channel to run the queue for */ async function executeFromQueue(channel: string) { const channelQueue = channelsRunning.get(channel) as ChannelsRunningValue; const message = channelQueue.at(0) as RequestMessage; let functionRanCounter = 0; let OpenAImessages: ChatCompletionRequestMessage[] = []; // ignore if we can't even send anything to reply if (!canReplyToRequest(message)) return; try { let messages: DiscordApi.Collection = await message.channel.messages.fetch({ limit: config.readLimits.messages, cache: false }); messages = messages.filter(m => message.createdTimestamp - m.createdTimestamp < config.readLimits.time ); messages.forEach(m => { Moderation.checkMessageNoReturn(m); }); const questionMessageId: string = message instanceof DiscordApi.Message ? message.id : ''; if (message instanceof DiscordApi.Message) { channelQueue.startTyping(); } else if (message.isRepliable()) { await message.deferReply(); } messages.sort((a, b) => { if (a.id === questionMessageId) return -1; if (b.id === questionMessageId) return 1; return b.createdTimestamp - a.createdTimestamp; }); OpenAImessages = toOpenAIMessages(messages.values()); let generatedMessage: ChatCompletionResponseMessage | undefined = undefined; let answer: Awaited>; do { answer = await openai.createChatCompletion({ ...config.chatCompletionParams, messages: OpenAImessages, // FIXME: don't use new instance of FunctionManager functions: new FunctionManager().getFunctions(), }); logUsedTokens(answer, message, ++functionRanCounter); generatedMessage = answer.data.choices[0].message; if (!generatedMessage) throw new Error("Empty message received"); // handle function calls if (generatedMessage.function_call) { OpenAImessages.push(generatedMessage); // FIXME: don't use new instance of FunctionManager OpenAImessages.push( new FunctionManager().handleFunction(generatedMessage.function_call) ); } } while (generatedMessage.function_call); channelQueue.stopTyping(); const answerContent = answer.data.choices[0].message?.content; if (answerContent === undefined || answerContent === "") { if (message instanceof DiscordApi.Message) message.react("😶").catch(() => {/* GRACEFAIL: It's okay if the bot won't reply */}); } else { const answerMessagesContent :string[] = [""]; for (const i of answerContent.split(/\n\n/)) { if (answerMessagesContent[answerMessagesContent.length-1].length + i.length < 2000) { answerMessagesContent[answerMessagesContent.length-1] += "\n\n" + i; } else { answerMessagesContent.push(i); } } for (const i of answerMessagesContent) { const response = requestReply(message, {content: i}, {allowedMentions: { repliedUser: false }}); await response.then(rval => Moderation.checkMessageNoReturn(rval)); } } } catch (e) { channelQueue.stopTyping(); console.error(`Error ocurred while handling chat completion request (${(e as object).constructor.name}):`); console.error(e); if (OpenAImessages.length !== 0) { console.error("Messages:"); console.error(OpenAImessages); } let errorText = "\n"; if (e instanceof Error) { errorText += e.message; } else errorText = ""; if (Axios.isAxiosError(e) && e.code?.match(/^5..$/) && channelQueue.tries < 3) { channelQueue.tries++; await new Promise(r => setTimeout(r, 2000)); // pause for 2 seconds before retrying return executeFromQueue(channel); } requestReply( message, { embeds: [{ color: 0xff0000, description: "Something bad happened! :frowning:" + errorText }], }, {allowedMentions: { repliedUser: false } }, { ephemeral: true }, ).catch (() => {/* GRACEFAIL: It's okay if we don't reply with an error */}); } channelQueue.shift(); if (channelQueue.length === 0) channelsRunning.delete(channel); else return executeFromQueue(channel); }