openai-telegram-bot/main.py

338 wiersze
14 KiB
Python

import os
import re
import openai
import logging
import asyncio
import math
from dotenv import load_dotenv
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram.constants import ChatAction
from functools import wraps
from telegram.error import BadRequest, RetryAfter, TimedOut
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, CallbackQueryHandler
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
if os.environ.get("OPENAI_API_KEY") is None:
print("OpenAI_API_KEY is not set in.env file or OPENAI_API_KEY environment variable is not set")
exit(1)
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
SYSTEM_PROMPT=os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE=os.environ.get("CHATGPT_TEMPERATURE")
MODEL=os.environ.get("OPENAI_MODEL")
WHISPER_TO_CHAT=bool(int(os.environ.get("WHISPER_TO_CHAT")))
MAX_USER_CONTEXT=int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
openai.api_key = os.environ.get("OPENAI_API_KEY")
users = {
"userid": {
"context": [],
"usage": {
"chatgpt": 0,
"whisper": 0,
"dalle": 0,
},
"options": {
"whisper-to-chat": WHISPER_TO_CHAT,
"temperature": 0.9,
"max-context": 5
}
},
}
def restricted(func):
@wraps(func)
async def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
if str(user_id) not in ALLOWED_USERS:
if "*" != ALLOWED_USERS[0]:
print(f"Unauthorized access denied for {user_id}.")
return
else:
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}, "options": {"whisper-to-chat": WHISPER_TO_CHAT, "temperature": float(TEMPERATURE), "max-context": MAX_USER_CONTEXT}}
return await func(update, context, *args, **kwargs)
return wrapped
async def messageGPT(text: str, chat_id: str):
# Initialize user if not present
if chat_id not in users:
users[chat_id] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}, "options": {"whisper-to-chat": WHISPER_TO_CHAT, "temperature": float(TEMPERATURE), "max-context": MAX_USER_CONTEXT}}
# Update context
user_context = users[chat_id]["context"]
user_context.append({"role": "user", "content": text})
if len(user_context) > users[chat_id]["options"]["max-context"]:
user_context.pop(0)
# Interact with ChatGPT API and stream the response
response = None
try:
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": SYSTEM_PROMPT}] + user_context,
temperature=users[chat_id]["options"]["temperature"],
)
except:
return "There was a problem with OpenAI, so I can't answer you."
# Initialize variables for streaming
assistant_message = ""
if 'choices' in response:
assistant_message = response['choices'][0]['message']['content']
else:
assistant_message = "There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!"
# Update context
user_context.append({"role": "assistant", "content": assistant_message})
if len(user_context) > users[chat_id]["options"]["max-context"]:
user_context.pop(0)
# Update usage
users[chat_id]["usage"]['chatgpt'] += int(response['usage']['total_tokens'])
return assistant_message
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}, "options": {"whisper-to-chat": WHISPER_TO_CHAT, "temperature": float(TEMPERATURE), "max-context": MAX_USER_CONTEXT}}
await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
users[f"{update.effective_chat.id}"]["usage"]['dalle'] += 1
await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=ChatAction.TYPING)
response = openai.Image.create(
prompt=update.message.text,
n=1,
size="1024x1024"
)
try:
image_url = response['data'][0]['url']
await context.bot.send_message(chat_id=update.effective_chat.id, text=image_url)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Initialize variables
chat_id = update.effective_chat.id
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
users[f"{chat_id}"]["usage"]['whisper'] = 0
transcript = {'text': ''}
audioMessage = False
# Check if the attachment is a voice message
if update.message.voice:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.voice.duration
file_id = update.message.voice.file_id
file_format = "ogg"
audioMessage = True
# Check if the attachment is a video
elif update.message.video:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.video.duration
file_id = update.message.video.file_id
file_format = "mp4"
# Check if the attachment is an audio file
elif update.message.audio:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.audio.duration
file_id = update.message.audio.file_id
file_format = "mp3"
else:
await context.bot.send_message(chat_id=chat_id, text="Can't handle such file. Reason: unknown.")
return
# Download the file and convert it if necessary
file = await context.bot.get_file(file_id)
user_id = update.effective_user.id
await file.download_to_drive(f"{user_id}.{file_format}")
if file_format == "ogg":
ogg_audio = AudioSegment.from_file(f"{user_id}.ogg", format="ogg")
ogg_audio.export(f"{user_id}.mp3", format="mp3")
os.remove(f"{user_id}.ogg")
file_format = "mp3"
# Transcribe the audio
with open(f"{user_id}.{file_format}", "rb") as audio_file:
try:
transcript = openai.Audio.transcribe("whisper-1", audio_file)
except:
await context.bot.send_message(chat_id=chat_id, text="Transcript failed.")
os.remove(f"{user_id}.{file_format}")
return
os.remove(f"{user_id}.{file_format}")
# Send the transcript
if transcript['text'] == "":
transcript['text'] = "[Silence]"
if audioMessage and users[f"{chat_id}"]["options"]["whisper-to-chat"]:
chatGPT_response = await messageGPT(transcript['text'], str(chat_id))
transcript['text'] = "> " + transcript['text'] + "\n\n" + chatGPT_response
# Check if the transcript length is longer than 4095 characters
if len(transcript['text']) > 4095:
# Split the transcript into multiple messages without breaking words in half
max_length = 4096
words = transcript['text'].split()
current_message = ""
for word in words:
if len(current_message) + len(word) + 1 > max_length:
await context.bot.send_message(chat_id=chat_id, text=current_message)
current_message = ""
current_message += f"{word} "
if current_message:
await context.bot.send_message(chat_id=chat_id, text=current_message)
else:
await context.bot.send_message(chat_id=chat_id, text=transcript['text'])
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
# Initialize user if not present
if chat_id not in users:
users[chat_id] = {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}}
# Check if replying and add context
if hasattr(update.message.reply_to_message, "text"):
user_prompt = f"In reply to: '{update.message.reply_to_message.text}' \n---\n{update.message.text}"
else:
user_prompt = update.message.text
# Use messageGPT function to get the response
assistant_message = await messageGPT(user_prompt, chat_id)
await context.bot.send_message(chat_id=update.effective_chat.id, text=assistant_message)
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"]["context"] = []
print(f"Cleared context for {update.effective_chat.id}")
await update.message.reply_text(f'Your message context history was cleared.')
@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info=users[f"{update.effective_chat.id}"]["usage"]
total_spent=0.0
total_spent+=(user_info['chatgpt']/750)*0.002
total_spent+=float(user_info['dalle'])*0.02
total_spent+=(user_info['whisper']/60.0)*0.006
info_message=f"""User: {update.effective_user.name}\n- Used ~{user_info["chatgpt"]} tokens with ChatGPT.\n- Generated {user_info["dalle"]} images with DALL-E.\n- Transcribed {round(float(user_info["whisper"])/60.0, 2)}min with Whisper.\n\nTotal spent: ${str(total_spent)}"""
await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_message="""Here's what you can do:\n\n
- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n
- Send an audio to transcribe to text with Whisper.\n\n
- /settings To change your settings.\n
- /usage To get your usage statistics.\n
- /clear To clear you chatgpt message context (start a new chat)."""
await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
# Function to generate the settings buttons
def generate_settings_markup(chat_id: str) -> InlineKeyboardMarkup:
keyboard = [
[
InlineKeyboardButton("Increase Temperature", callback_data=f"setting_increase_temperature_{chat_id}"),
InlineKeyboardButton("Decrease Temperature", callback_data=f"setting_decrease_temperature_{chat_id}")
],
[
InlineKeyboardButton("Enable Whisper to Chat", callback_data=f"setting_enable_whisper_{chat_id}"),
InlineKeyboardButton("Disable Whisper to Chat", callback_data=f"setting_disable_whisper_{chat_id}")
],
[
InlineKeyboardButton("Increase Context", callback_data=f"setting_increase_context_{chat_id}"),
InlineKeyboardButton("Decrease Context", callback_data=f"setting_decrease_context_{chat_id}")
]
]
return InlineKeyboardMarkup(keyboard)
@restricted
async def settings(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
settings_markup = generate_settings_markup(chat_id)
await context.bot.send_message(chat_id=chat_id, text="Settings:", reply_markup=settings_markup)
async def settings_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
action, chat_id = query.data.rsplit("_", 1)
if action.startswith("setting_increase_temperature"):
users[chat_id]["options"]["temperature"] = min(users[chat_id]["options"]["temperature"] + 0.1, 1)
elif action.startswith("setting_decrease_temperature"):
users[chat_id]["options"]["temperature"] = max(users[chat_id]["options"]["temperature"] - 0.1, 0)
elif action.startswith("setting_enable_whisper"):
print(f"enabling whisper for {chat_id}")
users[chat_id]["options"]["whisper-to-chat"] = True
elif action.startswith("setting_disable_whisper"):
print(f"disabling whisper for {chat_id}")
users[chat_id]["options"]["whisper-to-chat"] = False
elif action.startswith("setting_increase_context"):
users[chat_id]["options"]["max-context"] = min(users[chat_id]["options"]["max-context"] + 1, MAX_USER_CONTEXT)
elif action.startswith("setting_decrease_context"):
users[chat_id]["options"]["max-context"] = max(users[chat_id]["options"]["max-context"] - 1, 1)
settings_markup = generate_settings_markup(chat_id)
await query.edit_message_text(text="Choose a setting option:", reply_markup=settings_markup)
# Remove the settings message
await context.bot.delete_message(chat_id=query.message.chat_id, message_id=query.message.message_id)
# Send a message displaying the updated settings
settings_message = f"""Updated settings:\n\nTemperature: {users[chat_id]['options']['temperature']}\nWhisper to Chat: {users[chat_id]['options']['whisper-to-chat']}\nContext Length: {users[chat_id]["options"]["max-context"]}"""
await context.bot.send_message(chat_id=chat_id, text=settings_message)
if __name__ == '__main__':
try:
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
except:
ALLOWED_USERS=ALLOWED_USERS
print(f"Allowed users: {ALLOWED_USERS}")
print(f"System prompt: {SYSTEM_PROMPT}")
application = ApplicationBuilder().token(os.environ.get("BOT_TOKEN")).build()
start_handler = CommandHandler('start', start)
application.add_handler(start_handler)
clear_handler = CommandHandler('clear', clear)
application.add_handler(clear_handler)
info_handler = CommandHandler('usage', usage)
application.add_handler(info_handler)
help_handler = CommandHandler('help', _help)
application.add_handler(help_handler)
imagine_handler = CommandHandler('imagine', imagine)
application.add_handler(imagine_handler)
settings_handler = CommandHandler('settings', settings)
application.add_handler(settings_handler)
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))
settings_callback_handler = CallbackQueryHandler(settings_callback)
application.add_handler(settings_callback_handler)
application.run_polling()