openai-telegram-bot/main.py

168 wiersze
7.5 KiB
Python

import os
import re
import openai
import logging
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
openai.api_key = os.getenv("OPENAI_API_KEY")
users = {
"userid": {
"context": [],
"usage": {
"chatgpt": 0,
"whisper": 0,
"dalle": 0,
}
},
}
ALLOWED_USERS=[]
SYSTEM_PROMPT=os.getenv("CHATGPT_SYSTEM_PROMPT")
MAX_USER_CONTEXT=int(os.getenv("CHATGPT_MAX_USER_CONTEXT"))
def restricted(func):
@wraps(func)
async def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
if str(user_id) not in ALLOWED_USERS:
if "*" != ALLOWED_USERS[0]:
print(f"Unauthorized access denied for {user_id}.")
return
else:
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
return await func(update, context, *args, **kwargs)
return wrapped
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
users[f"{update.effective_chat.id}"]["usage"]['dalle'] += 1
response = openai.Image.create(
prompt=update.message.text,
n=1,
size="1024x1024"
)
try:
image_url = response['data'][0]['url']
await context.bot.send_message(chat_id=update.effective_chat.id, text=image_url)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
print(update.message)
try:
users[f"{update.effective_chat.id}"]["usage"]['whisper'] += update.message.voice.duration
file = await context.bot.get_file(update.message.voice.file_id)
await file.download_to_drive(f"{update.effective_user.id}.ogg")
ogg_audio = AudioSegment.from_file(f"{update.effective_user.id}.ogg", format="ogg")
ogg_audio.export(f"{update.effective_user.id}.mp3", format="mp3")
os.remove(f"{update.effective_user.id}.ogg")
audio_file= open(f"{update.effective_user.id}.mp3", "rb")
try:
transcript = openai.Audio.transcribe("whisper-1", audio_file)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Transcript failed.")
os.remove(f"{update.effective_user.id}.mp3")
if transcript['text'] == "":
transcript['text'] = "[Silence]"
await context.bot.send_message(chat_id=update.effective_chat.id, text=transcript['text'])
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Can't handle such file. Reason: unkown.")
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
# Save context
if len(users[f"{update.effective_chat.id}"]["context"]) <= MAX_USER_CONTEXT:
users[f"{update.effective_chat.id}"]["context"].append({"role": "user", "content": f"{update.message.text}"})
else:
users[f"{update.effective_chat.id}"]["context"].pop(0)
users[f"{update.effective_chat.id}"]["context"].append({"role": "user", "content": f"{update.message.text}"})
# Interact with ChatGPT api
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": SYSTEM_PROMPT}]+users[f"{update.effective_chat.id}"]["context"]
)
# Save context
if len(users[f"{update.effective_chat.id}"]["context"]) <= MAX_USER_CONTEXT:
users[f"{update.effective_chat.id}"]["context"].append({"role": "assistant", "content": f"{response['choices'][0]['message']['content']}"})
else:
users[f"{update.effective_chat.id}"]["context"].pop(0)
users[f"{update.effective_chat.id}"]["context"].append({"role": "assistant", "content": f"{response['choices'][0]['message']['content']}"})
# Update usage
users[f"{update.effective_chat.id}"]["usage"]['chatgpt'] += len(str(users[f"{update.effective_chat.id}"]["context"]))
# Send reply
await context.bot.send_message(chat_id=update.effective_chat.id, text=response['choices'][0]['message']['content'])
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"]["context"] = []
print(f"Cleared context for {update.effective_chat.id}")
await update.message.reply_text(f'Your message context history was cleared.')
@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info=users[f"{update.effective_chat.id}"]["usage"]
total_spent=0.0
total_spent+=(user_info['chatgpt']/750)*0.002
total_spent+=float(user_info['dalle'])*0.02
total_spent+=(user_info['whisper'])*0.006
info_message=f"""User: {update.effective_user.name}\n- Used {user_info["chatgpt"]} characters with ChatGPT.\n- Generated {user_info["dalle"]} images with DALL-E.\n- Transcribed {user_info["whisper"]}min with Whisper.\n\nTotal spent: ${str(total_spent)}"""
await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_message="""Here's what you can do:\n\n- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n- Send an audio to transcribe to text with Whisper.\n\n- /usage To get your usage statistics.\n - /clear To clear you chatgpt message context (start a new chat)."""
await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
if __name__ == '__main__':
try:
ALLOWED_USERS=os.getenv("BOT_ALLOWED_USERS").split(",")
except:
ALLOWED_USERS=ALLOWED_USERS
print(f"Allowed users: {ALLOWED_USERS}")
print(f"System prompt: {SYSTEM_PROMPT}")
application = ApplicationBuilder().token(os.getenv("BOT_TOKEN")).build()
start_handler = CommandHandler('start', start)
application.add_handler(start_handler),
clear_handler = CommandHandler('clear', clear)
application.add_handler(clear_handler),
info_handler = CommandHandler('usage', usage)
application.add_handler(info_handler),
help_handler = CommandHandler('help', _help)
application.add_handler(help_handler),
imagine_handler = CommandHandler('imagine', imagine)
application.add_handler(imagine_handler),
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))
application.run_polling()