Piotr Lewandowski 2023-07-29 21:15:52 +02:00
commit b228b97ed1
21 zmienionych plików z 1281 dodań i 0 usunięć

1
.gitignore vendored 100644
Wyświetl plik

@ -0,0 +1 @@
build/

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

69
comm.py 100644
Wyświetl plik

@ -0,0 +1,69 @@
import serial
class SerialWrapper:
def __init__(self, serial_ref, timeout = 1000, timeout_idle_line = 100) -> None:
self.serial_ref = serial_ref
self.timeout = timeout
self.timeout_idle_line = timeout_idle_line
self.user_rx_callback = None
self.callback_slot_inited = False
def send(self, data):
self.serial_ref.flush()
self.serial_ref.write(data)
self.serial_ref.waitForBytesWritten(-1)
def receive(self, size = 0):
self.flush()
rx_data = bytearray()
while self.serial_ref.waitForReadyRead(self.timeout):
rx_data += self.serial_ref.readAll()
if(len(rx_data) >= size):
break
return rx_data
def receive_detect_idle(self):
rx_data = bytearray()
rx_to = self.timeout
self.serial_ref.flush()
while self.serial_ref.waitForReadyRead(500):
rx_data += self.serial_ref.readAll()
rx_to = self.timeout_idle_line
return rx_data
# while 1:
# rx_byte = self.serial_ref.readData(1)
# self.serial_ref.waitForReadyRead(rx_to)
# rx_byte = self.serial_ref.readData(1)
# if rx_byte:
# rx_data += rx_byte
# rx_to = self.timeout_idle_line
# else:
# print("no rx data")
# return rx_data
def send_async(self, data):
self.serial_ref.write(data)
def receive_async(self, size, callback):
self.serial_ref.readAll()
self.user_requested_read_size = size
self.user_data = bytearray()
self.user_rx_callback = callback
if not self.callback_slot_inited:
self.callback_slot_inited = True
self.serial_ref.readyRead.connect(self.onRxDone)
def onSendDone(self):
pass
def onRxDone(self):
self.user_data += self.serial_ref.readAll()
if(len(self.user_data) >= self.user_requested_read_size):
print(f"rx data: {self.user_data}")
self.user_rx_callback(self.user_data)
pass
def flush(self):
self.serial_ref.readAll()

228
confprot.py 100644
Wyświetl plik

@ -0,0 +1,228 @@
import struct
from typing import Dict, Any
class ECommand:
GET = 0
SET = 1
GET_RESP = 2
SET_ACK = 3
SET_NACK = 4
GET_ACK = 5
class ConfProtHeader:
SIZE = 3
ACCESS_SUB_SIZE = 4
def __init__(self) -> None:
self.fixed_id = 0xAF43
self.command = ECommand.GET
pass
def serialize(self):
return struct.pack("<HB",
self.fixed_id,
self.command)
def deserialize(self, raw_data) -> bool:
if not raw_data or len(raw_data) < 3:
return False
self.fixed_id, self.command = struct.unpack("<HB", raw_data[:3])
return True
class ConfProt:
def __init__(self, conf_objects, comm_interface) -> None:
self.conf_objects = conf_objects
self.comm_interface = comm_interface
self.idx = 0
self.sub_idx = 0
self.irq_lock = True
pass
def serialize_set(self, conf_obj, idx = 0) -> bytearray:
data = self.generate_header(ECommand.SET)
data += bytearray([conf_obj.databank_id,
conf_obj.obj_id,
idx,
conf_obj.len])
if hasattr(conf_obj, 'table_size'):
data.extend(conf_obj.serialize(idx))
else:
data.extend(conf_obj.serialize())
return data
def serialize_get(self, conf_obj, idx = 0) -> bytearray:
data = self.generate_header(ECommand.GET)
data += bytearray([conf_obj.databank_id,
conf_obj.obj_id,
idx,
conf_obj.len])
return data
def set_sync(self, conf_obj, idx = 0) -> bool:
tx_frame = self.serialize_set(conf_obj, idx)
self.comm_interface.send(tx_frame)
rx_frame = self.comm_interface.receive(ConfProtHeader.SIZE)
Header = ConfProtHeader()
error = Header.deserialize(rx_frame)
if error or Header.command != ECommand.SET_ACK:
return False
return True
def get_sync(self, conf_obj, idx = 0) -> bool:
tx_frame = self.serialize_get(conf_obj, idx)
self.comm_interface.send(tx_frame)
rx_frame = self.comm_interface.receive(conf_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE)
print(rx_frame)
Header = ConfProtHeader()
status = Header.deserialize(rx_frame)
print(f"hader data {Header.command} {Header.fixed_id} {status} {status == False} {Header.command != ECommand.GET_RESP}")
if status == False or Header.command != ECommand.GET_RESP:
print("response header not valid")
return False
if idx != 0:
return conf_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:], idx)
return conf_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:])
def set_all_sync(self, status_callback = None):
processing_idx = 0
for obj in self.conf_objects:
if hasattr(obj, 'table_size'):
for i in range(obj.table_size):
processing_idx += self.set_sync(obj, i)
else:
processing_idx += self.set_sync(obj)
if status_callback != None:
status_callback(100*processing_idx/len(self.conf_objects))
def get_all_sync(self, status_callback = None):
processing_idx = 0
for obj in self.conf_objects:
if hasattr(obj, 'table_size'):
for i in range(obj.table_size):
processing_idx += self.get_sync(obj, i)
else:
processing_idx += self.get_sync(obj)
if status_callback != None:
status_callback(100*processing_idx/len(self.conf_objects))
def set_all_async(self, status_callback):
if self.idx > 0:
return False
self.status_callback = status_callback
self.idx = 0
self.sub_idx = 0
self.send_next_set_async()
def send_next_set_async(self):
if self.idx >= len(self.conf_objects):
print("send cmd finished")
self.idx = 0
return False
obj = self.conf_objects[self.idx]
if hasattr(obj, 'table_size'):
if self.sub_idx < obj.table_size:
tx_frame = self.serialize_set(self.conf_objects[self.idx], self.sub_idx)
print(f"send cmd {self.idx} {self.sub_idx}")
self.sub_idx += 1
else: # sub idx finished
self.sub_idx = 0
self.idx += 1
return self.send_next_set_async()
else:
tx_frame = self.serialize_set(self.conf_objects[self.idx])
print(f"send cmd {self.idx} {self.sub_idx}")
self.idx += 1
self.comm_interface.flush()
self.comm_interface.send_async(tx_frame)
self.comm_interface.receive_async(ConfProtHeader.SIZE, self.rx_done_cb_set)
return True
def rx_done_cb_set(self, rx_frame):
if rx_frame == None:
self.status_callback(-1)
Header = ConfProtHeader()
error = Header.deserialize(rx_frame)
if error == False or Header.command != ECommand.SET_ACK:
return self.status_callback(-1)
if self.send_next_set_async():
self.status_callback(100 * self.idx/len(self.conf_objects))
else:
self.status_callback(100)
def get_all_async(self, status_callback):
if self.idx > 0:
return False
self.status_callback = status_callback
self.idx = 0
self.sub_idx = 0
self.send_next_get()
def rx_done_cb_get(self, rx_frame):
if len(rx_frame) != self.current_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:
print("rx wrong len")
self.status_callback(-1)
self.idx = 0
self.sub_idx = 0
return
Header = ConfProtHeader()
HeaderDesResult = Header.deserialize(rx_frame)
if not HeaderDesResult or Header.command != ECommand.GET_RESP:
print("rx wrong header")
self.status_callback(-1)
return
if self.sub_idx > 0:
res = self.current_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:], self.sub_idx - 1)
else:
res = self.current_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:])
if not res:
print(f"deserialize err {self.idx - 1}, {self.sub_idx - 1}")
if not self.send_next_get():
self.status_callback(100)
else:
self.status_callback(100 * self.idx / len(self.conf_objects))
def send_next_get(self):
if(self.idx >= len(self.conf_objects)):
self.idx = 0
return False
self.current_obj = self.conf_objects[self.idx]
if hasattr(self.current_obj, 'table_size'):
if self.sub_idx < self.current_obj.table_size:
tx_frame = self.serialize_get(self.conf_objects[self.idx], self.sub_idx)
print(f"get cmd {self.idx} {self.sub_idx}")
self.sub_idx += 1
else: # sub idx finished
self.sub_idx = 0
self.idx += 1
return self.send_next_get()
else:
tx_frame = self.serialize_get(self.conf_objects[self.idx])
print(f"send cmd {self.idx} {self.sub_idx}")
self.idx += 1
self.comm_interface.flush()
self.comm_interface.send_async(tx_frame)
self.comm_interface.receive_async(self.current_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE, self.rx_done_cb_get)
return True
def generate_header(self, cmd):
return bytearray([0x43, 0xAF, cmd])
if __name__ == "__main__":
import objects
import serial
ser = serial.Serial('COM24', 115200, timeout=1)
aprs_conf = []
aprs_conf.append(objects.ObjAprsConfig(0, 1))
prot = ConfProt(aprs_conf, None)
tx_frame = prot.serialize_get(aprs_conf[0])
ser.write(tx_frame)
rx_frame = ser.read(100)
print(rx_frame)

BIN
docs/freq_conf.png 100644

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 76 KiB

BIN
docs/tracker.png 100644

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 549 KiB

73
flashaccess.py 100644
Wyświetl plik

@ -0,0 +1,73 @@
import struct
class FlashAccess:
frame_id = 0xFFAABBCC
erase_sequence = 0xAABBCCDD
ack_byte = 0xAA
ack_len = 1
mtu = 128
img_block_size = 3 * 0x10000
def __init__(self, comm_interface):
self.comm_interface = comm_interface
def write_to_flash(self, address, data, progress_callback):
self.start_address = address
self.data = data
self.progress_callback = progress_callback
self.idx = 0
self.erase_idx = 0
if not len(data):
return self.progress_callback(-1)
tx_frame = self.encode_next_erase_frame()
if not len(tx_frame):
return self.progress_callback(-1)
self.comm_interface.send_async(tx_frame)
self.comm_interface.receive_async(FlashAccess.ack_len, self.receive_handler)
def encode_next_erase_frame(self):
erase_offset = self.erase_idx * FlashAccess.img_block_size
if erase_offset >= len(self.data):
return bytearray()
erase_frame = struct.pack('<IIHB',
FlashAccess.erase_sequence,
self.start_address + erase_offset,
1,1)
self.erase_idx += 1
return erase_frame
def encode_next_frame(self):
if self.idx >= len(self.data):
return bytearray()
data_size = min(FlashAccess.mtu, len(self.data) - self.idx)
tx_frame = self.encode_header(self.start_address + self.idx,
data_size)
tx_frame += self.data[self.idx:self.idx + data_size]
self.idx += data_size
return tx_frame
def receive_handler(self, rx_data):
if len(rx_data) != 1 or rx_data[0] != FlashAccess.ack_byte:
return self.progress_callback(-1)
erase_frame = self.encode_next_erase_frame()
if len(erase_frame):
next_frame = erase_frame
else:
next_frame = self.encode_next_frame()
if(not len(next_frame)):
return self.progress_callback(100)
self.comm_interface.send_async(next_frame)
self.comm_interface.receive_async(1, self.receive_handler)
def encode_header(self, address, len):
return struct.pack('<IIHB',
FlashAccess.frame_id,
address,
len,
1)

55
fwupd.py 100644
Wyświetl plik

@ -0,0 +1,55 @@
import struct, os
class OtaFrameType:
init = 0
data = 1
reflash = 2
ack = 3
reflash_other_device = 4
class FirmwareUploader:
ack_frame_len = 7
packet_fw_chunk_size = 128
def __init__(self, comm_interface):
self.comm_interface = comm_interface
def upload_firmware(self, filename, status_callback):
self.status_callback = status_callback
self.file = open(filename)
self.file.seek(0, os.SEEK_END)
fw_size = self.file.tell()
self.file.seek(0)
init_frame = self.get_header(OtaFrameType.init)
secret_code = struct.unpack('<Q', self.file.read(8))
init_frame += struct.pack('<QHH', secret_code,
0, fw_size)
self.comm_interface.send(init_frame)
self.comm_interface.receive(7, self.receive_cb)
def receive_cb(self, rx_data):
if len(rx_data) != self.ack_frame_len:
return self.status_callback(-1)
_, _, _, _, ack = struct.unpack('<BHHBB')
if not ack:
return self.status_callback(-1)
fw_offset = self.file.tell() - 8
fw_part = self.file.read(self.packet_data_size)
if not len(fw_part):
return self.status_callback(100)
if len(fw_part) < self.packet_fw_chunk_size:
fw_part += [0xFF] * self.packet_fw_chunk_size - len(fw_part)
packet = self.get_header(OtaFrameType.data)
packet += struct.pack('<H', fw_offset)
packet += fw_part
self.comm_interface.send(packet)
self.comm_interface.receive(7, self.receive_cb)
def get_header(self, frame_type):
return struct.pack('<BHHB',
0xA3,
0x00,
0xABCE,
frame_type)

46
imgupload.py 100644
Wyświetl plik

@ -0,0 +1,46 @@
from PIL import Image
from flashaccess import FlashAccess
class ImgUpload:
width = 320
img_base_address = 0x40000
img_offset = 3 * 0x10000
def __init__(self, comm_interface):
self.flash_access = FlashAccess(comm_interface)
def upload_images(self, filenames, status_callback):
if self.image_idx or not len(filenames):
return False
self.filenames = filenames
self.status_callback = status_callback
self.write_address = ImgUpload.img_base_address
self.write_done_cb()
def write_done_cb(self, progress):
img_serialized = self.prepare_next_image()
if not len(img_serialized) or progress == -1:
return self.status_callback(100)
self.flash_access.write_to_flash(self.write_address,
img_serialized,
self.write_done_cb)
self.write_address += ImgUpload.img_offset
def prepare_next_image(self):
if not len(self.filenames):
return bytearray()
img = Image.open(self.filenames.pop(0))
img = img.convert('RGB')
if img.size[0] != self.width:
print("image not cropped to 320px in width")
pixels = img.load()
tx_buff = bytearray()
pixel_cnt = img.size[0] * img.size[1]
for i in range(pixel_cnt):
x = i % img.size[0]
y = i / img.size[0]
for color in range(3):
tx_buff.append(pixels[x, y][color])
return tx_buff

39
main.py 100644
Wyświetl plik

@ -0,0 +1,39 @@
from comm import SerialWrapper
from confprot import ConfProt
import objects
from stefan_conf import SerialConfigurator
from fwupd import FirmwareUploader
from imgupload import ImgUpload
from PyQt5.QtWidgets import QApplication
from PyQt5.QtSerialPort import QSerialPort
import sys
import serial
v2_objects = {
"aprs_config" : objects.ObjAprsConfig(0, 1),
"freq_table_config" : objects.ObjTable(objects.ObjFreq(1, 0), 11),
"geofencing_config" : objects.ObjTable(objects.ObjGeoConf(1, 1), 11),
"sstv_config" : objects.ObjSstvConfig(1, 2)
}
if __name__ == "__main__":
ser = QSerialPort()
comm_interface = SerialWrapper(ser)
config_protocol = ConfProt(list(v2_objects.values()), comm_interface)
fw_uploader = FirmwareUploader(comm_interface)
img_uploader = ImgUpload(comm_interface)
v2_objects["aprs_config"].callsign = "SQ9P"
v2_objects["aprs_config"].callsign_designator = 7
v2_objects["aprs_config"].digi_path = "WIDE1-1"
v2_objects["aprs_config"].destination = "TEST"
v2_objects["aprs_config"].destination_designator = 1
v2_objects["aprs_config"].symbol = 1
# err = config_protocol.set_sync(v2_objects["aprs_config"])
app = QApplication(sys.argv)
widget = SerialConfigurator(config_protocol, v2_objects, ser, fw_uploader, img_uploader)
widget.show()
sys.exit(app.exec_())

158
objects.py 100644
Wyświetl plik

@ -0,0 +1,158 @@
import struct
class ObjBase:
def __init__(self, db_id, obj_id):
self.databank_id = db_id
self.obj_id = obj_id
self.len = 63
class ObjAprsConfig(ObjBase):
def __init__(self, db_id, obj_id):
ObjBase.__init__(self, db_id, obj_id)
self.len = 63
self.callsign = None
self.destination = None
self.digi_path = None
self.callsign_designator = None
self.destination_designator = None
self.symbol = None
def serialize(self):
return struct.pack('15s15s30sbbB',
self.callsign.encode('utf-8'),
self.destination.encode('utf-8'),
self.digi_path.encode('utf-8'),
self.callsign_designator,
self.destination_designator,
self.symbol)
def deserialize(self, raw_data) -> bool:
if len(raw_data) != self.len:
print(f"deserialize err rx size {len(raw_data)} but required {self.len}")
return False
(
self.callsign,
self.destination,
self.digi_path,
self.callsign_designator,
self.destination_designator,
self.symbol
) = struct.unpack('15s15s30sbbB', raw_data)
self.callsign = self.callsign.decode().replace('\x00', '')
self.destination = self.destination.decode().replace('\x00', '')
self.digi_path = self.digi_path.decode().replace('\x00', '')
print("deserialize res:")
print(self.callsign)
print(self.symbol)
print(self.destination_designator)
return True
class ObjFreq(ObjBase):
def __init__(self, db_id, obj_id):
ObjBase.__init__(self, db_id, obj_id)
self.len = 7
self.frequency = 3
self.radio_mode = 0
self.tx_power = 0
self.telemetry_psc = 0
self.position_psc = 0
def serialize(self):
return struct.pack('<IBBB',
self.frequency,
self.radio_mode & 0xFF,
self.tx_power & 0xFF,
int(self.telemetry_psc) << 4 | int(self.position_psc) & 0xFF)
def deserialize(self, raw_data) -> bool:
if len(raw_data) != self.len:
print(f"deserialize err rx size {len(raw_data)} but required {self.len}")
return False
(
self.frequency,
self.radio_mode,
self.tx_power,
psc_bath
) = struct.unpack('<IBBB', raw_data)
self.telemetry_psc = (int(psc_bath) >> 4) & 0xFF
self.position_psc = int(psc_bath) & 0xFF
return True
class ObjTable(ObjBase):
def __init__(self, init_instance, table_size):
ObjBase.__init__(self, init_instance.databank_id, init_instance.obj_id)
self.len = init_instance.len
self.table_size = table_size
init_instance_type = type(init_instance)
self.objects = [init_instance_type(init_instance.databank_id, init_instance.obj_id) for _ in range(table_size)]
def serialize(self, idx):
return self.objects[idx].serialize()
def deserialize(self, data, idx = 0):
return self.objects[idx].deserialize(data)
class TPoint:
def __init__(self, x, y):
self.x = x
self.y = y
class ObjGeoConf(ObjBase):
def __init__(self, db_id, obj_id):
ObjBase.__init__(self, db_id, obj_id)
self.len = 22
self.points = [TPoint(0, 0)] * 5
self.b_inside = False
def serialize(self):
points_data = []
for i in range(len(self.points)):
points_data.append(self.points[i].x)
points_data.append(self.points[i].y)
return struct.pack('<10hH',
*points_data,
self.b_inside)
def deserialize(self, raw_data) -> bool:
if len(raw_data) != self.len:
print(f"deserialize err rx size {len(raw_data)} but required {self.len}")
return False
raw_data = struct.unpack('<10hH', raw_data)
points_data = raw_data[:-1]
self.b_inside = raw_data[-1]
print(f"len of points_data {len(points_data)}")
for i in range(0, len(points_data), 2):
self.points[int(i/2)] = TPoint(points_data[i], points_data[i+1])
return True
class ObjSstvConfig(ObjBase):
def __init__(self, db_id, obj_id):
ObjBase.__init__(self, db_id, obj_id)
self.len = 53
self.sstv_text_field = str('\0') * self.len
self.images_cnt = 0
self.images_per_freq = 0
self.b_header_enabled = True
def serialize(self):
return struct.pack('<50sBBB', self.sstv_text_field,
self.images_cnt,
self.images_per_freq,
self.b_header_enabled)
def deserialize(self, raw_data):
if len(raw_data) != self.len:
return False
(self.sstv_text_field,
self.images_cnt,
self.images_per_freq,
self.b_header_enabled) = struct.unpack('<50sBBB')
return True

13
readme.md 100644
Wyświetl plik

@ -0,0 +1,13 @@
# tracker configurator
## tracker setup
![tracker](./docs/tracker.png)
* connect usb uart adapter to GND, TX, TX pins
* if battery isnt soldered connect power to 3.3V, be sure to not exceed 3.6V
* enter configuration mode by shortly pressing button 5 times
* after entering configuration mode led will blink 3 times in 5 s intervals
## connecting configurator
![configurator](./docs/freq_conf.png)
* select COM port of usb adapter and press connect
* press GET to read configuration from tracker

5
requirements.txt 100644
Wyświetl plik

@ -0,0 +1,5 @@
Pillow==9.5.0
Pillow==10.0.0
PyQt5==5.15.9
PyQt5_sip==12.12.1
pyserial==3.5

154
setter.py 100644
Wyświetl plik

@ -0,0 +1,154 @@
import sys
import time
import struct
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QComboBox
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QColor, QPalette
import serial
import serial.tools.list_ports
class SerialApp(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
self.serial_port = serial.Serial()
# Wybór portu szeregowego
port_layout = QHBoxLayout()
port_layout.addWidget(QLabel('Port szeregowy:'))
self.port_select = QComboBox()
ports = serial.tools.list_ports.comports()
for port in ports:
self.port_select.addItem(port.device)
port_layout.addWidget(self.port_select)
layout.addLayout(port_layout)
# Ustawienia numeru banku
bank_layout = QHBoxLayout()
bank_layout.addWidget(QLabel('Numer banku konfiguracyjnego:'))
self.bank_input = QLineEdit()
bank_layout.addWidget(self.bank_input)
layout.addLayout(bank_layout)
# Ustawienia numeru obiektu
object_layout = QHBoxLayout()
object_layout.addWidget(QLabel('Numer obiektu konfiguracyjnego:'))
self.object_input = QLineEdit()
object_layout.addWidget(self.object_input)
layout.addLayout(object_layout)
# Wybór akcji
action_layout = QHBoxLayout()
action_layout.addWidget(QLabel('Akcja:'))
self.action_select = QComboBox()
self.action_select.addItem('GET', 0)
self.action_select.addItem('SET', 1)
action_layout.addWidget(self.action_select)
layout.addLayout(action_layout)
# Pole danych
data_layout = QHBoxLayout()
data_layout.addWidget(QLabel('Dane:'))
self.data_input = QLineEdit()
data_layout.addWidget(self.data_input)
layout.addLayout(data_layout)
# Przyciski
button_layout = QHBoxLayout()
self.send_button = QPushButton('Wyślij')
self.send_button.clicked.connect(self.send_data)
button_layout.addWidget(self.send_button)
layout.addLayout(button_layout)
self.setLayout(layout)
def send_data(self):
bank = int(self.bank_input.text())
obj = int(self.object_input.text())
action = self.action_select.currentData()
if action == 0: # GET
self.send_get(bank, obj)
elif action == 1: # SET
data = self.data_input.text()
self.send_set(bank, obj, data)
def open_serial_port(self):
if not self.serial_port.is_open:
self.serial_port.port = self.port_select.currentText()
self.serial_port.baudrate = 9600
self.serial_port.timeout = 1 # Ustal timeout na 1 sekundę
self.serial_port.open()
def send_get(self, bank, obj):
self.open_serial_port()
header = 0xABCD
action = 0
frame = struct.pack('>HBB', header, action, bank) + struct.pack('B', obj)
self.serial_port.write(frame)
response = self.wait_for_get_response()
if response is not None:
self.data_input.setText(response)
def send_set(self, bank, obj, data):
self.open_serial_port()
header = 0xABCD
action = 1
data_length = len(data)
frame = struct.pack('>HBBB', header, action, bank, obj) + struct.pack('B', data_length) + data.encode()
self.serial_port.write(frame)
ack_received = self.wait_for_ack()
if ack_received:
self.set_data_input_color("green")
print("ACK received")
else:
self.set_data_input_color("red")
print("ACK not received")
def set_data_input_color(self, color):
palette = self.data_input.palette()
palette.setColor(QPalette.Text, QColor(color))
self.data_input.setPalette(palette)
def wait_for_ack(self):
start_time = time.time()
timeout = 5 # Oczekuj na odpowiedź przez 5 sekund
while time.time() - start_time < timeout:
if self.serial_port.in_waiting > 0:
response = self.serial_port.read(5)
header, action, _, _ = struct.unpack('>HBBB', response)
if header == 0xABCD and action == 3:
return True
return False
def wait_for_get_response(self):
start_time = time.time()
timeout = 5 # Oczekuj na odpowiedź przez 5 sekund
while time.time() - start_time < timeout:
if self.serial_port.in_waiting > 0:
response = self.serial_port.read(6)
header, action, _, _, data_length = struct.unpack('>HBBBB', response)
if header == 0xABCD and action == 4:
data = self.serial_port.read(data_length)
return data.decode()
return None
if __name__ == '__main__':
app = QApplication(sys.argv)
ex = SerialApp()
ex.show()
sys.exit(app.exec_())

440
stefan_conf.py 100644
Wyświetl plik

@ -0,0 +1,440 @@
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QComboBox, QLabel, QLineEdit, QGroupBox, QFormLayout, QProgressBar, QTabWidget, QCheckBox, QFileDialog, QListWidget, QListWidgetItem
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
from PyQt5.QtCore import pyqtSlot, QStringListModel
from PyQt5.QtGui import QIcon
from objects import TPoint
class SerialConfigurator(QWidget):
def __init__(self, confprot, config_objects, serialPort, fw_uploader, img_uploader):
self.confprot = confprot
self.config = config_objects
self.serialPort = serialPort
self.fw_uploader = fw_uploader
self.img_uploader = img_uploader
self.fw_file = None
super().__init__()
self.initUI()
self.init_layout_lists()
def show(self):
super().show()
self.resize(500, self.size().height())
self.setFixedSize(self.size())
def init_layout_lists(self):
self.aprsSymbolMap = {
"BALLOON" : 6,
"CAR" : 3,
"ROCKET" : 5,
"WX_STATION" : 1,
"DIGI" : 0,
}
self.aprsSymbolMapInverse = {v: k for k, v in self.aprsSymbolMap.items()}
self.geoConfigModeMap = {
'TX INSIDE' : 0,
'TX OUTSIDE' : 1
}
self.geoConfigModeMapInverse = {v: k for k, v in self.geoConfigModeMap.items()}
# common setters
self.str_to_qlabel = lambda x, y: y.setText(str(x))
self.str_to_qcombobox = lambda x, y: y.setCurrentText(str(x))
cord_lines_to_objs = lambda x : [TPoint(int(float(x[i].text()) * 100), int(float(x[i+1].text()) * 100)) for i in range(0, len(x), 2)]
def objs_to_cord_lines(x, y):
for i in range(0, len(y), 2):
y[i].setText(str(float(x[int(i/2)].x)/100))
y[i+1].setText(str(float(x[int(i/2)].y)/100))
# stefan obj(idx, attribute) | qtObj | setter | getter
self.config_map = [
[('aprs_config', "callsign"), self.aprsCallsign, lambda x: x.text(), self.str_to_qlabel ],
[('aprs_config', "destination"), None, None, None ],
[('aprs_config', "digi_path"), self.aprsDigiPath, lambda x: x.text(), self.str_to_qlabel ],
[('aprs_config', "callsign_designator"), self.aprsDesignator, lambda x: int(x.currentText()), self.str_to_qcombobox ],
[('aprs_config', "destination_designator"), None, None, None ],
[('aprs_config', "symbol"), self.aprsSymbol, lambda x: int(self.aprsSymbolMap[x.currentText()]), lambda x,y : self.str_to_qcombobox(self.aprsSymbolMapInverse[x], y) ],
[('freq_table_config', "frequency"), self.frequencyLines, lambda x: int(float(x.text())*1000), lambda x,y : self.str_to_qlabel(float(x)/1000, y) ],
[('freq_table_config', "telemetry_psc"), self.telemetryPrescalers, lambda x: int(x.currentText()), self.str_to_qcombobox ],
[('freq_table_config', "position_psc"), self.positionPrescalers, lambda x: int(x.currentText()), self.str_to_qcombobox ],
[('freq_table_config', "radio_mode"), self.modeBoxes, lambda x: int(self.modes.index(x.currentText())), lambda x, y : y.setCurrentText(self.modes[x]) ],
[('geofencing_config', "points"), self.geoConfigCordLines, cord_lines_to_objs, objs_to_cord_lines ],
[('geofencing_config', "b_inside"), self.geoConfigModeBoxes, lambda x: int(self.geoConfigModeMap[x.currentText()]), lambda x,y : self.str_to_qcombobox(self.geoConfigModeMapInverse[x], y) ],
[('sstv_config', "sstv_text_field"), self.sstvTextHeader, lambda x: x.text(), self.str_to_qlabel ],
[('sstv_config', "images_cnt"), self.sstvImageCnt, lambda x: int(x.currentText()), self.str_to_qcombobox ],
[('sstv_config', "images_per_freq"), self.sstvImageCntPerFreq, lambda x: int(x.currentText()), self.str_to_qcombobox ],
[('sstv_config', "b_header_enabled"), self.sstvBHeaderEnabled, lambda x: int(x.isChecked()), lambda x,y : y.setChecked( x > 0 ) ],
]
@pyqtSlot()
def connect(self):
portName = self.comboBox.currentText()
# self.serialPort = QSerialPort()
self.serialPort.setPortName(portName)
self.serialPort.setBaudRate(115200)
if self.serialPort.open(QSerialPort.ReadWrite, ):
self.connectionLabel.setText("Connected to " + portName)
else:
self.connectionLabel.setText("Failed to connect to " + portName)
def set_command(self):
for key, obj_qt, setter, getter in self.config_map:
if obj_qt == None:
continue
dict_idx, attr = key
obj = self.config[dict_idx]
if not hasattr(obj, 'table_size'):
setattr(obj, attr, setter(obj_qt))
else:
for i in range(min(obj.table_size, len(obj_qt))):
setattr(obj.objects[i], attr, setter(obj_qt[i]))
self.confprot.set_all_sync(self.update_progressbar)
def set_command_async(self):
print("set command requested")
for key, obj_qt, setter, getter in self.config_map:
if obj_qt == None:
continue
dict_idx, attr = key
obj = self.config[dict_idx]
if not hasattr(obj, 'table_size'):
setattr(obj, attr, setter(obj_qt))
else:
for i in range(min(obj.table_size, len(obj_qt))):
setattr(obj.objects[i], attr, setter(obj_qt[i]))
self.disable_buttons(True)
self.confprot.set_all_async(self.progress_callback)
def get_command_async(self):
self.disable_buttons(True)
self.confprot.get_all_async(self.progress_callback_get)
def progress_callback_get(self, progress):
self.progress_callback(progress)
if progress != 100:
return
#update qt objects
for key, obj_qt, setter, getter in self.config_map:
if obj_qt == None:
continue
dict_idx, attr = key
obj = self.config[dict_idx]
if not hasattr(obj, 'table_size'):
getter(getattr(obj, attr), obj_qt)
else:
for i in range(min(obj.table_size, len(obj_qt))):
getter(getattr(obj.objects[i], attr), obj_qt[i])
def progress_callback(self, progress):
if progress < 0 or progress == 100:
self.disable_buttons(False)
if progress >= 0:
self.update_progressbar(progress)
def disable_buttons(self, disabled):
self.getCommandButton.setDisabled(disabled)
self.setCommandButton.setDisabled(disabled)
self.fwUpdateUploadButton.setDisabled(disabled)
self.sstvImagesUploadButton.setDisabled(disabled)
def get_command(self):
self.confprot.get_all_sync(self.update_progressbar)
for key, obj_qt, setter, getter in self.config_map:
if obj_qt == None:
continue
dict_idx, attr = key
obj = self.config[dict_idx]
if not hasattr(obj, 'table_size'):
getter(getattr(obj, attr), obj_qt)
else:
for i in range(min(obj.table_size, len(obj_qt))):
getter(getattr(obj.objects[i], attr), obj_qt[i])
def update_progressbar(self, percent):
print(percent)
self.progressBar.setValue(int(percent))
def initUI(self):
self.setWindowTitle('SQ9P Tracker Configurator')
self.layout = QVBoxLayout()
self.setLayout(self.layout)
self.connectionSettings = QGroupBox('Connection Settings')
self.connectionSettingsLayout = QVBoxLayout()
self.connectionSettings.setLayout(self.connectionSettingsLayout)
self.uartConfigRowLayout = QHBoxLayout()
self.connectionSettingsLayout.addLayout(self.uartConfigRowLayout)
self.comboBox = QComboBox(self)
self.serialPorts = QSerialPortInfo.availablePorts()
for port in self.serialPorts:
self.comboBox.addItem(port.portName())
self.connectionLabel = QLabel(self)
self.connectionLabel.setText("Disconnected")
self.connectButton = QPushButton('Connect', self)
self.connectButton.clicked.connect(self.connect)
self.uartConfigRowLayout.addWidget(self.comboBox)
self.uartConfigRowLayout.addWidget(self.connectionLabel)
self.uartConfigRowLayout.addWidget(self.connectButton)
self.uartCommandRowLayout = QHBoxLayout()
self.connectionSettingsLayout.addLayout(self.uartCommandRowLayout)
self.getCommandButton = QPushButton('Get', self)
self.getCommandButton.clicked.connect(self.get_command_async)
self.setCommandButton = QPushButton('Set', self)
self.setCommandButton.clicked.connect(self.set_command_async)
self.uartCommandRowLayout.addWidget(self.getCommandButton)
self.uartCommandRowLayout.addWidget(self.setCommandButton)
self.progresBarLayout = QHBoxLayout()
self.connectionSettingsLayout.addLayout(self.progresBarLayout)
self.progressBar = QProgressBar(self)
self.progresBarLayout.addWidget(self.progressBar)
self.progressBar.setValue(0)
self.layout.addWidget(self.connectionSettings)
self.tabs = QTabWidget()
self.init_frequency_layout()
self.init_geoconig_layout()
self.init_sstv_layout()
self.init_update_layout()
self.multiConfig = QGroupBox('Multi Configuration')
self.multiConfigLayout = QVBoxLayout()
self.multiConfig.setLayout(self.multiConfigLayout)
self.multiConfigLayout.addWidget(self.tabs)
self.layout.addWidget(self.multiConfig)
#self.layout.addWidget(self.tabs)
# self.layout.addWidget(self.aprsSettings)
def init_frequency_layout(self):
self.frequencyConfigTab = QWidget()
self.frequencyConfigTabLayout = QHBoxLayout()
self.frequencyConfigTab.setLayout(self.frequencyConfigTabLayout)
self.frequencyConfigGroupBox = QGroupBox('Frequency Settings')
self.frequencyConfigLayout = QVBoxLayout()
self.frequencyConfigGroupBox.setLayout(self.frequencyConfigLayout)
# self.frequencyConfigTab.setLayout(self.frequencyConfigLayout)
self.frequencyConfigTabLayout.addWidget(self.frequencyConfigGroupBox)
self.tabs.addTab(self.frequencyConfigTab, "Frequency")
self.frequencyLayouts = []
self.frequencyLines = []
self.modeBoxes = []
self.positionPrescalers = []
self.telemetryPrescalers = []
self.modes = ['APRS AFSK 1200', 'APRS AFSK 9600', 'APRS AFSK CUSTOM', 'APRS_LORA 300', 'APRS LORA 1200', 'APRS LORA CUSTOM', '4FSK HORUS V2', 'FM SSTV', 'FM AUDIO']
prescalers = [str(i) for i in range(0, 16)]
self.columnLabelsLayout = QHBoxLayout()
self.frequencyConfigLayout.addLayout(self.columnLabelsLayout)
for i in range(11):
layout = QHBoxLayout()
self.frequencyLayouts.append(layout)
self.frequencyConfigLayout.addLayout(layout)
layout.addWidget(QLabel(str(i)+(". " if i < 10 else ".")))
layout.addWidget(QLabel("Frequency"))
frequencyLine = QLineEdit()
self.frequencyLines.append(frequencyLine)
layout.addWidget(frequencyLine)
layout.addWidget(QLabel("Mode"))
modeBox = QComboBox()
self.modeBoxes.append(modeBox)
modeBox.addItems(self.modes)
layout.addWidget(modeBox)
layout.addWidget(QLabel("Position psc"))
positionPrescaler = QComboBox()
self.positionPrescalers.append(positionPrescaler)
positionPrescaler.addItems(prescalers)
layout.addWidget(positionPrescaler)
layout.addWidget(QLabel("Telemetry psc"))
telemetryPrescaler = QComboBox()
self.telemetryPrescalers.append(telemetryPrescaler)
telemetryPrescaler.addItems(prescalers)
layout.addWidget(telemetryPrescaler)
self.aprsSettings = QGroupBox('APRS Settings')
self.aprsSettingsLayout = QFormLayout()
self.aprsSettings.setLayout(self.aprsSettingsLayout)
self.aprsCallsign = QLineEdit()
self.aprsDesignator = QComboBox()
self.aprsDesignator.addItems([str(i) for i in range(0, 16)])
self.aprsDigiPath = QLineEdit()
self.aprsSymbol = QComboBox()
self.aprsSymbol.addItems(["BALLOON", "CAR", "ROCKET", "WX_STATION", "DIGI"])
self.aprsSettingsLayout.addRow('CALLSIGN', self.aprsCallsign)
self.aprsSettingsLayout.addRow('Designator', self.aprsDesignator)
self.aprsSettingsLayout.addRow('Digi path', self.aprsDigiPath)
self.aprsSettingsLayout.addRow('Symbol', self.aprsSymbol)
self.frequencyConfigTabLayout.addWidget(self.aprsSettings)
def init_sstv_layout(self):
self.sstvConfigTab = QWidget()
self.sstvConfigTabLayout = QHBoxLayout()
self.sstvConfigTab.setLayout(self.sstvConfigTabLayout)
self.sstvParametersGroupBox = QGroupBox('SSTV parameters')
self.sstvParametersGroupBoxLayout = QFormLayout()
self.sstvParametersGroupBox.setLayout(self.sstvParametersGroupBoxLayout)
self.sstvConfigTabLayout.addWidget(self.sstvParametersGroupBox)
self.sstvTextHeader = QLineEdit()
self.sstvBHeaderEnabled = QCheckBox()
self.sstvImageCnt = QComboBox()
self.sstvImageCnt.addItems([str(i) for i in range(0, 15)])
self.sstvImageCntPerFreq = QComboBox()
self.sstvImageCntPerFreq.addItems([str(i) for i in range(0, 15)])
self.sstvParametersGroupBoxLayout.addRow('Text header', self.sstvTextHeader)
self.sstvParametersGroupBoxLayout.addRow('Text header enabled', self.sstvBHeaderEnabled)
self.sstvParametersGroupBoxLayout.addRow('Images count', self.sstvImageCnt)
self.sstvParametersGroupBoxLayout.addRow('Images per freq', self.sstvImageCntPerFreq)
self.sstvImagesGroupBox = QGroupBox('Images')
self.sstvImagesGroupBoxLayout = QVBoxLayout()
self.sstvImagesGroupBox.setLayout(self.sstvImagesGroupBoxLayout)
self.sstvConfigTabLayout.addWidget(self.sstvImagesGroupBox)
self.sstvImagesSelectButton = QPushButton('Open images')
self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesSelectButton)
self.sstvImagesSelectButton.clicked.connect(self.sstv_file_chose_dialog)
self.sstvImagesGroupBoxLayout.addWidget(QLabel('Selected images:'))
self.sstvImagesSelectedList = QListWidget()
self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesSelectedList)
self.sstvImagesUploadButton = QPushButton('Upload images to device')
self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesUploadButton)
self.sstvImagesUploadButton.pressed.connect(self.init_sstv_upload)
self.tabs.addTab(self.sstvConfigTab, "SSTV config")
def sstv_file_chose_dialog(self):
options = QFileDialog.Options()
options |= QFileDialog.ReadOnly
self.sstv_files, _ = QFileDialog.getOpenFileNames(self,"Select file", "","PNG (*.png);JPEG (*.jpg);;All Files (*)", options=options)
self.sstvImagesSelectedList.clear()
for image in self.sstv_files:
thumbnail = QListWidgetItem()
thumbnail.setText(image)
thumbnail.setIcon(QIcon(image))
self.sstvImagesSelectedList.addItem(thumbnail)
def init_geoconig_layout(self):
self.geoConfigTab = QWidget()
self.geoConfigTabLayout = QVBoxLayout()
self.geoConfigTab.setLayout(self.geoConfigTabLayout)
self.geoConfigRowLayouts = []
self.geoConfigModeBoxes = []
self.geoConfigCordLines = []
self.positionModes = ['TX INSIDE', 'TX OUTSIDE']
for i in range(11):
RowLayout = QHBoxLayout()
self.geoConfigRowLayouts.append(RowLayout)
self.geoConfigTabLayout.addLayout(RowLayout)
RowLayout.addWidget(QLabel(str(i)+(". " if i < 10 else ".")))
PositionModeBox = QComboBox()
PositionModeBox.addItems(self.positionModes)
self.geoConfigModeBoxes.append(PositionModeBox)
RowLayout.addWidget(PositionModeBox)
CoordinatesLinesInRow = []
for i in range(5):
CoordLineX = QLineEdit()
CoordLineY = QLineEdit()
CoordinatesLinesInRow.append(CoordLineX)
CoordinatesLinesInRow.append(CoordLineY)
RowLayout.addWidget(QLabel("X"+str(i)))
RowLayout.addWidget(CoordLineX)
RowLayout.addWidget(QLabel("Y"+str(i)))
RowLayout.addWidget(CoordLineY)
self.geoConfigCordLines.append(CoordinatesLinesInRow)
self.tabs.addTab(self.geoConfigTab, "Geofencing")
def init_update_layout(self):
self.fwUpdateTab = QWidget()
self.fwUpdateTabLayout = QVBoxLayout()
self.fwUpdateTab.setLayout(self.fwUpdateTabLayout)
self.fwUpdateRowLayout = QHBoxLayout()
self.fwUpdateTabLayout.addLayout(self.fwUpdateRowLayout)
self.fwUpdateSelectButton = QPushButton('Select file')
self.fwUpdateRowLayout.addWidget(self.fwUpdateSelectButton)
self.fwUpdateSelectButton.clicked.connect(self.load_file_update_dialog)
self.fwUpdateFileLabel = QLabel("firmware file not selected")
self.fwUpdateRowLayout.addWidget(self.fwUpdateFileLabel)
self.fwUpdateUploadButton = QPushButton('upload')
self.fwUpdateUploadButton.clicked.connect(self.init_firmware_upload)
self.fwUpdateTabLayout.addWidget(self.fwUpdateUploadButton)
self.fwUpdateTabLayout.addStretch(1)
self.tabs.addTab(self.fwUpdateTab, "Software update")
def load_file_update_dialog(self):
options = QFileDialog.Options()
options |= QFileDialog.ReadOnly
self.fw_file, _ = QFileDialog.getOpenFileName(self,"Select file", "","BIN (*.bin);;All Files (*)", options=options)
if self.fw_file:
self.fwUpdateFileLabel.setText(f"Wybrane pliki: {self.fw_file}")
def init_firmware_upload(self):
if not self.fw_file:
return
self.disable_buttons(True)
self.fw_uploader.upload_firmware(self.fw_file, self.progress_callback)
def init_sstv_upload(self):
if not self.sstv_files:
return
self.disable_buttons(True)
self.img_uploader.upload_images(self.sstv_files, self.progress_callback)
# app = QApplication(sys.argv)
# widget = SerialConfigurator()
# widget.show()
# sys.exit(app.exec_())