Python 3 updates

python3
Mark Jessop 2022-03-19 03:50:41 +00:00
rodzic 3d29fe9d33
commit 4b65d3781b
16 zmienionych plików z 1845 dodań i 85 usunięć

Wyświetl plik

@ -17,7 +17,7 @@
import os, glob
# images should be named 1.jpg, 2.jpg, etc.
image_numbers = xrange(1,14)
image_numbers = range(1,14)
new_sizes = ["800x608","640x480","320x240"]
callsign = "VK5QI"

Wyświetl plik

@ -16,7 +16,6 @@
import serial
import Queue
import sys
import os
import datetime
@ -29,6 +28,7 @@ from time import sleep
from threading import Thread
import numpy as np
from ldpc_encoder import *
from queue import Queue
class PacketTX(object):
""" Packet Transmitter Class
@ -56,15 +56,15 @@ class PacketTX(object):
"""
# Transmit Queues.
ssdv_queue = Queue.Queue(4096) # Up to 1MB of 256 byte packets
telemetry_queue = Queue.Queue(256) # Keep this queue small. It's up to the user not to over-use this queue.
ssdv_queue = Queue(4096) # Up to 1MB of 256 byte packets
telemetry_queue = Queue(256) # Keep this queue small. It's up to the user not to over-use this queue.
# Framing parameters
unique_word = "\xab\xcd\xef\x01"
preamble = "\x55"*16
unique_word = b"\xab\xcd\xef\x01"
preamble = b"\x55"*16
# Idle sequence, transmitted if there is nothing in the transmit queues.
idle_sequence = "\x56"*256
idle_sequence = b"\x56"*256
# Transmit thread active flag.
transmit_active = False
@ -94,7 +94,7 @@ class PacketTX(object):
self.payload_length = payload_length
self.callsign = callsign
self.callsign = callsign.encode('ascii')
self.fec = fec
self.crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false')
@ -130,25 +130,25 @@ class PacketTX(object):
packet = packet[:self.payload_length]
if len(packet) < self.payload_length:
packet = packet + "\x55"*(self.payload_length - len(packet))
packet = packet + b"\x55"*(self.payload_length - len(packet))
crc = struct.pack("<H",self.crc16(packet))
if fec:
parity = ldpc_encode_string(packet + crc)
parity = ldpc_encode(packet + crc)
return self.preamble + self.unique_word + packet + crc + parity
else:
return self.preamble + self.unique_word + packet + crc
def set_idle_message(self, message):
temp_msg = "\x00" + "DE %s: \t%s" % (self.callsign, message)
temp_msg = b"\x00" + b"DE %s: \t%s" % (self.callsign, message.encode('ascii'))
self.idle_message = self.frame_packet(temp_msg,fec=self.fec)
def generate_idle_message(self):
# Append a \x00 control code before the data
return "\x00" + "DE %s: \t%s" % (self.callsign,self.idle_message)
return b"\x00" + b"DE %s: \t%s" % (self.callsign,self.idle_message)
def tx_thread(self):
@ -207,7 +207,7 @@ class PacketTX(object):
file_size = os.path.getsize(filename)
try:
f = open(filename,'rb')
for x in range(file_size/256):
for x in range(file_size//256):
data = f.read(256)
self.queue_image_packet(data)
f.close()
@ -249,7 +249,7 @@ class PacketTX(object):
if len(message) > 252:
message = message[:252]
packet = "\x00" + struct.pack(">BH",len(message),self.text_message_count) + message
packet = b"\x00" + struct.pack(">BH",len(message),self.text_message_count) + message.encode('ascii')
self.queue_telemetry_packet(packet, repeats=repeats)
log_string = "TXing Text Message #%d: %s" % (self.text_message_count,message)
@ -419,13 +419,13 @@ class PacketTX(object):
_id = int(id) % 256
# Convert the provided data to a string
_data = str(bytearray(data))
_data = bytes(bytearray(data))
# Clip to 254 bytes.
if len(_data) > 254:
_data = _data[:254]
_packet = "\x03" + struct.pack(">B",_id) + _data
_packet = b"\x03" + struct.pack(">B",_id) + _data
self.queue_telemetry_packet(_packet, repeats=repeats)
@ -438,7 +438,7 @@ class PacketTX(object):
def handle_udp_packet(self, packet):
''' Process a received UDP packet '''
try:
packet_dict = json.loads(packet)
packet_dict = json.loads(packet.decode())
if packet_dict['type'] == 'WENET_TX_TEXT':
# Transmit an arbitrary text packet.

939
tx/SX127x/LoRa.py 100755
Wyświetl plik

@ -0,0 +1,939 @@
""" Defines the SX127x class and a few utility functions. """
# Copyright 2015 Mayer Analytics Ltd.
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127. If not, see
# <http://www.gnu.org/licenses/>.
import sys, struct
from .constants import *
################################################## Some utility functions ##############################################
def set_bit(value, index, new_bit):
""" Set the index'th bit of value to new_bit, and return the new value.
:param value: The integer to set the new_bit in
:type value: int
:param index: 0-based index
:param new_bit: New value the bit shall have (0 or 1)
:return: Changed value
:rtype: int
"""
mask = 1 << index
value &= ~mask
if new_bit:
value |= mask
return value
def getter(register_address):
""" The getter decorator reads the register content and calls the decorated function to do
post-processing.
:param register_address: Register address
:return: Register value
:rtype: int
"""
def decorator(func):
def wrapper(self):
return func(self, self.spi.xfer([register_address, 0])[1])
return wrapper
return decorator
def setter(register_address):
""" The setter decorator calls the decorated function for pre-processing and
then writes the result to the register
:param register_address: Register address
:return: New register value
:rtype: int
"""
def decorator(func):
def wrapper(self, val):
return self.spi.xfer([register_address | 0x80, func(self, val)])[1]
return wrapper
return decorator
############################################### Definition of the LoRa class ###########################################
class LoRa(object):
BOARD = None
spi = None
mode = None # the mode is backed up here
backup_registers = []
verbose = True
dio_mapping = [None] * 6 # store the dio mapping here
current_freq = 434.500
def __init__(self, hw_interface, verbose=True):
self.BOARD = hw_interface
self.spi = self.BOARD.SpiDev()
self.verbose = verbose
# set the callbacks for DIO0..5 IRQs.
self.BOARD.add_events(self._dio0, self._dio1, self._dio2, self._dio3, self._dio4, self._dio5)
# set mode to sleep and read all registers
self.set_mode(MODE.SLEEP)
self.backup_registers = self.get_all_registers()
# more setup work:
# self.rx_chain_calibration(434.) # TODO: Find out if we need this.
# the FSK registers are set up exactly as modtronix do it:
lookup_fsk = [
#[REG.FSK.LNA , 0x23],
#[REG.FSK.RX_CONFIG , 0x1E],
#[REG.FSK.RSSI_CONFIG , 0xD2],
#[REG.FSK.PREAMBLE_DETECT, 0xAA],
#[REG.FSK.OSC , 0x07],
#[REG.FSK.SYNC_CONFIG , 0x12],
#[REG.FSK.SYNC_VALUE_1 , 0xC1],
#[REG.FSK.SYNC_VALUE_2 , 0x94],
#[REG.FSK.SYNC_VALUE_3 , 0xC1],
#[REG.FSK.PACKET_CONFIG_1, 0xD8],
#[REG.FSK.FIFO_THRESH , 0x8F],
#[REG.FSK.IMAGE_CAL , 0x02],
#[REG.FSK.DIO_MAPPING_1 , 0x00],
#[REG.FSK.DIO_MAPPING_2 , 0x30]
]
#self.set_mode(MODE.FSK_STDBY)
#for register_address, value in lookup_fsk:
# self.set_register(register_address, value)
#self.set_mode(MODE.SLEEP)
# set the dio_ mapping by calling the two get_dio_mapping_* functions
self.get_dio_mapping_1()
self.get_dio_mapping_2()
# Overridable functions:
def on_rx_done(self):
pass
def on_tx_done(self):
pass
def on_cad_done(self):
pass
def on_rx_timeout(self):
pass
def on_valid_header(self):
pass
def on_payload_crc_error(self):
pass
def on_fhss_change_channel(self):
pass
# Internal callbacks for add_events()
def _dio0(self, channel):
# DIO0 00: RxDone
# DIO0 01: TxDone
# DIO0 10: CadDone
if self.dio_mapping[0] == 0:
self.on_rx_done()
elif self.dio_mapping[0] == 1:
self.on_tx_done()
elif self.dio_mapping[0] == 2:
self.on_cad_done()
else:
raise RuntimeError("unknown dio0mapping!")
def _dio1(self, channel):
# DIO1 00: RxTimeout
# DIO1 01: FhssChangeChannel
# DIO1 10: CadDetected
if self.dio_mapping[1] == 0:
self.on_rx_timeout()
elif self.dio_mapping[1] == 1:
self.on_fhss_change_channel()
elif self.dio_mapping[1] == 2:
self.on_CadDetected()
else:
raise RuntimeError("unknown dio1mapping!")
def _dio2(self, channel):
# DIO2 00: FhssChangeChannel
# DIO2 01: FhssChangeChannel
# DIO2 10: FhssChangeChannel
self.on_fhss_change_channel()
def _dio3(self, channel):
# DIO3 00: CadDone
# DIO3 01: ValidHeader
# DIO3 10: PayloadCrcError
if self.dio_mapping[3] == 0:
self.on_cad_done()
elif self.dio_mapping[3] == 1:
self.on_valid_header()
elif self.dio_mapping[3] == 2:
self.on_payload_crc_error()
else:
raise RuntimeError("unknown dio3 mapping!")
def _dio4(self, channel):
raise RuntimeError("DIO4 is not used")
def _dio5(self, channel):
raise RuntimeError("DIO5 is not used")
# All the set/get/read/write functions
def get_mode(self):
""" Get the mode
:return: New mode
"""
self.mode = self.spi.xfer([REG.LORA.OP_MODE, 0])[1]
return self.mode
def set_mode(self, mode):
""" Set the mode
:param mode: Set the mode. Use constants.MODE class
:return: New mode
"""
# the mode is backed up in self.mode
if mode == self.mode:
return mode
if self.verbose:
sys.stderr.write("Mode <- %s\n" % MODE.lookup[mode])
self.mode = mode
return self.spi.xfer([REG.LORA.OP_MODE | 0x80, mode])[1]
def write_payload(self, payload):
""" Get FIFO ready for TX: Set FifoAddrPtr to FifoTxBaseAddr. The transceiver is put into STDBY mode.
:param payload: Payload to write (list)
:return: Written payload
"""
self.set_mode(MODE.STDBY)
base_addr = self.get_fifo_tx_base_addr()
self.set_fifo_addr_ptr(base_addr)
return self.spi.xfer([REG.LORA.FIFO | 0x80] + payload)[1:]
def reset_ptr_rx(self):
""" Get FIFO ready for RX: Set FifoAddrPtr to FifoRxBaseAddr. The transceiver is put into STDBY mode. """
self.set_mode(MODE.STDBY)
base_addr = self.get_fifo_rx_base_addr()
self.set_fifo_addr_ptr(base_addr)
def rx_is_good(self):
""" Check the IRQ flags for RX errors
:return: True if no errors
:rtype: bool
"""
flags = self.get_irq_flags()
return not any([flags[s] for s in ['valid_header', 'crc_error', 'rx_done', 'rx_timeout']])
def read_payload(self , nocheck = False):
""" Read the payload from FIFO
:param nocheck: If True then check rx_is_good()
:return: Payload
:rtype: list[int]
"""
if not nocheck and not self.rx_is_good():
return None
rx_nb_bytes = self.get_rx_nb_bytes()
fifo_rx_current_addr = self.get_fifo_rx_current_addr()
self.set_fifo_addr_ptr(fifo_rx_current_addr)
payload = self.spi.xfer([REG.LORA.FIFO] + [0] * rx_nb_bytes)[1:]
return payload
def get_freq(self):
""" Get the frequency (MHz)
:return: Frequency in MHz
:rtype: float
"""
msb, mid, lsb = self.spi.xfer([REG.LORA.FR_MSB, 0, 0, 0])[1:]
f = lsb + 256*(mid + 256*msb)
return f / 16384.
def set_freq(self, f):
""" Set the frequency (MHz)
:param f: Frequency in MHz
"type f: float
:return: New register settings (3 bytes [msb, mid, lsb])
:rtype: list[int]
"""
assert self.mode == MODE.SLEEP or self.mode == MODE.STDBY or self.mode == MODE.FSK_STDBY
i = int(f * 16384.) # choose floor
msb = i // 65536
i -= msb * 65536
mid = i // 256
i -= mid * 256
lsb = i
self.current_freq = f
return self.spi.xfer([REG.LORA.FR_MSB | 0x80, msb, mid, lsb])
def get_pa_config(self, convert_dBm=False):
v = self.spi.xfer([REG.LORA.PA_CONFIG, 0])[1]
pa_select = v >> 7
max_power = v >> 4 & 0b111
output_power = v & 0b1111
if convert_dBm:
max_power = max_power * .6 + 10.8
output_power = max_power - (15 - output_power)
return dict(
pa_select = pa_select,
max_power = max_power,
output_power = output_power
)
def set_pa_config(self, pa_select=None, max_power=None, output_power=None):
""" Configure the PA
:param pa_select: Selects PA output pin, 0->RFO, 1->PA_BOOST
:param max_power: Select max output power Pmax=10.8+0.6*MaxPower
:param output_power: Output power Pout=Pmax-(15-OutputPower) if PaSelect = 0,
Pout=17-(15-OutputPower) if PaSelect = 1 (PA_BOOST pin)
:return: new register value
"""
loc = locals()
current = self.get_pa_config()
loc = {s: current[s] if loc[s] is None else loc[s] for s in loc}
val = (loc['pa_select'] << 7) | (loc['max_power'] << 4) | (loc['output_power'])
return self.spi.xfer([REG.LORA.PA_CONFIG | 0x80, val])[1]
@getter(REG.LORA.PA_RAMP)
def get_pa_ramp(self, val):
return val & 0b1111
@setter(REG.LORA.PA_RAMP)
def set_pa_ramp(self, val):
return val & 0b1111
def get_ocp(self, convert_mA=False):
v = self.spi.xfer([REG.LORA.OCP, 0])[1]
ocp_on = v >> 5 & 0x01
ocp_trim = v & 0b11111
if convert_mA:
if ocp_trim <= 15:
ocp_trim = 45. + 5. * ocp_trim
elif ocp_trim <= 27:
ocp_trim = -30. + 10. * ocp_trim
else:
assert ocp_trim <= 27
return dict(
ocp_on = ocp_on,
ocp_trim = ocp_trim
)
def set_ocp_trim(self, I_mA):
assert(I_mA >= 45 and I_mA <= 240)
ocp_on = self.spi.xfer([REG.LORA.OCP, 0])[1] >> 5 & 0x01
if I_mA <= 120:
v = int(round((I_mA-45.)/5.))
else:
v = int(round((I_mA+30.)/10.))
v = set_bit(v, 5, ocp_on)
return self.spi.xfer([REG.LORA.OCP | 0x80, v])[1]
def get_lna(self):
v = self.spi.xfer([REG.LORA.LNA, 0])[1]
return dict(
lna_gain = v >> 5,
lna_boost_lf = v >> 3 & 0b11,
lna_boost_hf = v & 0b11
)
def set_lna(self, lna_gain=None, lna_boost_lf=None, lna_boost_hf=None):
assert lna_boost_hf is None or lna_boost_hf == 0b00 or lna_boost_hf == 0b11
self.set_mode(MODE.STDBY)
if lna_gain is not None:
# Apparently agc_auto_on must be 0 in order to set lna_gain
self.set_agc_auto_on(lna_gain == GAIN.NOT_USED)
loc = locals()
current = self.get_lna()
loc = {s: current[s] if loc[s] is None else loc[s] for s in loc}
val = (loc['lna_gain'] << 5) | (loc['lna_boost_lf'] << 3) | (loc['lna_boost_hf'])
retval = self.spi.xfer([REG.LORA.LNA | 0x80, val])[1]
if lna_gain is not None:
# agc_auto_on must track lna_gain: GAIN=NOT_USED -> agc_auto=ON, otherwise =OFF
self.set_agc_auto_on(lna_gain == GAIN.NOT_USED)
return retval
def set_lna_gain(self, lna_gain):
self.set_lna(lna_gain=lna_gain)
def get_fifo_addr_ptr(self):
return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR, 0])[1]
def set_fifo_addr_ptr(self, ptr):
return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR | 0x80, ptr])[1]
def get_fifo_tx_base_addr(self):
return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR, 0])[1]
def set_fifo_tx_base_addr(self, ptr):
return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR | 0x80, ptr])[1]
def get_fifo_rx_base_addr(self):
return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR, 0])[1]
def set_fifo_rx_base_addr(self, ptr):
return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR | 0x80, ptr])[1]
def get_fifo_rx_current_addr(self):
return self.spi.xfer([REG.LORA.FIFO_RX_CURR_ADDR, 0])[1]
def get_fifo_rx_byte_addr(self):
return self.spi.xfer([REG.LORA.FIFO_RX_BYTE_ADDR, 0])[1]
def get_irq_flags_mask(self):
v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1]
return dict(
rx_timeout = v >> 7 & 0x01,
rx_done = v >> 6 & 0x01,
crc_error = v >> 5 & 0x01,
valid_header = v >> 4 & 0x01,
tx_done = v >> 3 & 0x01,
cad_done = v >> 2 & 0x01,
fhss_change_ch = v >> 1 & 0x01,
cad_detected = v >> 0 & 0x01,
)
def set_irq_flags_mask(self,
rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None,
cad_done=None, fhss_change_ch=None, cad_detected=None):
loc = locals()
v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1]
for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header',
'crc_error', 'rx_done', 'rx_timeout']):
this_bit = locals()[s]
if this_bit is not None:
v = set_bit(v, i, this_bit)
return self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK | 0x80, v])[1]
def get_irq_flags(self):
v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1]
return dict(
rx_timeout = v >> 7 & 0x01,
rx_done = v >> 6 & 0x01,
crc_error = v >> 5 & 0x01,
valid_header = v >> 4 & 0x01,
tx_done = v >> 3 & 0x01,
cad_done = v >> 2 & 0x01,
fhss_change_ch = v >> 1 & 0x01,
cad_detected = v >> 0 & 0x01,
)
def set_irq_flags(self,
rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None,
cad_done=None, fhss_change_ch=None, cad_detected=None):
v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1]
for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header',
'crc_error', 'rx_done', 'rx_timeout']):
this_bit = locals()[s]
if this_bit is not None:
v = set_bit(v, i, this_bit)
return self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, v])[1]
def clear_irq_flags(self):
v = self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, 0])[1]
return v
def get_rx_nb_bytes(self):
return self.spi.xfer([REG.LORA.RX_NB_BYTES, 0])[1]
def get_rx_header_cnt(self):
msb, lsb = self.spi.xfer([REG.LORA.RX_HEADER_CNT_MSB, 0, 0])[1:]
return lsb + 256 * msb
def get_rx_packet_cnt(self):
msb, lsb = self.spi.xfer([REG.LORA.RX_PACKET_CNT_MSB, 0, 0])[1:]
return lsb + 256 * msb
def get_modem_status(self):
status = self.spi.xfer([REG.LORA.MODEM_STAT, 0])[1]
return dict(
rx_coding_rate = status >> 5 & 0x03,
modem_clear = status >> 4 & 0x01,
header_info_valid = status >> 3 & 0x01,
rx_ongoing = status >> 2 & 0x01,
signal_sync = status >> 1 & 0x01,
signal_detected = status >> 0 & 0x01
)
def get_pkt_snr_value(self):
v = self.spi.xfer([REG.LORA.PKT_SNR_VALUE, 0])[1]
return struct.unpack('b',str(bytearray([v])))[0]/4.
def get_pkt_rssi_value(self):
v = self.spi.xfer([REG.LORA.PKT_RSSI_VALUE, 0])[1]
if self.current_freq>525.0:
return v - 157
else:
return v - 164
def get_rssi_value(self):
v = self.spi.xfer([REG.LORA.RSSI_VALUE, 0])[1]
if self.current_freq>525.0:
return v - 157
else:
return v - 164
def get_hop_channel(self):
v = self.spi.xfer([REG.LORA.HOP_CHANNEL, 0])[1]
return dict(
pll_timeout = v >> 7,
crc_on_payload = v >> 6 & 0x01,
fhss_present_channel = v >> 5 & 0b111111
)
def get_modem_config_1(self):
val = self.spi.xfer([REG.LORA.MODEM_CONFIG_1, 0])[1]
return dict(
bw = val >> 4 & 0x0F,
coding_rate = val >> 1 & 0x07,
implicit_header_mode = val & 0x01
)
def set_modem_config_1(self, bw=None, coding_rate=None, implicit_header_mode=None):
loc = locals()
current = self.get_modem_config_1()
loc = {s: current[s] if loc[s] is None else loc[s] for s in loc}
val = loc['implicit_header_mode'] | (loc['coding_rate'] << 1) | (loc['bw'] << 4)
return self.spi.xfer([REG.LORA.MODEM_CONFIG_1 | 0x80, val])[1]
def set_bw(self, bw):
""" Set the bandwidth 0=7.8kHz ... 9=500kHz
:param bw: A number 0,2,3,...,9
:return:
"""
self.set_modem_config_1(bw=bw)
def set_coding_rate(self, coding_rate):
""" Set the coding rate 4/5, 4/6, 4/7, 4/8
:param coding_rate: A number 1,2,3,4
:return: New register value
"""
self.set_modem_config_1(coding_rate=coding_rate)
def set_implicit_header_mode(self, implicit_header_mode):
self.set_modem_config_1(implicit_header_mode=implicit_header_mode)
def get_modem_config_2(self, include_symb_timout_lsb=False):
val = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1]
d = dict(
spreading_factor = val >> 4 & 0x0F,
tx_cont_mode = val >> 3 & 0x01,
rx_crc = val >> 2 & 0x01,
)
if include_symb_timout_lsb:
d['symb_timout_lsb'] = val & 0x03
return d
def set_modem_config_2(self, spreading_factor=None, tx_cont_mode=None, rx_crc=None):
loc = locals()
# RegModemConfig2 contains the SymbTimout MSB bits. We tack the back on when writing this register.
current = self.get_modem_config_2(include_symb_timout_lsb=True)
loc = {s: current[s] if loc[s] is None else loc[s] for s in loc}
val = (loc['spreading_factor'] << 4) | (loc['tx_cont_mode'] << 3) | (loc['rx_crc'] << 2) | current['symb_timout_lsb']
return self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, val])[1]
def set_spreading_factor(self, spreading_factor):
self.set_modem_config_2(spreading_factor=spreading_factor)
def set_rx_crc(self, rx_crc):
self.set_modem_config_2(rx_crc=rx_crc)
def get_modem_config_3(self):
val = self.spi.xfer([REG.LORA.MODEM_CONFIG_3, 0])[1]
return dict(
low_data_rate_optim = val >> 3 & 0x01,
agc_auto_on = val >> 2 & 0x01
)
def set_modem_config_3(self, low_data_rate_optim=None, agc_auto_on=None):
loc = locals()
current = self.get_modem_config_3()
loc = {s: current[s] if loc[s] is None else loc[s] for s in loc}
val = (loc['low_data_rate_optim'] << 3) | (loc['agc_auto_on'] << 2)
return self.spi.xfer([REG.LORA.MODEM_CONFIG_3 | 0x80, val])[1]
@setter(REG.LORA.INVERT_IQ)
def set_invert_iq(self, invert):
""" Invert the LoRa I and Q signals
:param invert: 0: normal mode, 1: I and Q inverted
:return: New value of register
"""
return 0x27 | (invert & 0x01) << 6
@getter(REG.LORA.INVERT_IQ)
def get_invert_iq(self, val):
""" Get the invert the I and Q setting
:return: 0: normal mode, 1: I and Q inverted
"""
return (val >> 6) & 0x01
def get_agc_auto_on(self):
return self.get_modem_config_3()['agc_auto_on']
def set_agc_auto_on(self, agc_auto_on):
self.set_modem_config_3(agc_auto_on=agc_auto_on)
def get_low_data_rate_optim(self):
return self.set_modem_config_3()['low_data_rate_optim']
def set_low_data_rate_optim(self, low_data_rate_optim):
self.set_modem_config_3(low_data_rate_optim=low_data_rate_optim)
def get_symb_timeout(self):
SYMB_TIMEOUT_MSB = REG.LORA.MODEM_CONFIG_2
msb, lsb = self.spi.xfer([SYMB_TIMEOUT_MSB, 0, 0])[1:] # the MSB bits are stored in REG.LORA.MODEM_CONFIG_2
msb = msb & 0b11
return lsb + 256 * msb
def set_symb_timeout(self, timeout):
bkup_reg_modem_config_2 = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1]
msb = timeout >> 8 & 0b11 # bits 8-9
lsb = timeout - 256 * msb # bits 0-7
reg_modem_config_2 = bkup_reg_modem_config_2 & 0xFC | msb # bits 2-7 of bkup_reg_modem_config_2 ORed with the two msb bits
old_msb = self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, reg_modem_config_2])[1] & 0x03
old_lsb = self.spi.xfer([REG.LORA.SYMB_TIMEOUT_LSB | 0x80, lsb])[1]
return old_lsb + 256 * old_msb
def get_preamble(self):
msb, lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB, 0, 0])[1:]
return lsb + 256 * msb
def set_preamble(self, preamble):
msb = preamble >> 8
lsb = preamble - msb * 256
old_msb, old_lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB | 0x80, msb, lsb])[1:]
return old_lsb + 256 * old_msb
@getter(REG.LORA.PAYLOAD_LENGTH)
def get_payload_length(self, val):
return val
@setter(REG.LORA.PAYLOAD_LENGTH)
def set_payload_length(self, payload_length):
return payload_length
@getter(REG.LORA.MAX_PAYLOAD_LENGTH)
def get_max_payload_length(self, val):
return val
@setter(REG.LORA.MAX_PAYLOAD_LENGTH)
def set_max_payload_length(self, max_payload_length):
return max_payload_length
@getter(REG.LORA.HOP_PERIOD)
def get_hop_period(self, val):
return val
@setter(REG.LORA.HOP_PERIOD)
def set_hop_period(self, hop_period):
return hop_period
def get_fei(self):
msb, mid, lsb = self.spi.xfer([REG.LORA.FEI_MSB, 0, 0, 0])[1:]
msb &= 0x0F
freq_error = lsb + 256 * (mid + 256 * msb)
return freq_error
@getter(REG.LORA.DETECT_OPTIMIZE)
def get_detect_optimize(self, val):
""" Get LoRa detection optimize setting
:return: detection optimize setting 0x03: SF7-12, 0x05: SF6
"""
return val & 0b111
@setter(REG.LORA.DETECT_OPTIMIZE)
def set_detect_optimize(self, detect_optimize):
""" Set LoRa detection optimize
:param detect_optimize 0x03: SF7-12, 0x05: SF6
:return: New register value
"""
assert detect_optimize == 0x03 or detect_optimize == 0x05
return detect_optimize & 0b111
@getter(REG.LORA.DETECTION_THRESH)
def get_detection_threshold(self, val):
""" Get LoRa detection threshold setting
:return: detection threshold 0x0A: SF7-12, 0x0C: SF6
"""
return val
@setter(REG.LORA.DETECTION_THRESH)
def set_detection_threshold(self, detect_threshold):
""" Set LoRa detection optimize
:param detect_threshold 0x0A: SF7-12, 0x0C: SF6
:return: New register value
"""
assert detect_threshold == 0x0A or detect_threshold == 0x0C
return detect_threshold
@getter(REG.LORA.SYNC_WORD)
def get_sync_word(self, sync_word):
return sync_word
@setter(REG.LORA.SYNC_WORD)
def set_sync_word(self, sync_word):
return sync_word
@getter(REG.LORA.DIO_MAPPING_1)
def get_dio_mapping_1(self, mapping):
""" Get mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set.
:param mapping: Register value
:type mapping: int
:return: Value of the mapping list
:rtype: list[int]
"""
self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \
+ self.dio_mapping[4:6]
return self.dio_mapping
@setter(REG.LORA.DIO_MAPPING_1)
def set_dio_mapping_1(self, mapping):
""" Set mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set.
:param mapping: Register value
:type mapping: int
:return: New value of the register
:rtype: int
"""
self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \
+ self.dio_mapping[4:6]
return mapping
@getter(REG.LORA.DIO_MAPPING_2)
def get_dio_mapping_2(self, mapping):
""" Get mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set.
:param mapping: Register value
:type mapping: int
:return: Value of the mapping list
:rtype: list[int]
"""
self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03]
return self.dio_mapping
@setter(REG.LORA.DIO_MAPPING_2)
def set_dio_mapping_2(self, mapping):
""" Set mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set.
:param mapping: Register value
:type mapping: int
:return: New value of the register
:rtype: int
"""
assert mapping & 0b00001110 == 0
self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03]
return mapping
def get_dio_mapping(self):
""" Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set.
:return: List of current DIO mappings
:rtype: list[int]
"""
self.get_dio_mapping_1()
return self.get_dio_mapping_2()
def set_dio_mapping(self, mapping):
""" Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set.
:param mapping: DIO mapping list
:type mapping: list[int]
:return: New DIO mapping list
:rtype: list[int]
"""
mapping_1 = (mapping[0] & 0x03) << 6 | (mapping[1] & 0x03) << 4 | (mapping[2] & 0x3) << 2 | mapping[3] & 0x3
mapping_2 = (mapping[4] & 0x03) << 6 | (mapping[5] & 0x03) << 4
self.set_dio_mapping_1(mapping_1)
return self.set_dio_mapping_2(mapping_2)
@getter(REG.LORA.VERSION)
def get_version(self, version):
""" Version code of the chip.
Bits 7-4 give the full revision number; bits 3-0 give the metal mask revision number.
:return: Version code
:rtype: int
"""
return version
@getter(REG.LORA.TCXO)
def get_tcxo(self, tcxo):
""" Get TCXO or XTAL input setting
0 -> "XTAL": Crystal Oscillator with external Crystal
1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin
:param tcxo: 1=TCXO or 0=XTAL input setting
:return: TCXO or XTAL input setting
:type: int (0 or 1)
"""
return tcxo & 0b00010000
@setter(REG.LORA.TCXO)
def set_tcxo(self, tcxo):
""" Make TCXO or XTAL input setting.
0 -> "XTAL": Crystal Oscillator with external Crystal
1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin
:param tcxo: 1=TCXO or 0=XTAL input setting
:return: new TCXO or XTAL input setting
"""
return (tcxo >= 1) << 4 | 0x09 # bits 0-3 must be 0b1001
@getter(REG.LORA.PA_DAC)
def get_pa_dac(self, pa_dac):
""" Enables the +20dBm option on PA_BOOST pin
False -> Default value
True -> +20dBm on PA_BOOST when OutputPower=1111
:return: True/False if +20dBm option on PA_BOOST on/off
:rtype: bool
"""
pa_dac &= 0x07 # only bits 0-2
if pa_dac == 0x04:
return False
elif pa_dac == 0x07:
return True
else:
raise RuntimeError("Bad PA_DAC value %s" % hex(pa_dac))
@setter(REG.LORA.PA_DAC)
def set_pa_dac(self, pa_dac):
""" Enables the +20dBm option on PA_BOOST pin
False -> Default value
True -> +20dBm on PA_BOOST when OutputPower=1111
:param pa_dac: 1/0 if +20dBm option on PA_BOOST on/off
:return: New pa_dac register value
:rtype: int
"""
return 0x87 if pa_dac else 0x84
def rx_chain_calibration(self, freq=868.):
""" Run the image calibration (see Semtech documentation section 4.2.3.8)
:param freq: Frequency for the HF calibration
:return: None
"""
# backup some registers
op_mode_bkup = self.get_mode()
pa_config_bkup = self.get_register(REG.LORA.PA_CONFIG)
freq_bkup = self.get_freq()
# for image calibration device must be in FSK standby mode
self.set_mode(MODE.FSK_STDBY)
# cut the PA
self.set_register(REG.LORA.PA_CONFIG, 0x00)
# calibration for the LF band
image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40
self.set_register(REG.FSK.IMAGE_CAL, image_cal)
while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20:
pass
# Set a Frequency in HF band
self.set_freq(freq)
# calibration for the HF band
image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40
self.set_register(REG.FSK.IMAGE_CAL, image_cal)
while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20:
pass
# put back the saved parameters
self.set_mode(op_mode_bkup)
self.set_register(REG.LORA.PA_CONFIG, pa_config_bkup)
self.set_freq(freq_bkup)
def dump_registers(self):
""" Returns a list of [reg_addr, reg_name, reg_value] tuples. Chip is put into mode SLEEP.
:return: List of [reg_addr, reg_name, reg_value] tuples
:rtype: list[tuple]
"""
self.set_mode(MODE.SLEEP)
values = self.get_all_registers()
skip_set = set([REG.LORA.FIFO])
result_list = []
for i, s in REG.LORA.lookup.iteritems():
if i in skip_set:
continue
v = values[i]
result_list.append((i, s, v))
return result_list
def get_register(self, register_address):
return self.spi.xfer([register_address & 0x7F, 0])[1]
def set_register(self, register_address, val):
return self.spi.xfer([register_address | 0x80, val])[1]
def get_all_registers(self):
# read all registers
reg = [0] + self.spi.xfer([1]+[0]*0x3E)[1:]
self.mode = reg[1]
return reg
def __del__(self):
self.set_mode(MODE.SLEEP)
if self.verbose:
sys.stderr.write("MODE=SLEEP\n")
def __str__(self):
# don't use __str__ while in any mode other that SLEEP or STDBY
assert(self.mode == MODE.SLEEP or self.mode == MODE.STDBY)
onoff = lambda i: 'ON' if i else 'OFF'
f = self.get_freq()
cfg1 = self.get_modem_config_1()
cfg2 = self.get_modem_config_2()
cfg3 = self.get_modem_config_3()
pa_config = self.get_pa_config(convert_dBm=True)
ocp = self.get_ocp(convert_mA=True)
lna = self.get_lna()
s = "SX127x LoRa registers:\n"
s += " mode %s\n" % MODE.lookup[self.get_mode()]
s += " freq %f MHz\n" % f
s += " coding_rate %s\n" % CODING_RATE.lookup[cfg1['coding_rate']]
s += " bw %s\n" % BW.lookup[cfg1['bw']]
s += " spreading_factor %s chips/symb\n" % (1 << cfg2['spreading_factor'])
s += " implicit_hdr_mode %s\n" % onoff(cfg1['implicit_header_mode'])
s += " rx_payload_crc %s\n" % onoff(cfg2['rx_crc'])
s += " tx_cont_mode %s\n" % onoff(cfg2['tx_cont_mode'])
s += " preamble %d\n" % self.get_preamble()
s += " low_data_rate_opti %s\n" % onoff(cfg3['low_data_rate_optim'])
s += " agc_auto_on %s\n" % onoff(cfg3['agc_auto_on'])
s += " symb_timeout %s\n" % self.get_symb_timeout()
s += " freq_hop_period %s\n" % self.get_hop_period()
s += " hop_channel %s\n" % self.get_hop_channel()
s += " payload_length %s\n" % self.get_payload_length()
s += " max_payload_length %s\n" % self.get_max_payload_length()
s += " irq_flags_mask %s\n" % self.get_irq_flags_mask()
s += " irq_flags %s\n" % self.get_irq_flags()
s += " rx_nb_byte %d\n" % self.get_rx_nb_bytes()
s += " rx_header_cnt %d\n" % self.get_rx_header_cnt()
s += " rx_packet_cnt %d\n" % self.get_rx_packet_cnt()
s += " pkt_snr_value %f\n" % self.get_pkt_snr_value()
s += " pkt_rssi_value %d\n" % self.get_pkt_rssi_value()
s += " rssi_value %d\n" % self.get_rssi_value()
s += " fei %d\n" % self.get_fei()
s += " pa_select %s\n" % PA_SELECT.lookup[pa_config['pa_select']]
s += " max_power %f dBm\n" % pa_config['max_power']
s += " output_power %f dBm\n" % pa_config['output_power']
s += " ocp %s\n" % onoff(ocp['ocp_on'])
s += " ocp_trim %f mA\n" % ocp['ocp_trim']
s += " lna_gain %s\n" % GAIN.lookup[lna['lna_gain']]
s += " lna_boost_lf %s\n" % bin(lna['lna_boost_lf'])
s += " lna_boost_hf %s\n" % bin(lna['lna_boost_hf'])
s += " detect_optimize %#02x\n" % self.get_detect_optimize()
s += " detection_thresh %#02x\n" % self.get_detection_threshold()
s += " sync_word %#02x\n" % self.get_sync_word()
s += " dio_mapping 0..5 %s\n" % self.get_dio_mapping()
s += " tcxo %s\n" % ['XTAL', 'TCXO'][self.get_tcxo()]
s += " pa_dac %s\n" % ['default', 'PA_BOOST'][self.get_pa_dac()]
s += " fifo_addr_ptr %#02x\n" % self.get_fifo_addr_ptr()
s += " fifo_tx_base_addr %#02x\n" % self.get_fifo_tx_base_addr()
s += " fifo_rx_base_addr %#02x\n" % self.get_fifo_rx_base_addr()
s += " fifo_rx_curr_addr %#02x\n" % self.get_fifo_rx_current_addr()
s += " fifo_rx_byte_addr %#02x\n" % self.get_fifo_rx_byte_addr()
s += " status %s\n" % self.get_modem_status()
s += " version %#02x\n" % self.get_version()
return s

Wyświetl plik

@ -0,0 +1,75 @@
""" Defines LoRaArgumentParser which extends argparse.ArgumentParser with standard config parameters for the SX127x. """
# Copyright 2015 Mayer Analytics Ltd.
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127. If not, see
# <http://www.gnu.org/licenses/>.
import argparse
class LoRaArgumentParser(argparse.ArgumentParser):
""" This class extends argparse.ArgumentParser.
Some commonly used LoRa config parameters are defined
* ocp
* spreading factor
* frequency
* bandwidth
* preamble
Call the parse_args with an additional parameter referencing a LoRa object. The args will be used to configure
the LoRa.
"""
bw_lookup = dict(BW7_8=0, BW10_4=1, BW15_6=2, BW20_8=3, BW31_25=4, BW41_7=5, BW62_5=6, BW125=7, BW250=8, BW500=9)
cr_lookup = dict(CR4_5=1, CR4_6=2,CR4_7=3,CR4_8=4)
def __init__(self, description):
argparse.ArgumentParser.__init__(self, description=description)
self.add_argument('--ocp', '-c', dest='ocp', default=100, action="store", type=float,
help="Over current protection in mA (45 .. 240 mA)")
self.add_argument('--sf', '-s', dest='sf', default=7, action="store", type=int,
help="Spreading factor (6...12). Default is 7.")
self.add_argument('--freq', '-f', dest='freq', default=869., action="store", type=float,
help="Frequency")
self.add_argument('--bw', '-b', dest='bw', default='BW125', action="store", type=str,
help="Bandwidth (one of BW7_8 BW10_4 BW15_6 BW20_8 BW31_25 BW41_7 BW62_5 BW125 BW250 BW500).\nDefault is BW125.")
self.add_argument('--cr', '-r', dest='coding_rate', default='CR4_5', action="store", type=str,
help="Coding rate (one of CR4_5 CR4_6 CR4_7 CR4_8).\nDefault is CR4_5.")
self.add_argument('--preamble', '-p', dest='preamble', default=8, action="store", type=int,
help="Preamble length. Default is 8.")
def parse_args(self, lora):
""" Parse the args, perform some sanity checks and configure the LoRa accordingly.
:param lora: Reference to LoRa object
:return: args
"""
args = argparse.ArgumentParser.parse_args(self)
args.bw = self.bw_lookup.get(args.bw, None)
args.coding_rate = self.cr_lookup.get(args.coding_rate, None)
# some sanity checks
assert(args.bw is not None)
assert(args.coding_rate is not None)
assert(args.sf >=6 and args.sf <= 12)
# set the LoRa object
lora.set_freq(args.freq)
lora.set_preamble(args.preamble)
lora.set_spreading_factor(args.sf)
lora.set_bw(args.bw)
lora.set_coding_rate(args.coding_rate)
lora.set_ocp_trim(args.ocp)
return args

Wyświetl plik

@ -0,0 +1 @@
__all__ = ['SX127x']

Wyświetl plik

@ -0,0 +1,124 @@
""" Defines the BOARD class that contains the board pin mappings. """
# Copyright 2015 Mayer Analytics Ltd.
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127. If not, see
# <http://www.gnu.org/licenses/>.
import RPi.GPIO as GPIO
import spidev
import time
class BOARD:
""" Board initialisation/teardown and pin configuration is kept here.
This is the Raspberry Pi board with one LED and a modtronix inAir9B
"""
# Note that the BCOM numbering for the GPIOs is used.
DIO0 = 22 # RaspPi GPIO 21
DIO1 = 23 # RaspPi GPIO 22
DIO2 = 24 # RaspPi GPIO 23
DIO3 = 25 # RaspPi GPIO 24
LED = 18 # RaspPi GPIO 18 connects to the LED on the proto shield
SWITCH = 4 # RaspPi GPIO 4 connects to a switch
# The spi object is kept here
spi = None
@staticmethod
def setup():
""" Configure the Raspberry GPIOs
:rtype : None
"""
GPIO.setmode(GPIO.BCM)
# LED
GPIO.setup(BOARD.LED, GPIO.OUT)
GPIO.output(BOARD.LED, 0)
# switch
GPIO.setup(BOARD.SWITCH, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# DIOx
for gpio_pin in [BOARD.DIO0, BOARD.DIO1, BOARD.DIO2, BOARD.DIO3]:
GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# blink 2 times to signal the board is set up
BOARD.blink(.1, 2)
@staticmethod
def teardown():
""" Cleanup GPIO and SpiDev """
GPIO.cleanup()
BOARD.spi.close()
@staticmethod
def SpiDev():
""" Init and return the SpiDev object
:return: SpiDev object
:rtype: SpiDev
"""
BOARD.spi = spidev.SpiDev()
BOARD.spi.open(0, 0)
return BOARD.spi
@staticmethod
def add_event_detect(dio_number, callback):
""" Wraps around the GPIO.add_event_detect function
:param dio_number: DIO pin 0...5
:param callback: The function to call when the DIO triggers an IRQ.
:return: None
"""
GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback)
@staticmethod
def add_events(cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None):
BOARD.add_event_detect(BOARD.DIO0, callback=cb_dio0)
BOARD.add_event_detect(BOARD.DIO1, callback=cb_dio1)
BOARD.add_event_detect(BOARD.DIO2, callback=cb_dio2)
BOARD.add_event_detect(BOARD.DIO3, callback=cb_dio3)
# the modtronix inAir9B does not expose DIO4 and DIO5
if switch_cb is not None:
GPIO.add_event_detect(BOARD.SWITCH, GPIO.RISING, callback=switch_cb, bouncetime=300)
@staticmethod
def led_on(value=1):
""" Switch the proto shields LED
:param value: 0/1 for off/on. Default is 1.
:return: value
:rtype : int
"""
GPIO.output(BOARD.LED, value)
return value
@staticmethod
def led_off():
""" Switch LED off
:return: 0
"""
GPIO.output(BOARD.LED, 0)
return 0
@staticmethod
def blink(time_sec, n_blink):
if n_blink == 0:
return
BOARD.led_on()
for i in range(n_blink):
time.sleep(time_sec)
BOARD.led_off()
time.sleep(time_sec)
BOARD.led_on()
BOARD.led_off()

Wyświetl plik

@ -0,0 +1,190 @@
""" Defines constants (modes, bandwidths, registers, etc.) needed by SX127x. """
# Copyright 2015 Mayer Analytics Ltd.
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127. If not, see
# <http://www.gnu.org/licenses/>.
def add_lookup(cls):
""" A decorator that adds a lookup dictionary to the class.
The lookup dictionary maps the codes back to the names. This is used for pretty-printing. """
varnames = filter(str.isupper, cls.__dict__.keys())
lookup = dict(map(lambda varname: (cls.__dict__.get(varname, None), varname), varnames))
setattr(cls, 'lookup', lookup)
return cls
@add_lookup
class MODE:
SLEEP = 0x80
STDBY = 0x81
FSTX = 0x82
TX = 0x83
FSRX = 0x84
RXCONT = 0x85
RXSINGLE = 0x86
CAD = 0x87
FSK_STDBY= 0x01 # needed for calibration
@add_lookup
class BW:
BW7_8 = 0
BW10_4 = 1
BW15_6 = 2
BW20_8 = 3
BW31_25 = 4
BW41_7 = 5
BW62_5 = 6
BW125 = 7
BW250 = 8
BW500 = 9
@add_lookup
class CODING_RATE:
CR4_5 = 1
CR4_6 = 2
CR4_7 = 3
CR4_8 = 4
@add_lookup
class GAIN:
NOT_USED = 0b000
G1 = 0b001
G2 = 0b010
G3 = 0b011
G4 = 0b100
G5 = 0b101
G6 = 0b110
@add_lookup
class PA_SELECT:
RFO = 0
PA_BOOST = 1
@add_lookup
class PA_RAMP:
RAMP_3_4_ms = 0
RAMP_2_ms = 1
RAMP_1_ms = 2
RAMP_500_us = 3
RAMP_250_us = 4
RAMP_125_us = 5
RAMP_100_us = 6
RAMP_62_us = 7
RAMP_50_us = 8
RAMP_40_us = 9
RAMP_31_us = 10
RAMP_25_us = 11
RAMP_20_us = 12
RAMP_15_us = 13
RAMP_12_us = 14
RAMP_10_us = 15
class MASK:
class IRQ_FLAGS:
RxTimeout = 7
RxDone = 6
PayloadCrcError = 5
ValidHeader = 4
TxDone = 3
CadDone = 2
FhssChangeChannel = 1
CadDetected = 0
class REG:
@add_lookup
class LORA:
FIFO = 0x00
OP_MODE = 0x01
FR_MSB = 0x06
FR_MID = 0x07
FR_LSB = 0x08
PA_CONFIG = 0x09
PA_RAMP = 0x0A
OCP = 0x0B
LNA = 0x0C
FIFO_ADDR_PTR = 0x0D
FIFO_TX_BASE_ADDR = 0x0E
FIFO_RX_BASE_ADDR = 0x0F
FIFO_RX_CURR_ADDR = 0x10
IRQ_FLAGS_MASK = 0x11
IRQ_FLAGS = 0x12
RX_NB_BYTES = 0x13
RX_HEADER_CNT_MSB = 0x14
RX_PACKET_CNT_MSB = 0x16
MODEM_STAT = 0x18
PKT_SNR_VALUE = 0x19
PKT_RSSI_VALUE = 0x1A
RSSI_VALUE = 0x1B
HOP_CHANNEL = 0x1C
MODEM_CONFIG_1 = 0x1D
MODEM_CONFIG_2 = 0x1E
SYMB_TIMEOUT_LSB = 0x1F
PREAMBLE_MSB = 0x20
PAYLOAD_LENGTH = 0x22
MAX_PAYLOAD_LENGTH = 0x22
HOP_PERIOD = 0x24
FIFO_RX_BYTE_ADDR = 0x25
MODEM_CONFIG_3 = 0x26
PPM_CORRECTION = 0x27
FEI_MSB = 0x28
DETECT_OPTIMIZE = 0X31
INVERT_IQ = 0x33
DETECTION_THRESH = 0X37
SYNC_WORD = 0X39
DIO_MAPPING_1 = 0x40
DIO_MAPPING_2 = 0x41
VERSION = 0x42
TCXO = 0x4B
PA_DAC = 0x4D
AGC_REF = 0x61
AGC_THRESH_1 = 0x62
AGC_THRESH_2 = 0x63
AGC_THRESH_3 = 0x64
PLL = 0x70
@add_lookup
class FSK:
LNA = 0x0C
RX_CONFIG = 0x0D
RSSI_CONFIG = 0x0E
PREAMBLE_DETECT = 0x1F
OSC = 0x24
SYNC_CONFIG = 0x27
SYNC_VALUE_1 = 0x28
SYNC_VALUE_2 = 0x29
SYNC_VALUE_3 = 0x2A
SYNC_VALUE_4 = 0x2B
SYNC_VALUE_5 = 0x2C
SYNC_VALUE_6 = 0x2D
SYNC_VALUE_7 = 0x2E
SYNC_VALUE_8 = 0x2F
PACKET_CONFIG_1 = 0x30
FIFO_THRESH = 0x35
IMAGE_CAL = 0x3B
DIO_MAPPING_1 = 0x40
DIO_MAPPING_2 = 0x41

Wyświetl plik

@ -0,0 +1,121 @@
""" Defines the BOARD class that contains the board pin mappings. """
# Copyright 2015 Mark Jessop <vk5qi@rfhead.net>
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127x. If not, see
# <http://www.gnu.org/licenses/>.
import RPi.GPIO as GPIO
import spidev
import time
class HardwareInterface(object):
""" Board initialisation/teardown and pin configuration is kept here.
This is the HabSupplies PiLoraGateway v2.4 Shield.
Schematic for this board is here: https://github.com/PiInTheSky/pits-hardware/blob/master/PiLoraGatewayV2.4.pdf
Only the DIO0 and DIO5 pins are wired up
"""
# Note that the BCOM numbering for the GPIOs is used.
# The spi object is kept here
spi_device = 0
spi = None
spi_speed = 1000000
def __init__(self, device=0):
""" Configure the Raspberry GPIOs
:rtype : None
"""
self.spi_device = device
GPIO.setmode(GPIO.BCM)
if device == 0:
self.LED = 5
self.DIO0 = 25
self.DIO5 = 24
else:
self.LED = 21
self.DIO0 = 16
self.DIO5 = 12
GPIO.setup(self.LED, GPIO.OUT)
GPIO.output(self.LED, 0)
# DIOx
for gpio_pin in [self.DIO0, self.DIO5]:
GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# blink 2 times to signal the board is set up
self.blink(.1, 2)
def teardown(self):
""" Cleanup GPIO and SpiDev """
GPIO.cleanup()
self.spi.close()
def SpiDev(self):
""" Init and return the SpiDev object
:return: SpiDev object
:rtype: SpiDev
"""
self.spi = spidev.SpiDev()
self.spi.open(0, self.spi_device)
self.spi.max_speed_hz = self.spi_speed
return self.spi
def add_event_detect(self,dio_number, callback):
""" Wraps around the GPIO.add_event_detect function
:param dio_number: DIO pin 0...5
:param callback: The function to call when the DIO triggers an IRQ.
:return: None
"""
GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback)
def add_events(self,cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None):
return
#self.add_event_detect(self.DIO0, callback=cb_dio0)
#self.add_event_detect(self.DIO5, callback=cb_dio5)
def led_on(self,value=1):
""" Switch the proto shields LED
:param value: 0/1 for off/on. Default is 1.
:return: value
:rtype : int
"""
GPIO.output(self.LED, value)
return value
def led_off(self):
""" Switch LED off
:return: 0
"""
GPIO.output(self.LED, 0)
return 0
def blink(self,time_sec, n_blink):
if n_blink == 0:
return
self.led_on()
for i in range(n_blink):
time.sleep(time_sec)
self.led_off()
time.sleep(time_sec)
self.led_on()
self.led_off()
def read_gpio(self):
return (GPIO.input(self.DIO0),GPIO.input(self.DIO5))

Wyświetl plik

@ -0,0 +1,95 @@
""" Arduino SPI Bridge Hardware backend. """
# Copyright 2015 Mark Jessop <vk5qi@rfhead.net>
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127x. If not, see
# <http://www.gnu.org/licenses/>.
import time,thread
from spibridge import SPIBridge
class HardwareInterface(object):
""" Special HardwareInterface object for the Arduino SPI Bridge code.
This is different in that we have to poll for interrupt flags.
Board initialisation/teardown and pin configuration is kept here.
Only the DIO0 and DIO5 pins are wired up on these Arduino shields
"""
# The object is kept here
spi = None
def __init__(self,port="/dev/ttyUSB0",baud=57600):
""" Configure the Raspberry GPIOs
:rtype : None
"""
self.spi = SPIBridge(port,baud)
# blink 2 times to signal the board is set up
self.blink(.1, 2)
def teardown(self):
""" Cleanup Serial Instance """
self.spi.close()
def SpiDev(self):
""" Init and return the SpiDev object
:return: SpiDev object
:rtype: SpiDev
"""
return self.spi
def add_event_detect(self,dio_number, callback):
""" Wraps around the GPIO.add_event_detect function
:param dio_number: DIO pin 0...5
:param callback: The function to call when the DIO triggers an IRQ.
:return: None
"""
pass
def add_events(self,cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None):
pass
def led_on(self,value=1):
""" Switch the proto shields LED
:param value: 0/1 for off/on. Default is 1.
:return: value
:rtype : int
"""
self.spi.set_led(1)
return value
def led_off(self):
""" Switch LED off
:return: 0
"""
self.spi.set_led(0)
return 0
def blink(self,time_sec, n_blink):
if n_blink == 0:
return
self.led_on()
for i in range(n_blink):
time.sleep(time_sec)
self.led_off()
time.sleep(time_sec)
self.led_on()
self.led_off()
def read_gpio(self):
return self.spi.read_gpio()

Wyświetl plik

@ -0,0 +1,186 @@
""" Arduino SPI Bridge Object """
# Copyright 2015 Mark Jessop <vk5qi@rfhead.net>
#
# This file is part of pySX127x.
#
# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You can be released from the requirements of the license by obtaining a commercial license. Such a license is
# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your
# own applications, or shipping pySX127x with a closed source product.
#
# You should have received a copy of the GNU General Public License along with pySX127x. If not, see
# <http://www.gnu.org/licenses/>.
import serial, time, struct
class SPIBridge(object):
ser = None
# Binary protocol stuff.
sync = 0xABCD
OPCODE_VERSION = 0x00
OPCODE_SPI_TXFR = 0x01
OPCODE_LED = 0x02
OPCODE_READ_GPIO = 0x03
def __init__(self, serialport="/dev/ttyUSB0",serialbaud=57600):
self.ser = serial.Serial(serialport, serialbaud, timeout=1)
print "Waiting for Arduino to boot..."
time.sleep(2)
if "SPIBridge" in self.read_version():
print "Connected OK!"
else:
print "Could not connect to SPI Bridge."
def close(self):
self.ser.close()
# CRC16 Functions
crc16tab = [
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
]
def crc16_floating(self,next_byte, seed):
return ((seed << 8) ^ self.crc16tab[(seed >> 8) ^ (ord(next_byte) & 0x00FF)])\
& 0xFFFF
def crc16_buff(self,buff):
crc = 0xFFFF
for ch in buff:
crc = self.crc16_floating(ch, crc)
return crc
def read_version(self):
temp = struct.pack('>BH', self.OPCODE_VERSION, 0x0000)
crc = self.crc16_buff(temp)
tx_data = struct.pack('>H3sH', self.sync, temp, crc)
if(self.ser.inWaiting()>0):
rx_data = self.ser.read(ser.inWaiting())
#print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data)
# Send!
self.ser.write(tx_data)
rx_data = self.ser.read(24)
if(ord(rx_data[2])==self.OPCODE_VERSION):
return str(ord(rx_data[5])) + "." + str(ord(rx_data[6])) + "." + str(ord(rx_data[7])) + " " + rx_data[8:-2]
else:
return "Unknown"
def xfer(self,data):
if(len(data)>1024):
data = data[:1024]
temp = struct.pack('>BH', self.OPCODE_SPI_TXFR, len(data)) + str(bytearray(data))
crc = self.crc16_buff(temp)
tx_data = struct.pack('>H', self.sync) + temp + struct.pack('>H', crc)
#print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data)
if(self.ser.inWaiting()>0):
rx_data = self.ser.read(ser.inWaiting())
#print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data)
self.ser.write(tx_data)
#time.sleep(1)
#print str(self.ser.inWaiting()) + " waiting."
rx_data = self.ser.read(len(tx_data))
#print "SPI RX Data:" + ':'.join(x.encode('hex') for x in rx_data)
#print rx_data
calc_crc = self.crc16_buff(rx_data[2:-2])
if(struct.pack('>H', calc_crc) == rx_data[-2:]):
#print "Checksum Match"
pass
else:
#print "Checksum Fail"
pass
if(ord(rx_data[2]) == self.OPCODE_SPI_TXFR):
return list(bytearray(rx_data[5:-2]))
else:
print "No Opcode Found!"
return [0]
def set_led(self,value=1):
opcode = self.OPCODE_LED
payload_length = 0x0001
payload = 0x00
if(value==1):
payload = 0x01
temp = struct.pack('>BHB',opcode,payload_length,payload)
crc = self.crc16_buff(temp)
tx_data = struct.pack('>H4sH', self.sync, temp, crc)
if(self.ser.inWaiting()>0):
rx_data = self.ser.read(ser.inWaiting())
#print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data)
self.ser.write(tx_data)
#print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data)
rx_data = self.ser.read(len(tx_data))
if(rx_data == tx_data):
return value
def read_gpio(self):
temp = struct.pack('>BH', self.OPCODE_READ_GPIO, 0x0000)
crc = self.crc16_buff(temp)
tx_data = struct.pack('>H3sH', self.sync, temp, crc)
if(self.ser.inWaiting()>0):
rx_data = self.ser.read(self.ser.inWaiting())
#print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data)
# Send!
self.ser.write(tx_data)
#print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data)
rx_data = self.ser.read(9)
#print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data)
if(ord(rx_data[2])==self.OPCODE_READ_GPIO):
return (ord(rx_data[5]),ord(rx_data[6]))
else:
return (-1,-1)
if __name__ == "__main__":
spi = SPIBridge()
print spi.read_version()
spi.close()

Wyświetl plik

@ -274,7 +274,7 @@ class WenetPiCam(object):
return
# Inform ground station we are about to send an image.
self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size/256))
self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256))
# Push SSDV file into transmit queue.
tx.queue_image_file(ssdv_filename)

Wyświetl plik

@ -10,6 +10,7 @@
# Released under GNU GPL v3 or later
#
import codecs
import ctypes
from numpy.ctypeslib import ndpointer
import numpy as np
@ -38,21 +39,22 @@ except OSError as e:
# Accepts a 258 byte string as input, returns the LDPC parity bits.
#
def ldpc_encode_string(payload, Nibits = 2064, Npbits = 516):
def ldpc_encode(payload, Nibits = 2064, Npbits = 516):
if len(payload) != 258:
raise TypeError("Payload MUST be 258 bytes in length! (2064 bit codeword)")
# Get input data into the right form (list of 0s and 1s)
ibits = np.unpackbits(np.fromstring(payload,dtype=np.uint8)).astype(np.uint8)
ibits = np.unpackbits(np.frombuffer(payload,dtype=np.uint8)).astype(np.uint8)
pbits = np.zeros(Npbits).astype(np.uint8)
_ldpc_enc.encode(ibits, pbits)
return np.packbits(np.array(list(pbits)).astype(np.uint8)).tostring()
return np.packbits(np.array(list(pbits)).astype(np.uint8)).tobytes()
#
# Interleaver functions
# Interleaver functions - Note that these are not used in Wenet right now.
# These also may not work under Python 3
#
# These variables need to be synchronised with those in ldpc_enc.c, until i figure out a better way
@ -113,26 +115,26 @@ def interleave_test():
def generate_dummy_packet():
payload = np.arange(0,256,1)
payload = np.append(payload,[0,0]).astype(np.uint8).tostring() # Add on dummy checksum, for a total of 258 bytes.
payload = np.append(payload,[0,0]).astype(np.uint8).tobytes() # Add on dummy checksum, for a total of 258 bytes.
return payload
def main():
# Generate a dummy test packet, and convert it to an array of 0 and 1.
payload = generate_dummy_packet()
print("Input (hex): %s" % ("".join("{:02x}".format(ord(c)) for c in payload)))
print("Input (hex):" + codecs.encode(payload,'hex').decode())
# Now run ldpc_encode over it X times.
parity = ""
start = time.time()
for x in xrange(1000):
for x in range(1000):
#print(x)
parity = ldpc_encode(payload)
stop = time.time()
print("time delta: %.3f" % (stop-start))
print("LDPC Parity Bits (hex): %s" % ("".join("{:02x}".format(ord(c)) for c in parity)))
print("LDPC Parity Bits (hex):" + codecs.encode(parity,'hex').decode())
print("Done!")
# Some basic test code.

Wyświetl plik

@ -10,7 +10,7 @@
import PacketTX, sys, os, time
import numpy as np
payload = np.arange(0,256,1).astype(np.uint8).tostring() # 0->255
payload = np.arange(0,256,1).astype(np.uint8).tobytes() # 0->255
debug_output = False # If True, packet bits are saved to debug.bin as one char per bit.

Wyświetl plik

@ -151,8 +151,8 @@ def post_process_image(filename):
# Finally, initialise the PiCam capture object.
picam = WenetPiCam.WenetPiCam(src_resolution=(1920,1088),
tx_resolution=(1920,1088),
picam = WenetPiCam.WenetPiCam(src_resolution=(3280,2464),
tx_resolution=(1488,1120),
callsign=callsign,
num_images=5,
debug_ptr=tx.transmit_text_message,

Wyświetl plik

@ -11,7 +11,7 @@ import PacketTX, sys, os, argparse
# Set to whatever resolution you want to test.
file_path = "../test_images/%d_raw.bin" # _raw, _800x608, _640x480, _320x240
image_numbers = xrange(1,14)
image_numbers = range(1,14)
debug_output = False # If True, packet bits are saved to debug.bin as one char per bit.
@ -22,11 +22,11 @@ def transmit_file(filename, tx_object):
print("File size not a multiple of 256 bytes!")
return
print("Transmitting %d Packets." % (file_size/256))
print("Transmitting %d Packets." % (file_size//256))
f = open(filename,'rb')
for x in range(file_size/256):
for x in range(file_size//256):
data = f.read(256)
tx_object.tx_packet(data)

Wyświetl plik

@ -12,22 +12,27 @@ Added UBloxGPS abstraction layer class for use with Wenet TX system.
import struct
import datetime
from threading import Thread
import time, os, json, calendar, math
import time, os, json, calendar, math, traceback, socket, argparse
# protocol constants
PREAMBLE1 = 0xb5
PREAMBLE2 = 0x62
# message classes
CLASS_NAV = 0x01
CLASS_RXM = 0x02
CLASS_INF = 0x04
CLASS_ACK = 0x05
CLASS_CFG = 0x06
CLASS_MON = 0x0A
CLASS_AID = 0x0B
CLASS_TIM = 0x0D
CLASS_ESF = 0x10
CLASS_NAV = 0x01 # Navigation
CLASS_RXM = 0x02 # Receiver Manager
CLASS_INF = 0x04 # Inforation
CLASS_ACK = 0x05 # Message ACKs
CLASS_CFG = 0x06 # Configuration
CLASS_UPD = 0x09 # Firmware Updates
CLASS_MON = 0x0A # Monitoring
CLASS_AID = 0x0B # AssistNow Aiding
CLASS_TIM = 0x0D # Timing
CLASS_ESF = 0x10 # External Sensor Fusion
CLASS_MGA = 0x13 # Multiple-GNSS Assistance
CLASS_LOG = 0x21 # Logging
CLASS_SEC = 0x27 # Security
CLASS_HNR = 0x28 # High-Rate Navigation
# ACK messages
MSG_ACK_NACK = 0x00
@ -52,6 +57,7 @@ MSG_NAV_DOP = 0x04
MSG_NAV_EKFSTATUS = 0x40
MSG_NAV_SBAS = 0x32
MSG_NAV_SOL = 0x06
MSG_NAV_PVT = 0x07
# RXM messages
MSG_RXM_RAW = 0x10
@ -102,10 +108,18 @@ MSG_CFG_TMODE = 0x1D
MSG_CFG_TPS = 0x31
MSG_CFG_TP = 0x07
MSG_CFG_GNSS = 0x3E
MSG_CFG_ESFALG = 0x56
MSG_CFG_ESFA = 0x4C
MSG_CFG_ESFG = 0x4D
MSG_CFG_ESFWT = 0x82
MSG_CFG_HNR = 0x5C
# ESF messages
MSG_ESF_MEAS = 0x02
MSG_ESF_STATUS = 0x10
MSG_ESF_ALG = 0x14
MSG_ESF_INS = 0x15
MSG_ESF_RAW = 0x03
# INF messages
MSG_INF_DEBUG = 0x04
@ -131,6 +145,11 @@ MSG_TIM_TM2 = 0x03
MSG_TIM_SVIN = 0x04
MSG_TIM_VRFY = 0x06
# HNR Messages
MSG_HNR_ATT = 0x01
MSG_HNR_INS = 0x02
MSG_HNR_PVT = 0x00
# port IDs
PORT_DDC =0
PORT_SERIAL1=1
@ -204,7 +223,7 @@ class UBloxDescriptor:
self.fields2 = fields2
def unpack(self, msg):
'''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg'''
'''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg'''
msg._fields = {}
# unpack main message blocks. A comm
@ -261,7 +280,7 @@ class UBloxDescriptor:
msg._unpacked = True
def pack(self, msg, msg_class=None, msg_id=None):
'''pack a UBloxMessage from the .fields and ._recs attributes in msg'''
'''pack a UBloxMessage from the .fields and ._recs attributes in msg'''
f1 = []
if msg_class is None:
msg_class = msg.msg_class()
@ -302,7 +321,7 @@ class UBloxDescriptor:
msg._buf += struct.pack('<BB', *msg.checksum(data=msg._buf[2:]))
def format(self, msg):
'''return a formatted string for a message'''
'''return a formatted string for a message'''
if not msg._unpacked:
self.unpack(msg)
ret = self.name + ': '
@ -364,6 +383,12 @@ msg_types = {
(CLASS_CFG, MSG_CFG_MSG) : UBloxDescriptor('CFG_MSG',
'<BB6B',
['msgClass', 'msgId', 'rates[6]']),
(CLASS_CFG, MSG_CFG_ESFALG) : UBloxDescriptor('CFG_ESFALG',
'<BBBBIhh',
['bitfield', 'unused1', 'unused2', 'unused3', 'yaw', 'pitch', 'roll']),
(CLASS_NAV, MSG_NAV_PVT) : UBloxDescriptor('NAV_PVT',
'<IhBBBBBBIiBBBBiiiiIIiiiiiIIHHBBBBihH',
['iTOW', 'year', 'month', 'day', 'hour', 'min', 'sec', 'valid', 'tAcc', 'nano', 'fixType', 'flags', 'flags2', 'numSV', 'Longitude', 'Latitude', 'height', 'hMSL', 'hAcc', 'vAcc', 'velN', 'velE', 'velD', 'gSpeed', 'headMot', 'sAcc', 'headAcc', 'pDOP', 'flags3', 'reserved1', 'reserved2', 'reserved3', 'reserved4', 'headVeh', 'magDec', 'magAcc']),
(CLASS_NAV, MSG_NAV_POSLLH) : UBloxDescriptor('NAV_POSLLH',
'<IiiiiII',
['iTOW', 'Longitude', 'Latitude', 'height', 'hMSL', 'hAcc', 'vAcc']),
@ -504,21 +529,24 @@ msg_types = {
['dur', 'meanX', 'meanY', 'meanZ', 'meanV',
'obs', 'valid', 'active', 'reserved1']),
(CLASS_INF, MSG_INF_ERROR) : UBloxDescriptor('INF_ERR', '<18s', ['str']),
(CLASS_INF, MSG_INF_DEBUG) : UBloxDescriptor('INF_DEBUG', '<18s', ['str'])
(CLASS_INF, MSG_INF_DEBUG) : UBloxDescriptor('INF_DEBUG', '<18s', ['str']),
(CLASS_ESF, MSG_ESF_ALG) : UBloxDescriptor('ESF_ALG',
'<iBBBBIhh',
['iTOW', 'version', 'flags', 'error', 'reserved1', 'yaw', 'pitch', 'roll'])
}
class UBloxMessage:
'''UBlox message class - holds a UBX binary message'''
def __init__(self):
self._buf = ""
self._buf = b""
self._fields = {}
self._recs = []
self._unpacked = False
self.debug_level = 0
def __str__(self):
'''format a message as a string'''
'''format a message as a string'''
if not self.valid():
return 'UBloxMessage(INVALID)'
type = self.msg_type()
@ -552,7 +580,7 @@ class UBloxMessage:
print(msg)
def unpack(self):
'''unpack a message'''
'''unpack a message'''
if not self.valid():
raise UBloxError('INVALID MESSAGE')
type = self.msg_type()
@ -561,7 +589,7 @@ class UBloxMessage:
msg_types[type].unpack(self)
def pack(self):
'''pack a message'''
'''pack a message'''
if not self.valid():
raise UbloxError('INVALID MESSAGE')
type = self.msg_type()
@ -570,7 +598,7 @@ class UBloxMessage:
msg_types[type].pack(self)
def name(self):
'''return the short string name for a message'''
'''return the short string name for a message'''
if not self.valid():
raise UbloxError('INVALID MESSAGE')
type = self.msg_type()
@ -579,27 +607,27 @@ class UBloxMessage:
return msg_types[type].name
def msg_class(self):
'''return the message class'''
return ord(self._buf[2])
'''return the message class'''
return self._buf[2]
def msg_id(self):
'''return the message id within the class'''
return ord(self._buf[3])
'''return the message id within the class'''
return self._buf[3]
def msg_type(self):
'''return the message type tuple (class, id)'''
'''return the message type tuple (class, id)'''
return (self.msg_class(), self.msg_id())
def msg_length(self):
'''return the payload length'''
'''return the payload length'''
(payload_length,) = struct.unpack('<H', self._buf[4:6])
return payload_length
def valid_so_far(self):
'''check if the message is valid so far'''
if len(self._buf) > 0 and ord(self._buf[0]) != PREAMBLE1:
'''check if the message is valid so far'''
if len(self._buf) > 0 and self._buf[0] != PREAMBLE1:
return False
if len(self._buf) > 1 and ord(self._buf[1]) != PREAMBLE2:
if len(self._buf) > 1 and self._buf[1] != PREAMBLE2:
self.debug(1, "bad pre2")
return False
if self.needed_bytes() == 0 and not self.valid():
@ -611,28 +639,27 @@ class UBloxMessage:
return True
def add(self, bytes):
'''add some bytes to a message'''
'''add some bytes to a message'''
self._buf += bytes
while not self.valid_so_far() and len(self._buf) > 0:
'''handle corrupted streams'''
self._buf = self._buf[1:]
if self.needed_bytes() < 0:
self._buf = ""
self._buf = b""
def checksum(self, data=None):
'''return a checksum tuple for a message'''
'''return a checksum tuple for a message'''
if data is None:
data = self._buf[2:-2]
cs = 0
ck_a = 0
ck_b = 0
for i in data:
ck_a = (ck_a + ord(i)) & 0xFF
ck_a = (ck_a + i) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
return (ck_a, ck_b)
def valid_checksum(self):
'''check if the checksum is OK'''
'''check if the checksum is OK'''
(ck_a, ck_b) = self.checksum()
d = self._buf[2:-2]
(ck_a2, ck_b2) = struct.unpack('<BB', self._buf[-2:])
@ -645,7 +672,7 @@ class UBloxMessage:
return self.msg_length() + 8 - len(self._buf)
def valid(self):
'''check if a message is valid'''
'''check if a message is valid'''
return len(self._buf) >= 8 and self.needed_bytes() == 0 and self.valid_checksum()
@ -685,9 +712,9 @@ class UBlox:
self.preferred_dgps_timeout = None
def close(self):
'''close the device'''
'''close the device'''
self.dev.close()
self.dev = None
self.dev = None
def set_debug(self, debug_level):
'''set debug level'''
@ -699,7 +726,7 @@ class UBlox:
print(msg)
def set_logfile(self, logfile, append=False):
'''setup logging to a file'''
'''setup logging to a file'''
if self.log is not None:
self.log.close()
self.log = None
@ -758,10 +785,10 @@ class UBlox:
def send_nmea(self, msg):
if not self.read_only:
s = msg + "*%02X" % self.nmea_checksum(msg)
self.write(s)
self.write(s.encode('ascii'))
def set_binary(self):
'''put a UBlox into binary mode using a NMEA string'''
'''put a UBlox into binary mode using a NMEA string'''
if not self.read_only:
print("try set binary at %u" % self.baudrate)
self.send_nmea("$PUBX,41,0,0007,0001,%u,0" % self.baudrate)
@ -808,7 +835,7 @@ class UBlox:
def receive_message(self, ignore_eof=False):
'''blocking receive of one ublox message'''
'''blocking receive of one ublox message'''
msg = UBloxMessage()
while True:
n = msg.needed_bytes()
@ -827,7 +854,7 @@ class UBlox:
return msg
def receive_message_noerror(self, ignore_eof=False):
'''blocking receive of one ublox message, ignoring errors'''
'''blocking receive of one ublox message, ignoring errors'''
try:
return self.receive_message(ignore_eof=ignore_eof)
except UBloxError as e:
@ -840,7 +867,7 @@ class UBlox:
return None
def send(self, msg):
'''send a preformatted ublox message'''
'''send a preformatted ublox message'''
if not msg.valid():
self.debug(1, "invalid send")
return
@ -848,7 +875,7 @@ class UBlox:
self.write(msg._buf)
def send_message(self, msg_class, msg_id, payload):
'''send a ublox message with class, id and payload'''
'''send a ublox message with class, id and payload'''
msg = UBloxMessage()
msg._buf = struct.pack('<BBBBH', 0xb5, 0x62, msg_class, msg_id, len(payload))
msg._buf += payload
@ -857,33 +884,33 @@ class UBlox:
self.send(msg)
def configure_solution_rate(self, rate_ms=200, nav_rate=1, timeref=0):
'''configure the solution rate in milliseconds'''
'''configure the solution rate in milliseconds'''
payload = struct.pack('<HHH', rate_ms, nav_rate, timeref)
self.send_message(CLASS_CFG, MSG_CFG_RATE, payload)
def configure_message_rate(self, msg_class, msg_id, rate):
'''configure the message rate for a given message'''
'''configure the message rate for a given message'''
payload = struct.pack('<BBB', msg_class, msg_id, rate)
self.send_message(CLASS_CFG, MSG_CFG_SET_RATE, payload)
def configure_port(self, port=1, inMask=3, outMask=3, mode=2240, baudrate=None):
'''configure a IO port'''
'''configure a IO port'''
if baudrate is None:
baudrate = self.baudrate
payload = struct.pack('<BBHIIHHHH', port, 0xff, 0, mode, baudrate, inMask, outMask, 0xFFFF, 0xFFFF)
self.send_message(CLASS_CFG, MSG_CFG_PRT, payload)
def configure_loadsave(self, clearMask=0, saveMask=0, loadMask=0, deviceMask=0):
'''configure configuration load/save'''
'''configure configuration load/save'''
payload = struct.pack('<IIIB', clearMask, saveMask, loadMask, deviceMask)
self.send_message(CLASS_CFG, MSG_CFG_CFG, payload)
def configure_poll(self, msg_class, msg_id, payload=''):
'''poll a configuration message'''
def configure_poll(self, msg_class, msg_id, payload=b''):
'''poll a configuration message'''
self.send_message(msg_class, msg_id, payload)
def configure_poll_port(self, portID=None):
'''poll a port configuration'''
'''poll a port configuration'''
if portID is None:
self.configure_poll(CLASS_CFG, MSG_CFG_PRT)
else:
@ -1167,7 +1194,7 @@ class UBloxGPS(object):
# Poll for a CFG_NAV5 message occasionally.
if self.rx_counter % 20 == 0:
# A message with only 0x00 in the payload field is a poll.
self.gps.send_message(CLASS_CFG, MSG_CFG_NAV5,'\x00')
self.gps.send_message(CLASS_CFG, MSG_CFG_NAV5,b'\x00')
# Additional checks to be sure we're in the right dynamic model.
if self.rx_counter % 40 == 0:
@ -1205,7 +1232,7 @@ if __name__ == "__main__":
print(state)
gps = UBloxGPS(port=sys.argv[1], callback=gps_test, update_rate_ms=500, dynamic_model=DYNAMIC_MODEL_PORTABLE, ntpd_update=True)
gps = UBloxGPS(port=sys.argv[1], callback=gps_test, update_rate_ms=500, dynamic_model=DYNAMIC_MODEL_AIRBORNE1G, ntpd_update=True)
try:
while True: