modbus remove example tests

pull/10721/merge
aleks 2023-02-20 11:06:27 +01:00
rodzic a30779662e
commit ed86c026cb
4 zmienionych plików z 0 dodań i 614 usunięć

Wyświetl plik

@ -1032,18 +1032,6 @@ example_test_007:
- ESP32
- Example_I2C_CCS811_SENSOR
example_test_011:
extends: .example_test_esp32_template
tags:
- ESP32
- Example_T2_RS485
example_test_016:
extends: .example_test_esp32_template
tags:
- ESP32
- Example_Modbus_TCP
example_test_017:
extends: .example_test_esp32s2_template
tags:

Wyświetl plik

@ -130,10 +130,6 @@ examples/protocols/mdns:
examples/protocols/modbus:
disable:
- if: IDF_TARGET in ["esp32h2"]
disable_test:
- if: IDF_TARGET != "esp32"
temporary: true
reason: lack of runners
examples/protocols/mqtt/ssl:
disable:

Wyświetl plik

@ -1,290 +0,0 @@
# SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
# Need Python 3 string formatting functions
from __future__ import print_function
import logging
import os
import re
from threading import Thread
import ttfw_idf
LOG_LEVEL = logging.DEBUG
LOGGER_NAME = 'modbus_test'
# Allowed parameter reads
TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings
TEST_READ_MAX_ERR_COUNT = 2 # Maximum allowed read errors during initialization
TEST_THREAD_EXPECT_TIMEOUT = 120 # Test theread expect timeout in seconds
TEST_THREAD_JOIN_TIMEOUT = 180 # Test theread join timeout in seconds
# Test definitions
TEST_MASTER_RTU = 'master_rtu'
TEST_SLAVE_RTU = 'slave_rtu'
TEST_MASTER_ASCII = 'master_ascii'
TEST_SLAVE_ASCII = 'slave_ascii'
# Define tuple of strings to expect for each DUT.
#
master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...')
slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.')
# The dictionary for expected values in listing
expect_dict_master_ok = {'START': (),
'READ_PAR_OK': (),
'ALARM_MSG': (u'7',)}
expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}
# The dictionary for regular expression patterns to check in listing
pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
# The dictionary for expected values in listing
expect_dict_slave_ok = {'START': (),
'READ_PAR_OK': (),
'DESTROY': ()}
# The dictionary for regular expression patterns to check in listing
pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
logger = logging.getLogger(LOGGER_NAME)
class DutTestThread(Thread):
def __init__(self, dut=None, name=None, expect=None):
""" Initialize the thread parameters
"""
self.tname = name
self.dut = dut
self.expected = expect
self.result = False
self.data = None
super(DutTestThread, self).__init__()
def run(self):
""" The function implements thread functionality
"""
# Must reset again as flashing during start_app will reset multiple times, causing unexpected results
self.dut.reset()
# Capture output from the DUT
self.dut.start_capture_raw_data()
# Check expected strings in the listing
for string in self.expected:
self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT)
# Check DUT exceptions
dut_exceptions = self.dut.get_exceptions()
if 'Guru Meditation Error:' in dut_exceptions:
raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
# Mark thread has run to completion without any exceptions
self.data = self.dut.stop_capture_raw_data()
self.result = True
def test_filter_output(data=None, start_pattern=None, end_pattern=None):
"""Use patters to filter output
"""
start_index = str(data).find(start_pattern)
end_index = str(data).find(end_pattern)
logger.debug('Listing start index= %d, end=%d' % (start_index, end_index))
if start_index == -1 or end_index == -1:
return data
return data[start_index:end_index + len(end_pattern)]
def test_expect_re(data, pattern):
"""
Check if re pattern is matched in data cache
:param data: data to process
:param pattern: compiled RegEx pattern
:return: match groups if match succeed otherwise None
"""
ret = None
if isinstance(pattern, type(u'')):
pattern = pattern.encode('utf-8')
regex = re.compile(pattern)
if isinstance(data, type(u'')):
data = data.encode('utf-8')
match = regex.search(data)
if match:
ret = tuple(None if x is None else x.decode() for x in match.groups())
index = match.end()
else:
index = None
return ret, index
def test_check_output(data=None, check_dict=None, expect_dict=None):
""" Check output for the test
Check log using regular expressions:
"""
global logger
match_count = 0
index = 0
data_lines = data.splitlines()
for key, pattern in check_dict.items():
if key not in expect_dict:
break
# Check pattern in the each line
for line in data_lines:
group, index = test_expect_re(line, pattern)
if index is not None:
logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line))
if expect_dict[key] == group:
logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group)))
match_count += 1
return match_count
def test_check_mode(dut=None, mode_str=None, value=None):
""" Check communication mode for dut
"""
global logger
try:
opt = dut.app.get_sdkconfig()[mode_str]
logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
return value == opt
except Exception:
logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
return False
@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485', target=['esp32'])
def test_modbus_serial_communication(env, comm_mode):
global logger
# Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)
try:
logger.debug('Environment vars: %s\r\n' % os.environ)
logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig())
logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig())
# Check Kconfig configuration options for each built example
if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'):
logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
slave_name = TEST_SLAVE_ASCII
master_name = TEST_MASTER_ASCII
elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'):
logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
slave_name = TEST_SLAVE_RTU
master_name = TEST_MASTER_RTU
else:
logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
# Check if slave address for example application is default one to be able to communicate
if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
# Flash app onto each DUT
dut_master.start_app()
dut_slave.start_app()
# Create thread for each dut
dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect)
dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect)
# Start each thread
dut_slave_thread.start()
dut_master_thread.start()
# Wait for threads to complete
dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
if dut_slave_thread.isAlive():
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
if dut_master_thread.isAlive():
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
finally:
dut_master.close()
dut_slave.close()
# Check if test threads completed successfully and captured data
if not dut_slave_thread.result or dut_slave_thread.data is None:
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname)
if not dut_master_thread.result or dut_master_thread.data is None:
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname)
# Filter output to get test messages
master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
if master_output is not None:
logger.info('The data for master thread is captured.')
logger.debug(master_output)
slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
if slave_output is not None:
logger.info('The data for slave thread is captured.')
logger.debug(slave_output)
# Check if parameters are read correctly by master
match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
if match_count < TEST_READ_MIN_COUNT:
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
# If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
if match_count > TEST_READ_MAX_ERR_COUNT:
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
if match_count < TEST_READ_MIN_COUNT:
logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count))
if __name__ == '__main__':
logger = logging.getLogger(LOGGER_NAME)
# create file handler which logs even debug messages
fh = logging.FileHandler('modbus_test.log')
fh.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
# create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# set format of output for both handlers
formatter = logging.Formatter('%(levelname)s:%(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('Start script %s.' % os.path.basename(__file__))
test_modbus_serial_communication()
logging.shutdown()

Wyświetl plik

@ -1,308 +0,0 @@
# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import re
from threading import Thread
import ttfw_idf
from tiny_test_fw import DUT
LOG_LEVEL = logging.DEBUG
LOGGER_NAME = 'modbus_test'
# Allowed options for the test
TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization
TEST_THREAD_JOIN_TIMEOUT = 60 # Test theread join timeout in seconds
TEST_EXPECT_STR_TIMEOUT = 30 # Test expect timeout in seconds
TEST_MASTER_TCP = 'mb_tcp_master'
TEST_SLAVE_TCP = 'mb_tcp_slave'
STACK_DEFAULT = 0
STACK_IPV4 = 1
STACK_IPV6 = 2
STACK_INIT = 3
STACK_CONNECT = 4
STACK_START = 5
STACK_PAR_OK = 6
STACK_PAR_FAIL = 7
STACK_DESTROY = 8
pattern_dict_slave = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'),
STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: '
r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'),
STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s'
r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ [ms], ignore packet.*'),
STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).')}
pattern_dict_master = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'),
STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'),
STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'),
STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)'
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'),
STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s'
r'read fail, err = [0-9]+ \([_A-Z]+\).*'),
STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*')}
logger = logging.getLogger(LOGGER_NAME)
class DutTestThread(Thread):
""" Test thread class
"""
def __init__(self, dut=None, name=None, ip_addr=None, expect=None):
""" Initialize the thread parameters
"""
self.tname = name
self.dut = dut
self.expected = expect
self.data = None
self.ip_addr = ip_addr
self.test_finish = False
self.param_fail_count = 0
self.param_ok_count = 0
self.test_stage = STACK_DEFAULT
super(DutTestThread, self).__init__()
def __enter__(self):
logger.debug('Restart %s.', self.tname)
# Reset DUT first
self.dut.reset()
# Capture output from the DUT
self.dut.start_capture_raw_data(capture_id=self.dut.name)
return self
def __exit__(self, exc_type, exc_value, traceback):
""" The exit method of context manager
"""
if exc_type is not None or exc_value is not None:
logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value))
def run(self):
""" The function implements thread functionality
"""
# Initialize slave IP for master board
if (self.ip_addr is not None):
self.set_ip(0)
# Check expected strings in the listing
self.test_start(TEST_EXPECT_STR_TIMEOUT)
# Check DUT exceptions
dut_exceptions = self.dut.get_exceptions()
if 'Guru Meditation Error:' in dut_exceptions:
raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions))
# Mark thread has run to completion without any exceptions
self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)
def set_ip(self, index=0):
""" The method to send slave IP to master application
"""
message = r'.*Waiting IP([0-9]{1,2}) from stdin.*'
# Read all data from previous restart to get prompt correctly
self.dut.read()
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
if int(result[0]) != index:
raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut)))
# Use the same slave IP address for all characteristics during the test
self.dut.write('IP0=' + self.ip_addr, '\n', False)
self.dut.write('IP1=' + self.ip_addr, '\n', False)
self.dut.write('IP2=' + self.ip_addr, '\n', False)
logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname)
message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr)
def test_start(self, timeout_value):
""" The method to initialize and handle test stages
"""
def handle_get_ip4(data):
""" Handle get_ip v4
"""
logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data))
self.test_stage = STACK_IPV4
def handle_get_ip6(data):
""" Handle get_ip v6
"""
logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data))
self.test_stage = STACK_IPV6
def handle_init(data):
""" Handle init
"""
logger.debug('%s[STACK_INIT]: %s', self.tname, str(data))
self.test_stage = STACK_INIT
def handle_connect(data):
""" Handle connect
"""
logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data))
self.test_stage = STACK_CONNECT
def handle_test_start(data):
""" Handle connect
"""
logger.debug('%s[STACK_START]: %s', self.tname, str(data))
self.test_stage = STACK_START
def handle_par_ok(data):
""" Handle parameter ok
"""
logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data))
if self.test_stage >= STACK_START:
self.param_ok_count += 1
self.test_stage = STACK_PAR_OK
def handle_par_fail(data):
""" Handle parameter fail
"""
logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data))
self.param_fail_count += 1
self.test_stage = STACK_PAR_FAIL
def handle_destroy(data):
""" Handle destroy
"""
logger.debug('%s[DESTROY]: %s', self.tname, str(data))
self.test_stage = STACK_DESTROY
self.test_finish = True
while not self.test_finish:
try:
self.dut.expect_any((re.compile(self.expected[STACK_IPV4]), handle_get_ip4),
(re.compile(self.expected[STACK_IPV6]), handle_get_ip6),
(re.compile(self.expected[STACK_INIT]), handle_init),
(re.compile(self.expected[STACK_CONNECT]), handle_connect),
(re.compile(self.expected[STACK_START]), handle_test_start),
(re.compile(self.expected[STACK_PAR_OK]), handle_par_ok),
(re.compile(self.expected[STACK_PAR_FAIL]), handle_par_fail),
(re.compile(self.expected[STACK_DESTROY]), handle_destroy),
timeout=timeout_value)
except DUT.ExpectTimeout:
logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value)
self.test_finish = True
def test_check_mode(dut=None, mode_str=None, value=None):
""" Check communication mode for dut
"""
global logger
try:
opt = dut.app.get_sdkconfig()[mode_str]
logger.debug('%s {%s} = %s.\n', str(dut), mode_str, opt)
return value == opt
except Exception:
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
return False
@ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32'])
def test_modbus_tcp_communication(env, comm_mode):
global logger
rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp')
# Get device under test. Both duts must be able to be connected to WiFi router
dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
log_file = os.path.join(env.log_path, 'modbus_tcp_test.log')
print('Logging file name: %s' % log_file)
try:
# create file handler which logs even debug messages
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file)
fh.setLevel(logging.DEBUG)
# set format of output for both handlers
formatter = logging.Formatter('%(levelname)s:%(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
# create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# set format of output for both handlers
formatter = logging.Formatter('%(levelname)s:%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
# Check Kconfig configuration options for each built example
if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and
test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')):
slave_name = TEST_SLAVE_TCP
master_name = TEST_MASTER_TCP
else:
logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
raise RuntimeError('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
address = None
if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')
# Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason
dut_slave.start_app()
dut_master.start_app()
if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'):
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT)
else:
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT)
if address is not None:
print('Found IP slave address: %s' % address[0])
else:
raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
else:
raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
# Create thread for each dut
with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread:
# Start each thread
dut_slave_thread.start()
dut_master_thread.start()
# Wait for threads to complete
dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
if dut_slave_thread.is_alive():
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
if dut_master_thread.is_alive():
logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n',
dut_master_thread.tname, dut_master_thread.param_fail_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count)
logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n',
dut_master_thread.tname, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_ok_count)
if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
(dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
(dut_slave_thread.param_ok_count == 0) or
(dut_master_thread.param_ok_count == 0)):
raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')
finally:
dut_master.close()
dut_slave.close()
logging.shutdown()
if __name__ == '__main__':
test_modbus_tcp_communication()