meshtastic-matrix-relay/matrix_utils.py

253 wiersze
8.3 KiB
Python

import asyncio
import time
import re
import certifi
import io
import ssl
from typing import List, Union
from nio import (
AsyncClient,
AsyncClientConfig,
MatrixRoom,
RoomMessageText,
RoomMessageNotice,
UploadResponse,
)
from config import relay_config
from log_utils import get_logger
from plugin_loader import load_plugins
from meshtastic_utils import connect_meshtastic
from PIL import Image
matrix_homeserver = relay_config["matrix"]["homeserver"]
matrix_rooms: List[dict] = relay_config["matrix_rooms"]
matrix_access_token = relay_config["matrix"]["access_token"]
bot_user_id = relay_config["matrix"]["bot_user_id"]
bot_user_name = None # Detected upon logon
bot_start_time = int(
time.time() * 1000
) # Timestamp when the bot starts, used to filter out old messages
logger = get_logger(name="Matrix")
matrix_client = None
def bot_command(command, payload):
return f"{bot_user_name}: !{command}" in payload
async def connect_matrix():
global matrix_client
global bot_user_name
if matrix_client:
return matrix_client
# Create SSL context using certifi's certificates
ssl_context = ssl.create_default_context(cafile=certifi.where())
# Initialize the Matrix client with custom SSL context
config = AsyncClientConfig(encryption_enabled=False)
matrix_client = AsyncClient(
matrix_homeserver, bot_user_id, config=config, ssl=ssl_context
)
matrix_client.access_token = matrix_access_token
response = await matrix_client.get_displayname(bot_user_id)
bot_user_name = response.displayname
return matrix_client
async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None:
"""Join a Matrix room by its ID or alias."""
try:
if room_id_or_alias.startswith("#"):
response = await matrix_client.room_resolve_alias(room_id_or_alias)
if not response.room_id:
logger.error(
f"Failed to resolve room alias '{room_id_or_alias}': {response.message}"
)
return
room_id = response.room_id
# Update the room ID in the matrix_rooms list
for room_config in matrix_rooms:
if room_config["id"] == room_id_or_alias:
room_config["id"] = room_id
break
else:
room_id = room_id_or_alias
if room_id not in matrix_client.rooms:
response = await matrix_client.join(room_id)
if response and hasattr(response, "room_id"):
logger.info(f"Joined room '{room_id_or_alias}' successfully")
else:
logger.error(
f"Failed to join room '{room_id_or_alias}': {response.message}"
)
else:
logger.debug(f"Bot is already in room '{room_id_or_alias}'")
except Exception as e:
logger.error(f"Error joining room '{room_id_or_alias}': {e}")
# Send message to the Matrix room
async def matrix_relay(room_id, message, longname, shortname, meshnet_name):
matrix_client = await connect_matrix()
try:
content = {
"msgtype": "m.text",
"body": message,
"meshtastic_longname": longname,
"meshtastic_shortname": shortname,
"meshtastic_meshnet": meshnet_name,
}
await asyncio.wait_for(
matrix_client.room_send(
room_id=room_id,
message_type="m.room.message",
content=content,
),
timeout=0.5,
)
logger.info(f"Sent inbound radio message to matrix room: {room_id}")
except asyncio.TimeoutError:
logger.error(f"Timed out while waiting for Matrix response")
except Exception as e:
logger.error(f"Error sending radio message to matrix room {room_id}: {e}")
def truncate_message(
text, max_bytes=227
): # 227 is the maximum that we can run without an error so far. 228 throws an error.
"""
Truncate the given text to fit within the specified byte size.
:param text: The text to truncate.
:param max_bytes: The maximum allowed byte size for the truncated text.
:return: The truncated text.
"""
truncated_text = text.encode("utf-8")[:max_bytes].decode("utf-8", "ignore")
return truncated_text
# Callback for new messages in Matrix room
async def on_room_message(
room: MatrixRoom, event: Union[RoomMessageText, RoomMessageNotice]
) -> None:
full_display_name = "Unknown user"
message_timestamp = event.server_timestamp
# We do not relay the past
if message_timestamp < bot_start_time:
return
room_config = None
for config in matrix_rooms:
if config["id"] == room.room_id:
room_config = config
break
# Only relay supported rooms
if not room_config:
return
text = event.body.strip()
longname = event.source["content"].get("meshtastic_longname")
shortname = event.source["content"].get("meshtastic_shortname", None)
meshnet_name = event.source["content"].get("meshtastic_meshnet")
suppress = event.source["content"].get("mmrelay_suppress")
local_meshnet_name = relay_config["meshtastic"]["meshnet_name"]
# Do not process
if suppress:
return
if longname and meshnet_name:
full_display_name = f"{longname}/{meshnet_name}"
if meshnet_name != local_meshnet_name:
logger.info(f"Processing message from remote meshnet: {text}")
short_meshnet_name = meshnet_name[:4]
# If shortname is None, truncate the longname to 3 characters
if shortname is None:
shortname = longname[:3]
prefix = f"{shortname}/{short_meshnet_name}: "
text = re.sub(
rf"^\[{full_display_name}\]: ", "", text
) # Remove the original prefix from the text
text = truncate_message(text)
full_message = f"{prefix}{text}"
else:
# This is a message from a local user, it should be ignored no log is needed
return
else:
display_name_response = await matrix_client.get_displayname(event.sender)
full_display_name = display_name_response.displayname or event.sender
short_display_name = full_display_name[:5]
prefix = f"{short_display_name}[M]: "
logger.debug(f"Processing matrix message from [{full_display_name}]: {text}")
full_message = f"{prefix}{text}"
text = truncate_message(text)
truncated_message = f"{prefix}{text}"
# Plugin functionality
plugins = load_plugins()
meshtastic_interface = connect_meshtastic()
from meshtastic_utils import logger as meshtastic_logger
found_matching_plugin = False
for plugin in plugins:
if not found_matching_plugin:
found_matching_plugin = await plugin.handle_room_message(
room, event, full_message
)
if found_matching_plugin:
logger.debug(f"Processed by plugin {plugin.plugin_name}")
meshtastic_channel = room_config["meshtastic_channel"]
if not found_matching_plugin and event.sender != bot_user_id:
if relay_config["meshtastic"]["broadcast_enabled"]:
meshtastic_logger.info(
f"Relaying message from {full_display_name} to radio broadcast"
)
meshtastic_interface.sendText(
text=full_message, channelIndex=meshtastic_channel
)
else:
logger.debug(
f"Broadcast not supported: Message from {full_display_name} dropped."
)
async def upload_image(
client: AsyncClient, image: Image.Image, filename: str
) -> UploadResponse:
buffer = io.BytesIO()
image.save(buffer, format="PNG")
image_data = buffer.getvalue()
response, maybe_keys = await client.upload(
io.BytesIO(image_data),
content_type="image/png",
filename=filename,
filesize=len(image_data),
)
return response
async def send_room_image(
client: AsyncClient, room_id: str, upload_response: UploadResponse
):
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.image", "url": upload_response.content_uri, "body": ""},
)