commit b228b97ed1c6e9355895e67e4ccaeb8b7a8b5197 Author: Piotr Lewandowski Date: Sat Jul 29 21:15:52 2023 +0200 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..567609b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build/ diff --git a/__pycache__/comm.cpython-311.pyc b/__pycache__/comm.cpython-311.pyc new file mode 100644 index 0000000..e7ed1c6 Binary files /dev/null and b/__pycache__/comm.cpython-311.pyc differ diff --git a/__pycache__/confprot.cpython-311.pyc b/__pycache__/confprot.cpython-311.pyc new file mode 100644 index 0000000..9df0a17 Binary files /dev/null and b/__pycache__/confprot.cpython-311.pyc differ diff --git a/__pycache__/flashaccess.cpython-311.pyc b/__pycache__/flashaccess.cpython-311.pyc new file mode 100644 index 0000000..00054eb Binary files /dev/null and b/__pycache__/flashaccess.cpython-311.pyc differ diff --git a/__pycache__/fwupd.cpython-311.pyc b/__pycache__/fwupd.cpython-311.pyc new file mode 100644 index 0000000..13e84e9 Binary files /dev/null and b/__pycache__/fwupd.cpython-311.pyc differ diff --git a/__pycache__/imgupload.cpython-311.pyc b/__pycache__/imgupload.cpython-311.pyc new file mode 100644 index 0000000..9ad1aab Binary files /dev/null and b/__pycache__/imgupload.cpython-311.pyc differ diff --git a/__pycache__/objects.cpython-311.pyc b/__pycache__/objects.cpython-311.pyc new file mode 100644 index 0000000..77bbfc4 Binary files /dev/null and b/__pycache__/objects.cpython-311.pyc differ diff --git a/__pycache__/stefan_conf.cpython-311.pyc b/__pycache__/stefan_conf.cpython-311.pyc new file mode 100644 index 0000000..92273c0 Binary files /dev/null and b/__pycache__/stefan_conf.cpython-311.pyc differ diff --git a/comm.py b/comm.py new file mode 100644 index 0000000..b275759 --- /dev/null +++ b/comm.py @@ -0,0 +1,69 @@ +import serial + +class SerialWrapper: + def __init__(self, serial_ref, timeout = 1000, timeout_idle_line = 100) -> None: + self.serial_ref = serial_ref + self.timeout = timeout + self.timeout_idle_line = timeout_idle_line + self.user_rx_callback = None + self.callback_slot_inited = False + + def send(self, data): + self.serial_ref.flush() + self.serial_ref.write(data) + self.serial_ref.waitForBytesWritten(-1) + + def receive(self, size = 0): + self.flush() + rx_data = bytearray() + while self.serial_ref.waitForReadyRead(self.timeout): + rx_data += self.serial_ref.readAll() + if(len(rx_data) >= size): + break + return rx_data + + def receive_detect_idle(self): + rx_data = bytearray() + rx_to = self.timeout + self.serial_ref.flush() + while self.serial_ref.waitForReadyRead(500): + rx_data += self.serial_ref.readAll() + rx_to = self.timeout_idle_line + + return rx_data + # while 1: + # rx_byte = self.serial_ref.readData(1) + # self.serial_ref.waitForReadyRead(rx_to) + # rx_byte = self.serial_ref.readData(1) + + # if rx_byte: + # rx_data += rx_byte + # rx_to = self.timeout_idle_line + # else: + # print("no rx data") + # return rx_data + def send_async(self, data): + self.serial_ref.write(data) + + def receive_async(self, size, callback): + self.serial_ref.readAll() + self.user_requested_read_size = size + self.user_data = bytearray() + self.user_rx_callback = callback + + if not self.callback_slot_inited: + self.callback_slot_inited = True + self.serial_ref.readyRead.connect(self.onRxDone) + + def onSendDone(self): + pass + + def onRxDone(self): + self.user_data += self.serial_ref.readAll() + if(len(self.user_data) >= self.user_requested_read_size): + print(f"rx data: {self.user_data}") + self.user_rx_callback(self.user_data) + pass + + def flush(self): + self.serial_ref.readAll() diff --git a/confprot.py b/confprot.py new file mode 100644 index 0000000..3e28baa --- /dev/null +++ b/confprot.py @@ -0,0 +1,228 @@ +import struct +from typing import Dict, Any + +class ECommand: + GET = 0 + SET = 1 + GET_RESP = 2 + SET_ACK = 3 + SET_NACK = 4 + GET_ACK = 5 + +class ConfProtHeader: + SIZE = 3 + ACCESS_SUB_SIZE = 4 + def __init__(self) -> None: + self.fixed_id = 0xAF43 + self.command = ECommand.GET + pass + + def serialize(self): + return struct.pack(" bool: + if not raw_data or len(raw_data) < 3: + return False + self.fixed_id, self.command = struct.unpack(" None: + self.conf_objects = conf_objects + self.comm_interface = comm_interface + self.idx = 0 + self.sub_idx = 0 + self.irq_lock = True + pass + + def serialize_set(self, conf_obj, idx = 0) -> bytearray: + data = self.generate_header(ECommand.SET) + data += bytearray([conf_obj.databank_id, + conf_obj.obj_id, + idx, + conf_obj.len]) + if hasattr(conf_obj, 'table_size'): + data.extend(conf_obj.serialize(idx)) + else: + data.extend(conf_obj.serialize()) + return data + + def serialize_get(self, conf_obj, idx = 0) -> bytearray: + data = self.generate_header(ECommand.GET) + data += bytearray([conf_obj.databank_id, + conf_obj.obj_id, + idx, + conf_obj.len]) + return data + + def set_sync(self, conf_obj, idx = 0) -> bool: + tx_frame = self.serialize_set(conf_obj, idx) + self.comm_interface.send(tx_frame) + rx_frame = self.comm_interface.receive(ConfProtHeader.SIZE) + Header = ConfProtHeader() + error = Header.deserialize(rx_frame) + if error or Header.command != ECommand.SET_ACK: + return False + return True + + def get_sync(self, conf_obj, idx = 0) -> bool: + tx_frame = self.serialize_get(conf_obj, idx) + self.comm_interface.send(tx_frame) + rx_frame = self.comm_interface.receive(conf_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE) + print(rx_frame) + Header = ConfProtHeader() + status = Header.deserialize(rx_frame) + print(f"hader data {Header.command} {Header.fixed_id} {status} {status == False} {Header.command != ECommand.GET_RESP}") + if status == False or Header.command != ECommand.GET_RESP: + print("response header not valid") + return False + if idx != 0: + return conf_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:], idx) + return conf_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:]) + + def set_all_sync(self, status_callback = None): + processing_idx = 0 + for obj in self.conf_objects: + if hasattr(obj, 'table_size'): + for i in range(obj.table_size): + processing_idx += self.set_sync(obj, i) + else: + processing_idx += self.set_sync(obj) + if status_callback != None: + status_callback(100*processing_idx/len(self.conf_objects)) + + def get_all_sync(self, status_callback = None): + processing_idx = 0 + for obj in self.conf_objects: + if hasattr(obj, 'table_size'): + for i in range(obj.table_size): + processing_idx += self.get_sync(obj, i) + else: + processing_idx += self.get_sync(obj) + if status_callback != None: + status_callback(100*processing_idx/len(self.conf_objects)) + + def set_all_async(self, status_callback): + if self.idx > 0: + return False + self.status_callback = status_callback + self.idx = 0 + self.sub_idx = 0 + self.send_next_set_async() + + def send_next_set_async(self): + if self.idx >= len(self.conf_objects): + print("send cmd finished") + self.idx = 0 + return False + obj = self.conf_objects[self.idx] + if hasattr(obj, 'table_size'): + if self.sub_idx < obj.table_size: + tx_frame = self.serialize_set(self.conf_objects[self.idx], self.sub_idx) + print(f"send cmd {self.idx} {self.sub_idx}") + self.sub_idx += 1 + else: # sub idx finished + self.sub_idx = 0 + self.idx += 1 + return self.send_next_set_async() + else: + tx_frame = self.serialize_set(self.conf_objects[self.idx]) + print(f"send cmd {self.idx} {self.sub_idx}") + self.idx += 1 + + self.comm_interface.flush() + self.comm_interface.send_async(tx_frame) + self.comm_interface.receive_async(ConfProtHeader.SIZE, self.rx_done_cb_set) + return True + + def rx_done_cb_set(self, rx_frame): + if rx_frame == None: + self.status_callback(-1) + + Header = ConfProtHeader() + error = Header.deserialize(rx_frame) + if error == False or Header.command != ECommand.SET_ACK: + return self.status_callback(-1) + if self.send_next_set_async(): + self.status_callback(100 * self.idx/len(self.conf_objects)) + else: + self.status_callback(100) + + def get_all_async(self, status_callback): + if self.idx > 0: + return False + self.status_callback = status_callback + self.idx = 0 + self.sub_idx = 0 + self.send_next_get() + + def rx_done_cb_get(self, rx_frame): + if len(rx_frame) != self.current_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE: + print("rx wrong len") + self.status_callback(-1) + self.idx = 0 + self.sub_idx = 0 + return + + Header = ConfProtHeader() + HeaderDesResult = Header.deserialize(rx_frame) + if not HeaderDesResult or Header.command != ECommand.GET_RESP: + print("rx wrong header") + self.status_callback(-1) + return + + if self.sub_idx > 0: + res = self.current_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:], self.sub_idx - 1) + else: + res = self.current_obj.deserialize(rx_frame[ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE:]) + + if not res: + print(f"deserialize err {self.idx - 1}, {self.sub_idx - 1}") + + if not self.send_next_get(): + self.status_callback(100) + else: + self.status_callback(100 * self.idx / len(self.conf_objects)) + + def send_next_get(self): + if(self.idx >= len(self.conf_objects)): + self.idx = 0 + return False + + self.current_obj = self.conf_objects[self.idx] + if hasattr(self.current_obj, 'table_size'): + if self.sub_idx < self.current_obj.table_size: + tx_frame = self.serialize_get(self.conf_objects[self.idx], self.sub_idx) + print(f"get cmd {self.idx} {self.sub_idx}") + self.sub_idx += 1 + else: # sub idx finished + self.sub_idx = 0 + self.idx += 1 + return self.send_next_get() + else: + tx_frame = self.serialize_get(self.conf_objects[self.idx]) + print(f"send cmd {self.idx} {self.sub_idx}") + self.idx += 1 + + self.comm_interface.flush() + self.comm_interface.send_async(tx_frame) + self.comm_interface.receive_async(self.current_obj.len + ConfProtHeader.SIZE + ConfProtHeader.ACCESS_SUB_SIZE, self.rx_done_cb_get) + return True + + def generate_header(self, cmd): + return bytearray([0x43, 0xAF, cmd]) + + +if __name__ == "__main__": + import objects + import serial + ser = serial.Serial('COM24', 115200, timeout=1) + aprs_conf = [] + aprs_conf.append(objects.ObjAprsConfig(0, 1)) + prot = ConfProt(aprs_conf, None) + tx_frame = prot.serialize_get(aprs_conf[0]) + ser.write(tx_frame) + rx_frame = ser.read(100) + print(rx_frame) \ No newline at end of file diff --git a/docs/freq_conf.png b/docs/freq_conf.png new file mode 100644 index 0000000..ea6e0ff Binary files /dev/null and b/docs/freq_conf.png differ diff --git a/docs/tracker.png b/docs/tracker.png new file mode 100644 index 0000000..59d8a97 Binary files /dev/null and b/docs/tracker.png differ diff --git a/flashaccess.py b/flashaccess.py new file mode 100644 index 0000000..3b7418f --- /dev/null +++ b/flashaccess.py @@ -0,0 +1,73 @@ +import struct + +class FlashAccess: + frame_id = 0xFFAABBCC + erase_sequence = 0xAABBCCDD + ack_byte = 0xAA + ack_len = 1 + mtu = 128 + img_block_size = 3 * 0x10000 + + def __init__(self, comm_interface): + self.comm_interface = comm_interface + + def write_to_flash(self, address, data, progress_callback): + self.start_address = address + self.data = data + self.progress_callback = progress_callback + self.idx = 0 + self.erase_idx = 0 + + if not len(data): + return self.progress_callback(-1) + + tx_frame = self.encode_next_erase_frame() + if not len(tx_frame): + return self.progress_callback(-1) + + self.comm_interface.send_async(tx_frame) + self.comm_interface.receive_async(FlashAccess.ack_len, self.receive_handler) + + def encode_next_erase_frame(self): + erase_offset = self.erase_idx * FlashAccess.img_block_size + if erase_offset >= len(self.data): + return bytearray() + erase_frame = struct.pack('= len(self.data): + return bytearray() + + data_size = min(FlashAccess.mtu, len(self.data) - self.idx) + tx_frame = self.encode_header(self.start_address + self.idx, + data_size) + tx_frame += self.data[self.idx:self.idx + data_size] + self.idx += data_size + return tx_frame + + def receive_handler(self, rx_data): + if len(rx_data) != 1 or rx_data[0] != FlashAccess.ack_byte: + return self.progress_callback(-1) + + erase_frame = self.encode_next_erase_frame() + if len(erase_frame): + next_frame = erase_frame + else: + next_frame = self.encode_next_frame() + if(not len(next_frame)): + return self.progress_callback(100) + + self.comm_interface.send_async(next_frame) + self.comm_interface.receive_async(1, self.receive_handler) + + def encode_header(self, address, len): + return struct.pack(' bool: + if len(raw_data) != self.len: + print(f"deserialize err rx size {len(raw_data)} but required {self.len}") + return False + + ( + self.callsign, + self.destination, + self.digi_path, + self.callsign_designator, + self.destination_designator, + self.symbol + ) = struct.unpack('15s15s30sbbB', raw_data) + self.callsign = self.callsign.decode().replace('\x00', '') + self.destination = self.destination.decode().replace('\x00', '') + self.digi_path = self.digi_path.decode().replace('\x00', '') + print("deserialize res:") + print(self.callsign) + print(self.symbol) + print(self.destination_designator) + return True + +class ObjFreq(ObjBase): + def __init__(self, db_id, obj_id): + ObjBase.__init__(self, db_id, obj_id) + self.len = 7 + + self.frequency = 3 + self.radio_mode = 0 + self.tx_power = 0 + self.telemetry_psc = 0 + self.position_psc = 0 + + def serialize(self): + return struct.pack(' bool: + if len(raw_data) != self.len: + print(f"deserialize err rx size {len(raw_data)} but required {self.len}") + return False + ( + self.frequency, + self.radio_mode, + self.tx_power, + psc_bath + ) = struct.unpack('> 4) & 0xFF + self.position_psc = int(psc_bath) & 0xFF + return True + +class ObjTable(ObjBase): + def __init__(self, init_instance, table_size): + ObjBase.__init__(self, init_instance.databank_id, init_instance.obj_id) + self.len = init_instance.len + self.table_size = table_size + init_instance_type = type(init_instance) + self.objects = [init_instance_type(init_instance.databank_id, init_instance.obj_id) for _ in range(table_size)] + + def serialize(self, idx): + return self.objects[idx].serialize() + + def deserialize(self, data, idx = 0): + return self.objects[idx].deserialize(data) + +class TPoint: + def __init__(self, x, y): + self.x = x + self.y = y + +class ObjGeoConf(ObjBase): + def __init__(self, db_id, obj_id): + ObjBase.__init__(self, db_id, obj_id) + self.len = 22 + self.points = [TPoint(0, 0)] * 5 + + self.b_inside = False + + def serialize(self): + points_data = [] + for i in range(len(self.points)): + points_data.append(self.points[i].x) + points_data.append(self.points[i].y) + + return struct.pack('<10hH', + *points_data, + self.b_inside) + + def deserialize(self, raw_data) -> bool: + if len(raw_data) != self.len: + print(f"deserialize err rx size {len(raw_data)} but required {self.len}") + return False + + raw_data = struct.unpack('<10hH', raw_data) + points_data = raw_data[:-1] + self.b_inside = raw_data[-1] + print(f"len of points_data {len(points_data)}") + for i in range(0, len(points_data), 2): + self.points[int(i/2)] = TPoint(points_data[i], points_data[i+1]) + + return True + +class ObjSstvConfig(ObjBase): + def __init__(self, db_id, obj_id): + ObjBase.__init__(self, db_id, obj_id) + self.len = 53 + self.sstv_text_field = str('\0') * self.len + self.images_cnt = 0 + self.images_per_freq = 0 + self.b_header_enabled = True + + def serialize(self): + return struct.pack('<50sBBB', self.sstv_text_field, + self.images_cnt, + self.images_per_freq, + self.b_header_enabled) + + def deserialize(self, raw_data): + if len(raw_data) != self.len: + return False + + (self.sstv_text_field, + self.images_cnt, + self.images_per_freq, + self.b_header_enabled) = struct.unpack('<50sBBB') + return True \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..35f8ad9 --- /dev/null +++ b/readme.md @@ -0,0 +1,13 @@ +# tracker configurator +## tracker setup +![tracker](./docs/tracker.png) + +* connect usb uart adapter to GND, TX, TX pins +* if battery isnt soldered connect power to 3.3V, be sure to not exceed 3.6V +* enter configuration mode by shortly pressing button 5 times +* after entering configuration mode led will blink 3 times in 5 s intervals + +## connecting configurator +![configurator](./docs/freq_conf.png) +* select COM port of usb adapter and press connect +* press GET to read configuration from tracker \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..804b45a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +Pillow==9.5.0 +Pillow==10.0.0 +PyQt5==5.15.9 +PyQt5_sip==12.12.1 +pyserial==3.5 diff --git a/setter.py b/setter.py new file mode 100644 index 0000000..92d6055 --- /dev/null +++ b/setter.py @@ -0,0 +1,154 @@ +import sys +import time +import struct +from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QComboBox +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QColor, QPalette +import serial +import serial.tools.list_ports + +class SerialApp(QWidget): + def __init__(self): + super().__init__() + self.init_ui() + + def init_ui(self): + layout = QVBoxLayout() + + self.serial_port = serial.Serial() + + # Wybór portu szeregowego + port_layout = QHBoxLayout() + port_layout.addWidget(QLabel('Port szeregowy:')) + self.port_select = QComboBox() + ports = serial.tools.list_ports.comports() + for port in ports: + self.port_select.addItem(port.device) + port_layout.addWidget(self.port_select) + layout.addLayout(port_layout) + + # Ustawienia numeru banku + bank_layout = QHBoxLayout() + bank_layout.addWidget(QLabel('Numer banku konfiguracyjnego:')) + self.bank_input = QLineEdit() + bank_layout.addWidget(self.bank_input) + layout.addLayout(bank_layout) + + # Ustawienia numeru obiektu + object_layout = QHBoxLayout() + object_layout.addWidget(QLabel('Numer obiektu konfiguracyjnego:')) + self.object_input = QLineEdit() + object_layout.addWidget(self.object_input) + layout.addLayout(object_layout) + + # Wybór akcji + action_layout = QHBoxLayout() + action_layout.addWidget(QLabel('Akcja:')) + self.action_select = QComboBox() + self.action_select.addItem('GET', 0) + self.action_select.addItem('SET', 1) + action_layout.addWidget(self.action_select) + layout.addLayout(action_layout) + + # Pole danych + data_layout = QHBoxLayout() + data_layout.addWidget(QLabel('Dane:')) + self.data_input = QLineEdit() + data_layout.addWidget(self.data_input) + layout.addLayout(data_layout) + + # Przyciski + button_layout = QHBoxLayout() + self.send_button = QPushButton('Wyślij') + self.send_button.clicked.connect(self.send_data) + button_layout.addWidget(self.send_button) + layout.addLayout(button_layout) + + self.setLayout(layout) + + def send_data(self): + bank = int(self.bank_input.text()) + obj = int(self.object_input.text()) + action = self.action_select.currentData() + + if action == 0: # GET + self.send_get(bank, obj) + elif action == 1: # SET + data = self.data_input.text() + self.send_set(bank, obj, data) + + def open_serial_port(self): + if not self.serial_port.is_open: + self.serial_port.port = self.port_select.currentText() + self.serial_port.baudrate = 9600 + self.serial_port.timeout = 1 # Ustal timeout na 1 sekundę + self.serial_port.open() + + def send_get(self, bank, obj): + self.open_serial_port() + + header = 0xABCD + action = 0 + frame = struct.pack('>HBB', header, action, bank) + struct.pack('B', obj) + self.serial_port.write(frame) + response = self.wait_for_get_response() + + if response is not None: + self.data_input.setText(response) + + def send_set(self, bank, obj, data): + self.open_serial_port() + + header = 0xABCD + action = 1 + data_length = len(data) + frame = struct.pack('>HBBB', header, action, bank, obj) + struct.pack('B', data_length) + data.encode() + self.serial_port.write(frame) + ack_received = self.wait_for_ack() + + if ack_received: + self.set_data_input_color("green") + print("ACK received") + else: + self.set_data_input_color("red") + print("ACK not received") + + def set_data_input_color(self, color): + palette = self.data_input.palette() + palette.setColor(QPalette.Text, QColor(color)) + self.data_input.setPalette(palette) + + def wait_for_ack(self): + start_time = time.time() + timeout = 5 # Oczekuj na odpowiedź przez 5 sekund + + while time.time() - start_time < timeout: + if self.serial_port.in_waiting > 0: + response = self.serial_port.read(5) + header, action, _, _ = struct.unpack('>HBBB', response) + + if header == 0xABCD and action == 3: + return True + + return False + + def wait_for_get_response(self): + start_time = time.time() + timeout = 5 # Oczekuj na odpowiedź przez 5 sekund + + while time.time() - start_time < timeout: + if self.serial_port.in_waiting > 0: + response = self.serial_port.read(6) + header, action, _, _, data_length = struct.unpack('>HBBBB', response) + + if header == 0xABCD and action == 4: + data = self.serial_port.read(data_length) + return data.decode() + + return None + +if __name__ == '__main__': + app = QApplication(sys.argv) + ex = SerialApp() + ex.show() + sys.exit(app.exec_()) \ No newline at end of file diff --git a/stefan_conf.py b/stefan_conf.py new file mode 100644 index 0000000..b5fcb71 --- /dev/null +++ b/stefan_conf.py @@ -0,0 +1,440 @@ +import sys +from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QComboBox, QLabel, QLineEdit, QGroupBox, QFormLayout, QProgressBar, QTabWidget, QCheckBox, QFileDialog, QListWidget, QListWidgetItem +from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo +from PyQt5.QtCore import pyqtSlot, QStringListModel +from PyQt5.QtGui import QIcon +from objects import TPoint + +class SerialConfigurator(QWidget): + def __init__(self, confprot, config_objects, serialPort, fw_uploader, img_uploader): + self.confprot = confprot + self.config = config_objects + self.serialPort = serialPort + self.fw_uploader = fw_uploader + self.img_uploader = img_uploader + self.fw_file = None + super().__init__() + self.initUI() + self.init_layout_lists() + + def show(self): + super().show() + self.resize(500, self.size().height()) + self.setFixedSize(self.size()) + + def init_layout_lists(self): + self.aprsSymbolMap = { + "BALLOON" : 6, + "CAR" : 3, + "ROCKET" : 5, + "WX_STATION" : 1, + "DIGI" : 0, + } + self.aprsSymbolMapInverse = {v: k for k, v in self.aprsSymbolMap.items()} + + self.geoConfigModeMap = { + 'TX INSIDE' : 0, + 'TX OUTSIDE' : 1 + } + self.geoConfigModeMapInverse = {v: k for k, v in self.geoConfigModeMap.items()} + + # common setters + self.str_to_qlabel = lambda x, y: y.setText(str(x)) + self.str_to_qcombobox = lambda x, y: y.setCurrentText(str(x)) + cord_lines_to_objs = lambda x : [TPoint(int(float(x[i].text()) * 100), int(float(x[i+1].text()) * 100)) for i in range(0, len(x), 2)] + def objs_to_cord_lines(x, y): + for i in range(0, len(y), 2): + y[i].setText(str(float(x[int(i/2)].x)/100)) + y[i+1].setText(str(float(x[int(i/2)].y)/100)) + # stefan obj(idx, attribute) | qtObj | setter | getter + + self.config_map = [ + [('aprs_config', "callsign"), self.aprsCallsign, lambda x: x.text(), self.str_to_qlabel ], + [('aprs_config', "destination"), None, None, None ], + [('aprs_config', "digi_path"), self.aprsDigiPath, lambda x: x.text(), self.str_to_qlabel ], + [('aprs_config', "callsign_designator"), self.aprsDesignator, lambda x: int(x.currentText()), self.str_to_qcombobox ], + [('aprs_config', "destination_designator"), None, None, None ], + [('aprs_config', "symbol"), self.aprsSymbol, lambda x: int(self.aprsSymbolMap[x.currentText()]), lambda x,y : self.str_to_qcombobox(self.aprsSymbolMapInverse[x], y) ], + + [('freq_table_config', "frequency"), self.frequencyLines, lambda x: int(float(x.text())*1000), lambda x,y : self.str_to_qlabel(float(x)/1000, y) ], + [('freq_table_config', "telemetry_psc"), self.telemetryPrescalers, lambda x: int(x.currentText()), self.str_to_qcombobox ], + [('freq_table_config', "position_psc"), self.positionPrescalers, lambda x: int(x.currentText()), self.str_to_qcombobox ], + [('freq_table_config', "radio_mode"), self.modeBoxes, lambda x: int(self.modes.index(x.currentText())), lambda x, y : y.setCurrentText(self.modes[x]) ], + + [('geofencing_config', "points"), self.geoConfigCordLines, cord_lines_to_objs, objs_to_cord_lines ], + [('geofencing_config', "b_inside"), self.geoConfigModeBoxes, lambda x: int(self.geoConfigModeMap[x.currentText()]), lambda x,y : self.str_to_qcombobox(self.geoConfigModeMapInverse[x], y) ], + + [('sstv_config', "sstv_text_field"), self.sstvTextHeader, lambda x: x.text(), self.str_to_qlabel ], + [('sstv_config', "images_cnt"), self.sstvImageCnt, lambda x: int(x.currentText()), self.str_to_qcombobox ], + [('sstv_config', "images_per_freq"), self.sstvImageCntPerFreq, lambda x: int(x.currentText()), self.str_to_qcombobox ], + [('sstv_config', "b_header_enabled"), self.sstvBHeaderEnabled, lambda x: int(x.isChecked()), lambda x,y : y.setChecked( x > 0 ) ], + ] + + @pyqtSlot() + def connect(self): + portName = self.comboBox.currentText() + # self.serialPort = QSerialPort() + self.serialPort.setPortName(portName) + self.serialPort.setBaudRate(115200) + + if self.serialPort.open(QSerialPort.ReadWrite, ): + self.connectionLabel.setText("Connected to " + portName) + else: + self.connectionLabel.setText("Failed to connect to " + portName) + + def set_command(self): + for key, obj_qt, setter, getter in self.config_map: + if obj_qt == None: + continue + dict_idx, attr = key + obj = self.config[dict_idx] + if not hasattr(obj, 'table_size'): + setattr(obj, attr, setter(obj_qt)) + else: + for i in range(min(obj.table_size, len(obj_qt))): + setattr(obj.objects[i], attr, setter(obj_qt[i])) + self.confprot.set_all_sync(self.update_progressbar) + + def set_command_async(self): + print("set command requested") + for key, obj_qt, setter, getter in self.config_map: + if obj_qt == None: + continue + dict_idx, attr = key + obj = self.config[dict_idx] + if not hasattr(obj, 'table_size'): + setattr(obj, attr, setter(obj_qt)) + else: + for i in range(min(obj.table_size, len(obj_qt))): + setattr(obj.objects[i], attr, setter(obj_qt[i])) + + self.disable_buttons(True) + self.confprot.set_all_async(self.progress_callback) + + def get_command_async(self): + self.disable_buttons(True) + self.confprot.get_all_async(self.progress_callback_get) + + def progress_callback_get(self, progress): + self.progress_callback(progress) + if progress != 100: + return + #update qt objects + for key, obj_qt, setter, getter in self.config_map: + if obj_qt == None: + continue + dict_idx, attr = key + obj = self.config[dict_idx] + if not hasattr(obj, 'table_size'): + getter(getattr(obj, attr), obj_qt) + else: + for i in range(min(obj.table_size, len(obj_qt))): + getter(getattr(obj.objects[i], attr), obj_qt[i]) + + def progress_callback(self, progress): + if progress < 0 or progress == 100: + self.disable_buttons(False) + + if progress >= 0: + self.update_progressbar(progress) + + def disable_buttons(self, disabled): + self.getCommandButton.setDisabled(disabled) + self.setCommandButton.setDisabled(disabled) + self.fwUpdateUploadButton.setDisabled(disabled) + self.sstvImagesUploadButton.setDisabled(disabled) + + def get_command(self): + self.confprot.get_all_sync(self.update_progressbar) + for key, obj_qt, setter, getter in self.config_map: + if obj_qt == None: + continue + dict_idx, attr = key + obj = self.config[dict_idx] + if not hasattr(obj, 'table_size'): + getter(getattr(obj, attr), obj_qt) + else: + for i in range(min(obj.table_size, len(obj_qt))): + getter(getattr(obj.objects[i], attr), obj_qt[i]) + + def update_progressbar(self, percent): + print(percent) + self.progressBar.setValue(int(percent)) + + def initUI(self): + self.setWindowTitle('SQ9P Tracker Configurator') + + self.layout = QVBoxLayout() + self.setLayout(self.layout) + + self.connectionSettings = QGroupBox('Connection Settings') + self.connectionSettingsLayout = QVBoxLayout() + self.connectionSettings.setLayout(self.connectionSettingsLayout) + + self.uartConfigRowLayout = QHBoxLayout() + self.connectionSettingsLayout.addLayout(self.uartConfigRowLayout) + + self.comboBox = QComboBox(self) + self.serialPorts = QSerialPortInfo.availablePorts() + for port in self.serialPorts: + self.comboBox.addItem(port.portName()) + + self.connectionLabel = QLabel(self) + self.connectionLabel.setText("Disconnected") + + self.connectButton = QPushButton('Connect', self) + self.connectButton.clicked.connect(self.connect) + + self.uartConfigRowLayout.addWidget(self.comboBox) + self.uartConfigRowLayout.addWidget(self.connectionLabel) + self.uartConfigRowLayout.addWidget(self.connectButton) + + self.uartCommandRowLayout = QHBoxLayout() + self.connectionSettingsLayout.addLayout(self.uartCommandRowLayout) + + self.getCommandButton = QPushButton('Get', self) + self.getCommandButton.clicked.connect(self.get_command_async) + self.setCommandButton = QPushButton('Set', self) + self.setCommandButton.clicked.connect(self.set_command_async) + + self.uartCommandRowLayout.addWidget(self.getCommandButton) + self.uartCommandRowLayout.addWidget(self.setCommandButton) + + self.progresBarLayout = QHBoxLayout() + self.connectionSettingsLayout.addLayout(self.progresBarLayout) + + self.progressBar = QProgressBar(self) + self.progresBarLayout.addWidget(self.progressBar) + self.progressBar.setValue(0) + self.layout.addWidget(self.connectionSettings) + + self.tabs = QTabWidget() + self.init_frequency_layout() + self.init_geoconig_layout() + self.init_sstv_layout() + self.init_update_layout() + + self.multiConfig = QGroupBox('Multi Configuration') + self.multiConfigLayout = QVBoxLayout() + self.multiConfig.setLayout(self.multiConfigLayout) + + self.multiConfigLayout.addWidget(self.tabs) + self.layout.addWidget(self.multiConfig) + #self.layout.addWidget(self.tabs) + # self.layout.addWidget(self.aprsSettings) + + def init_frequency_layout(self): + self.frequencyConfigTab = QWidget() + self.frequencyConfigTabLayout = QHBoxLayout() + self.frequencyConfigTab.setLayout(self.frequencyConfigTabLayout) + + self.frequencyConfigGroupBox = QGroupBox('Frequency Settings') + self.frequencyConfigLayout = QVBoxLayout() + self.frequencyConfigGroupBox.setLayout(self.frequencyConfigLayout) + # self.frequencyConfigTab.setLayout(self.frequencyConfigLayout) + self.frequencyConfigTabLayout.addWidget(self.frequencyConfigGroupBox) + self.tabs.addTab(self.frequencyConfigTab, "Frequency") + + self.frequencyLayouts = [] + self.frequencyLines = [] + self.modeBoxes = [] + self.positionPrescalers = [] + self.telemetryPrescalers = [] + + self.modes = ['APRS AFSK 1200', 'APRS AFSK 9600', 'APRS AFSK CUSTOM', 'APRS_LORA 300', 'APRS LORA 1200', 'APRS LORA CUSTOM', '4FSK HORUS V2', 'FM SSTV', 'FM AUDIO'] + prescalers = [str(i) for i in range(0, 16)] + + self.columnLabelsLayout = QHBoxLayout() + self.frequencyConfigLayout.addLayout(self.columnLabelsLayout) + + for i in range(11): + layout = QHBoxLayout() + self.frequencyLayouts.append(layout) + self.frequencyConfigLayout.addLayout(layout) + + layout.addWidget(QLabel(str(i)+(". " if i < 10 else "."))) + layout.addWidget(QLabel("Frequency")) + + frequencyLine = QLineEdit() + self.frequencyLines.append(frequencyLine) + layout.addWidget(frequencyLine) + + layout.addWidget(QLabel("Mode")) + modeBox = QComboBox() + self.modeBoxes.append(modeBox) + modeBox.addItems(self.modes) + layout.addWidget(modeBox) + + layout.addWidget(QLabel("Position psc")) + positionPrescaler = QComboBox() + self.positionPrescalers.append(positionPrescaler) + positionPrescaler.addItems(prescalers) + layout.addWidget(positionPrescaler) + + layout.addWidget(QLabel("Telemetry psc")) + telemetryPrescaler = QComboBox() + self.telemetryPrescalers.append(telemetryPrescaler) + telemetryPrescaler.addItems(prescalers) + layout.addWidget(telemetryPrescaler) + + self.aprsSettings = QGroupBox('APRS Settings') + self.aprsSettingsLayout = QFormLayout() + self.aprsSettings.setLayout(self.aprsSettingsLayout) + + self.aprsCallsign = QLineEdit() + self.aprsDesignator = QComboBox() + self.aprsDesignator.addItems([str(i) for i in range(0, 16)]) + self.aprsDigiPath = QLineEdit() + self.aprsSymbol = QComboBox() + self.aprsSymbol.addItems(["BALLOON", "CAR", "ROCKET", "WX_STATION", "DIGI"]) + + self.aprsSettingsLayout.addRow('CALLSIGN', self.aprsCallsign) + self.aprsSettingsLayout.addRow('Designator', self.aprsDesignator) + self.aprsSettingsLayout.addRow('Digi path', self.aprsDigiPath) + self.aprsSettingsLayout.addRow('Symbol', self.aprsSymbol) + self.frequencyConfigTabLayout.addWidget(self.aprsSettings) + + + def init_sstv_layout(self): + self.sstvConfigTab = QWidget() + self.sstvConfigTabLayout = QHBoxLayout() + self.sstvConfigTab.setLayout(self.sstvConfigTabLayout) + + self.sstvParametersGroupBox = QGroupBox('SSTV parameters') + self.sstvParametersGroupBoxLayout = QFormLayout() + self.sstvParametersGroupBox.setLayout(self.sstvParametersGroupBoxLayout) + self.sstvConfigTabLayout.addWidget(self.sstvParametersGroupBox) + + self.sstvTextHeader = QLineEdit() + self.sstvBHeaderEnabled = QCheckBox() + self.sstvImageCnt = QComboBox() + self.sstvImageCnt.addItems([str(i) for i in range(0, 15)]) + self.sstvImageCntPerFreq = QComboBox() + self.sstvImageCntPerFreq.addItems([str(i) for i in range(0, 15)]) + + self.sstvParametersGroupBoxLayout.addRow('Text header', self.sstvTextHeader) + self.sstvParametersGroupBoxLayout.addRow('Text header enabled', self.sstvBHeaderEnabled) + self.sstvParametersGroupBoxLayout.addRow('Images count', self.sstvImageCnt) + self.sstvParametersGroupBoxLayout.addRow('Images per freq', self.sstvImageCntPerFreq) + + self.sstvImagesGroupBox = QGroupBox('Images') + self.sstvImagesGroupBoxLayout = QVBoxLayout() + self.sstvImagesGroupBox.setLayout(self.sstvImagesGroupBoxLayout) + self.sstvConfigTabLayout.addWidget(self.sstvImagesGroupBox) + + self.sstvImagesSelectButton = QPushButton('Open images') + self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesSelectButton) + self.sstvImagesSelectButton.clicked.connect(self.sstv_file_chose_dialog) + + self.sstvImagesGroupBoxLayout.addWidget(QLabel('Selected images:')) + self.sstvImagesSelectedList = QListWidget() + self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesSelectedList) + + self.sstvImagesUploadButton = QPushButton('Upload images to device') + self.sstvImagesGroupBoxLayout.addWidget(self.sstvImagesUploadButton) + self.sstvImagesUploadButton.pressed.connect(self.init_sstv_upload) + + self.tabs.addTab(self.sstvConfigTab, "SSTV config") + + def sstv_file_chose_dialog(self): + options = QFileDialog.Options() + options |= QFileDialog.ReadOnly + self.sstv_files, _ = QFileDialog.getOpenFileNames(self,"Select file", "","PNG (*.png);JPEG (*.jpg);;All Files (*)", options=options) + + self.sstvImagesSelectedList.clear() + for image in self.sstv_files: + thumbnail = QListWidgetItem() + thumbnail.setText(image) + thumbnail.setIcon(QIcon(image)) + self.sstvImagesSelectedList.addItem(thumbnail) + + def init_geoconig_layout(self): + self.geoConfigTab = QWidget() + self.geoConfigTabLayout = QVBoxLayout() + self.geoConfigTab.setLayout(self.geoConfigTabLayout) + + self.geoConfigRowLayouts = [] + self.geoConfigModeBoxes = [] + self.geoConfigCordLines = [] + + self.positionModes = ['TX INSIDE', 'TX OUTSIDE'] + + for i in range(11): + RowLayout = QHBoxLayout() + self.geoConfigRowLayouts.append(RowLayout) + self.geoConfigTabLayout.addLayout(RowLayout) + + RowLayout.addWidget(QLabel(str(i)+(". " if i < 10 else "."))) + + PositionModeBox = QComboBox() + PositionModeBox.addItems(self.positionModes) + self.geoConfigModeBoxes.append(PositionModeBox) + RowLayout.addWidget(PositionModeBox) + + CoordinatesLinesInRow = [] + for i in range(5): + CoordLineX = QLineEdit() + CoordLineY = QLineEdit() + CoordinatesLinesInRow.append(CoordLineX) + CoordinatesLinesInRow.append(CoordLineY) + + RowLayout.addWidget(QLabel("X"+str(i))) + RowLayout.addWidget(CoordLineX) + RowLayout.addWidget(QLabel("Y"+str(i))) + RowLayout.addWidget(CoordLineY) + + self.geoConfigCordLines.append(CoordinatesLinesInRow) + + self.tabs.addTab(self.geoConfigTab, "Geofencing") + + def init_update_layout(self): + self.fwUpdateTab = QWidget() + self.fwUpdateTabLayout = QVBoxLayout() + self.fwUpdateTab.setLayout(self.fwUpdateTabLayout) + + self.fwUpdateRowLayout = QHBoxLayout() + self.fwUpdateTabLayout.addLayout(self.fwUpdateRowLayout) + + self.fwUpdateSelectButton = QPushButton('Select file') + self.fwUpdateRowLayout.addWidget(self.fwUpdateSelectButton) + self.fwUpdateSelectButton.clicked.connect(self.load_file_update_dialog) + + self.fwUpdateFileLabel = QLabel("firmware file not selected") + self.fwUpdateRowLayout.addWidget(self.fwUpdateFileLabel) + + self.fwUpdateUploadButton = QPushButton('upload') + self.fwUpdateUploadButton.clicked.connect(self.init_firmware_upload) + self.fwUpdateTabLayout.addWidget(self.fwUpdateUploadButton) + self.fwUpdateTabLayout.addStretch(1) + + self.tabs.addTab(self.fwUpdateTab, "Software update") + + def load_file_update_dialog(self): + options = QFileDialog.Options() + options |= QFileDialog.ReadOnly + self.fw_file, _ = QFileDialog.getOpenFileName(self,"Select file", "","BIN (*.bin);;All Files (*)", options=options) + + if self.fw_file: + self.fwUpdateFileLabel.setText(f"Wybrane pliki: {self.fw_file}") + + def init_firmware_upload(self): + if not self.fw_file: + return + + self.disable_buttons(True) + self.fw_uploader.upload_firmware(self.fw_file, self.progress_callback) + + def init_sstv_upload(self): + if not self.sstv_files: + return + + self.disable_buttons(True) + self.img_uploader.upload_images(self.sstv_files, self.progress_callback) + +# app = QApplication(sys.argv) + +# widget = SerialConfigurator() +# widget.show() + +# sys.exit(app.exec_()) + \ No newline at end of file