Merge branch 'ci/fix_socket_ipv6_test' into 'master'

CI: Migrate socket example tests to pytest

See merge request espressif/esp-idf!21596
pull/10469/head
Chen Yu Dong 2022-12-20 11:13:09 +08:00
commit e5b318ab71
21 zmienionych plików z 643 dodań i 573 usunięć

Wyświetl plik

@ -869,14 +869,6 @@ example_test_001C:
- ESP32
- Example_GENERIC
example_test_protocols:
extends:
- .example_test_esp32_template
- .rules:test:example_test-esp32-wifi
tags:
- ESP32
- wifi_router
example_test_002:
extends:
- .example_test_esp32_template

Wyświetl plik

@ -30,7 +30,38 @@ There are many host-side tools which can be used to interact with the UDP/TCP se
One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets.
Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands.
In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples.
In addition to those tools, There are some python scripts under `examples/protocols/sockets/scripts`.
And scripts for automated tests named `pytest_xxx.py` can be found under each example directory.
### Python Scripts Socket Tools
Python scripts under `examples/protocols/sockets/scripts` could be used to exercise the socket communication.
Command line arguments such as IP version and IP address shall be supplied. Use `python xxxx.py --help` to see how to use these scripts.
Examples:
```bash
# python run_tcp_client.py --help
python run_tcp_client.py 192.168.1.2 [--port=3333] [--message="Data to ESP"]
python run_tcp_client.py fe80::2%eth0 [--port=3333] [--message="Data to ESP"]
# python run_tcp_server.py --help
python run_tcp_server.py [--port=3333] [--ipv6]
```
### Python Scripts For Automated Tests
Script named `pytest_xxxx` in the application directory can be used for automated tests.
They can also be run locally. Ref: [ESP-IDF Tests with Pytest Guide](https://docs.espressif.com/projects/esp-idf/en/latest/esp32/contribute/esp-idf-tests-with-pytest.html).
Example:
```bash
$ cd $IDF_PATH
$ bash install.sh --enable-pytest
$ . ./export.sh
$ cd examples/protocols/sockets/tcp_client
$ python $IDF_PATH/tools/ci/ci_build_apps.py . --target esp32 -vv --pytest-apps
$ pytest --target esp32
```
### Send UDP packet via netcat
```
@ -62,16 +93,6 @@ nc 192.168.0.167 3333
nc -l 192.168.0.167 -p 3333
```
### Python scripts
Each script in the application directory could be used to exercise the socket communication.
Command line arguments such as IP version (IPv4 or IPv6) and IP address and payload data (only clients) shall be supplied.
In addition to that, port number and interface id are hardcoded in the scripts and might need to be altered to match the values used by the application. Example:
```
PORT = 3333
INTERFACE = 'en0'
```
### Note about IPv6 addresses
Examples are configured to obtain multiple IPv6 addresses. The actual behavior may differ depending on the local network, typically the ESP gets assigned these two addresses

Wyświetl plik

@ -1,26 +0,0 @@
# This example code is in the Public Domain (or CC0 licensed, at your option.)
# Unless required by applicable law or agreed to in writing, this
# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import re
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_GENERIC')
def test_examples_protocol_socket_non_block(env, _):
dut = env.get_dut('non_blocking_socket', 'examples/protocols/sockets/non_blocking', dut_class=ttfw_idf.ESP32DUT)
# start the test and expect the client to receive back it's original data
dut.start_app()
dut.expect(re.compile(r'nonblocking-socket-client: Received: GET / HTTP/1.1'), timeout=30)
if __name__ == '__main__':
test_examples_protocol_socket_non_block()

Wyświetl plik

@ -0,0 +1,11 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import pytest
from pytest_embedded import Dut
@pytest.mark.supported_targets
@pytest.mark.generic
def test_examples_non_block_socket_localhost(dut: Dut) -> None:
dut.expect(r'nonblocking-socket-client: Received: GET / HTTP/1.1', timeout=30)

Wyświetl plik

@ -0,0 +1,50 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import argparse
import os
import socket
DEF_PORT = 3333
DEF_MESSAGE = 'Data to ESP'
def tcp_client(address: str, port: int, payload: str) -> str:
for res in socket.getaddrinfo(address, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
family_addr, _, _, _, addr = res
try:
sock = socket.socket(family_addr, socket.SOCK_STREAM)
sock.settimeout(60.0)
except socket.error as msg:
print('Could not create socket')
print(os.strerror(msg.errno))
raise
try:
sock.connect(addr)
except socket.error as e:
print('Could not open socket: ' + str(e))
sock.close()
raise
sock.sendall(payload.encode())
data = sock.recv(1024)
if not data:
return ''
print('Reply : ' + data.decode())
sock.close()
return data.decode()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('remote_ip', help='TCP server ip address, eg: 192.168.1.1, fe80::2%%eth0')
parser.add_argument('--port', default=DEF_PORT, type=int, help='TCP server port')
parser.add_argument('--message', default=DEF_MESSAGE, help='Message to send to the server.')
args = parser.parse_args()
print(f'Send message to the server: {args.remote_ip}')
data = tcp_client(args.remote_ip, args.port, args.message)
print(f'Received From server: {data}')
if __name__ == '__main__':
main()

Wyświetl plik

@ -0,0 +1,85 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import argparse
import socket
from threading import Event, Thread
DEF_PORT = 3333
class TcpServer(object):
def __init__(self, port, family_addr, persist=False, timeout=60): # type: ignore
self.port = port
self.socket = socket.socket(family_addr, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.settimeout(timeout)
self.shutdown = Event()
self.persist = persist
self.family_addr = family_addr
self.server_thread = None
def __enter__(self): # type: ignore
try:
self.socket.bind(('', self.port))
except socket.error as e:
print('Bind failed:{}'.format(e))
raise
self.socket.listen(1)
print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
self.server_thread = Thread(target=self.run_server)
self.server_thread.start()
return self
def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore
if self.persist:
sock = socket.socket(self.family_addr, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
sock.sendall(b'Stop', )
sock.close()
self.shutdown.set()
self.shutdown.set()
self.server_thread.join()
self.socket.close()
def run_server(self) -> None:
while not self.shutdown.is_set():
try:
conn, address = self.socket.accept() # accept new connection
print('Connection from: {}'.format(address))
conn.setblocking(1)
data = conn.recv(1024)
if not data:
return
data = data.decode()
print('Received data: ' + data)
reply = 'OK: ' + data
conn.send(reply.encode())
conn.close()
except socket.timeout:
print(f'socket accept timeout ({self.socket.timeout}s)')
except socket.error as e:
print('Running server failed:{}'.format(e))
raise
if not self.persist:
break
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--port', default=DEF_PORT, type=int, help='TCP server port')
parser.add_argument('--ipv6', action='store_true', help='Create IPv6 server.')
parser.add_argument('--timeout', default=10, type=int, help='socket accept/recv timeout.')
args = parser.parse_args()
if args.ipv6:
family = socket.AF_INET6
else:
family = socket.AF_INET
with TcpServer(args.port, family, persist=True, timeout=args.timeout):
input('Server Running. Press Enter or CTRL-C to exit...\n')
if __name__ == '__main__':
main()

Wyświetl plik

@ -0,0 +1,52 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import argparse
import os
import socket
DEF_PORT = 3333
DEF_MESSAGE = 'Data to ESP'
def udp_client(address: str, port: int, payload: str) -> str:
for res in socket.getaddrinfo(address, port, socket.AF_UNSPEC,
socket.SOCK_DGRAM, 0, socket.AI_PASSIVE):
family_addr, _, _, _, addr = res
try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM)
sock.settimeout(20.0)
except socket.error as msg:
print('Could not create socket')
print(os.strerror(msg.errno))
raise
try:
sock.sendto(payload.encode(), addr)
reply, addr = sock.recvfrom(128)
if not reply:
return ''
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
except socket.timeout:
print('Socket operation timeout')
return ''
except socket.error as msg:
print('Error while sending or receiving data from the socket')
print(os.strerror(msg.errno))
sock.close()
raise
return reply.decode()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('remote_ip', help='UDP server ip address, eg: 192.168.1.1, fe80::2%%eth0')
parser.add_argument('--port', default=DEF_PORT, type=int, help='UDP server port')
parser.add_argument('--message', default=DEF_MESSAGE, help='Message to send to the server.')
args = parser.parse_args()
print(f'Send message to the server: {args.remote_ip}')
data = udp_client(args.remote_ip, args.port, args.message)
print(f'Received From server: {data}')
if __name__ == '__main__':
main()

Wyświetl plik

@ -0,0 +1,80 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import argparse
import socket
from threading import Event, Thread
DEF_PORT = 3333
class UdpServer:
def __init__(self, port, family_addr, persist=False, timeout=60): # type: ignore
self.port = port
self.family_addr = family_addr
self.socket = socket.socket(family_addr, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.settimeout(timeout)
self.shutdown = Event()
self.persist = persist
self.server_thread = None
def __enter__(self): # type: ignore
try:
self.socket.bind(('', self.port))
except socket.error as e:
print('Bind failed:{}'.format(e))
raise
print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
self.server_thread = Thread(target=self.run_server)
self.server_thread.start()
return self
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
if self.persist:
sock = socket.socket(self.family_addr, socket.SOCK_DGRAM)
sock.sendto(b'Stop', ('localhost', self.port))
sock.close()
self.shutdown.set()
self.server_thread.join()
self.socket.close()
def run_server(self) -> None:
while not self.shutdown.is_set():
try:
data, addr = self.socket.recvfrom(1024)
print(addr)
if not data:
return
data = data.decode()
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
reply = 'OK: ' + data
self.socket.sendto(reply.encode(), addr)
except socket.timeout:
print(f'socket recvfrom timeout ({self.socket.timeout}s)')
except socket.error as e:
print('Running server failed:{}'.format(e))
raise
if not self.persist:
break
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--port', default=DEF_PORT, type=int, help='UDP server port')
parser.add_argument('--ipv6', action='store_true', help='Create IPv6 server.')
parser.add_argument('--timeout', default=10, type=int, help='socket recvfrom timeout.')
args = parser.parse_args()
if args.ipv6:
family = socket.AF_INET6
else:
family = socket.AF_INET
with UdpServer(args.port, family, persist=True, timeout=args.timeout):
input('Server Running. Press Enter or CTRL-C to exit...\n')
if __name__ == '__main__':
main()

Wyświetl plik

@ -41,27 +41,13 @@ In order to create TCP server that communicates with TCP Client example, choose
There are many host-side tools which can be used to interact with the UDP/TCP server/client.
One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets.
In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples.
Ref to the [upper level README](../README.md#host-tools) for more information.
### TCP server using netcat
```
nc -l 192.168.0.167 3333
```
### Python scripts
Script example_test.py could be used as a counter part to the tcp-client project, ip protocol name (IPv4 or IPv6) shall be stated as argument.
Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported.
Please run the following commands to configure the terminal to execute the script.
```
export PYTHONPATH="$IDF_PATH/tools:$IDF_PATH/tools/ci/python_packages"
python -m pip install -r $IDF_PATH/tools/ci/python_packages/ttfw_idf/requirements.txt
```
Example:
```
python example_test.py IPv4
```
## Hardware Required
This example can be run on any commonly available ESP32 development board.

Wyświetl plik

@ -1,129 +0,0 @@
# This example code is in the Public Domain (or CC0 licensed, at your option.)
# Unless required by applicable law or agreed to in writing, this
# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.
# -*- coding: utf-8 -*-
import os
import re
import socket
import sys
from threading import Event, Thread
import netifaces
import ttfw_idf
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
# ----------- Config ----------
PORT = 3333
# -------------------------------
class TcpServer:
def __init__(self, port, family_addr, persist=False):
self.port = port
self.socket = socket.socket(family_addr, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.settimeout(60.0)
self.shutdown = Event()
self.persist = persist
self.family_addr = family_addr
def __enter__(self):
try:
self.socket.bind(('', self.port))
except socket.error as e:
print('Bind failed:{}'.format(e))
raise
self.socket.listen(1)
print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
self.server_thread = Thread(target=self.run_server)
self.server_thread.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.persist:
sock = socket.socket(self.family_addr, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
sock.sendall(b'Stop', )
sock.close()
self.shutdown.set()
self.shutdown.set()
self.server_thread.join()
self.socket.close()
def run_server(self):
while not self.shutdown.is_set():
try:
conn, address = self.socket.accept() # accept new connection
print('Connection from: {}'.format(address))
conn.setblocking(1)
data = conn.recv(1024)
if not data:
return
data = data.decode()
print('Received data: ' + data)
reply = 'OK: ' + data
conn.send(reply.encode())
conn.close()
except socket.error as e:
print('Running server failed:{}'.format(e))
raise
if not self.persist:
break
@ttfw_idf.idf_example_test(env_tag='wifi_router')
def test_examples_protocol_socket_tcpclient(env, extra_data):
"""
steps:
1. join AP
2. have the board connect to the server
3. send and receive data
"""
dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_client', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'tcp_client.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('tcp_client_bin_size', '{}KB'.format(bin_size // 1024))
# start test
dut1.start_app()
if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
dut1.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut1.write(f'{ap_ssid} {ap_password}')
ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
my_interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
with TcpServer(PORT, socket.AF_INET):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
print('Connect tcp client to server IP={}'.format(server_ip))
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))
# test IPv6
with TcpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
print('Connect tcp client to server IP={}'.format(server_ip))
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))
if __name__ == '__main__':
if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided:
# Usage: example_test.py <IPv4|IPv6>
family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET
with TcpServer(PORT, family_addr, persist=True) as s:
print(input('Press Enter stop the server...'))
else:
test_examples_protocol_socket_tcpclient()

Wyświetl plik

@ -0,0 +1,69 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import socket
import pytest
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
get_my_interface_by_dest_ip)
from pytest_embedded import Dut
try:
from run_tcp_server import TcpServer
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from run_tcp_server import TcpServer
PORT = 3333
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_tcp_client_ipv4(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print(f'Connected with IPv4={ipv4}')
# test IPv4
with TcpServer(PORT, socket.AF_INET):
server_ip = get_host_ip4_by_dest_ip(ipv4)
print('Connect tcp client to server IP={}'.format(server_ip))
dut.write(server_ip)
dut.expect('OK: Message from ESP32')
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_tcp_client_ipv6(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
# test IPv6
my_interface = get_my_interface_by_dest_ip(ipv4)
with TcpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
print('Connect tcp client to server IP={}'.format(server_ip))
dut.write(server_ip)
dut.expect('OK: Message from ESP32')

Wyświetl plik

@ -16,23 +16,13 @@ There are many host-side tools which can be used to interact with the UDP/TCP se
One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets.
Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following command.
In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples.
Ref to the [upper level README](../README.md#host-tools) for more information.
### TCP client using netcat
```
nc 192.168.0.167 3333
```
### Python scripts
Script example_test.py could be used as a counter part to the tcp-server application,
IP address and the message to be send to the server shall be stated as arguments. Example:
```
python example_test.py 192.168.0.167 Message
```
Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported;
please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`.
## Hardware Required
This example can be run on any commonly available ESP32 development board.

Wyświetl plik

@ -1,96 +0,0 @@
# This example code is in the Public Domain (or CC0 licensed, at your option.)
# Unless required by applicable law or agreed to in writing, this
# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import os
import re
import socket
import sys
import ttfw_idf
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
# ----------- Config ----------
PORT = 3333
# -------------------------------
def tcp_client(address, payload):
for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
family_addr, socktype, proto, canonname, addr = res
try:
sock = socket.socket(family_addr, socket.SOCK_STREAM)
sock.settimeout(60.0)
except socket.error as msg:
print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
raise
try:
sock.connect(addr)
except socket.error as msg:
print('Could not open socket: ', msg)
sock.close()
raise
sock.sendall(payload.encode())
data = sock.recv(1024)
if not data:
return
print('Reply : ' + data.decode())
sock.close()
return data.decode()
@ttfw_idf.idf_example_test(env_tag='wifi_router')
def test_examples_protocol_socket_tcpserver(env, extra_data):
MESSAGE = 'Data to ESP'
"""
steps:
1. join AP
2. have the board connect to the server
3. send and receive data
"""
dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_server', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'tcp_server.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('tcp_server_bin_size', '{}KB'.format(bin_size // 1024))
# start test
dut1.start_app()
if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
dut1.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut1.write(f'{ap_ssid} {ap_password}')
ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
received = tcp_client(ipv4, MESSAGE)
if not received == MESSAGE:
raise
dut1.expect(MESSAGE)
# test IPv6
received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE)
if not received == MESSAGE:
raise
dut1.expect(MESSAGE)
if __name__ == '__main__':
if sys.argv[2:]: # if two arguments provided:
# Usage: example_test.py <server_address> <message_to_send_to_server>
tcp_client(sys.argv[1], sys.argv[2])
else: # otherwise run standard example test as in the CI
test_examples_protocol_socket_tcpserver()

Wyświetl plik

@ -0,0 +1,69 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import time
import pytest
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
from pytest_embedded import Dut
try:
from run_tcp_client import tcp_client
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from run_tcp_client import tcp_client
PORT = 3333
MESSAGE = 'Data to ESP'
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_tcp_server_ipv4(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print(f'Connected with IPv4={ipv4}')
time.sleep(1)
# test IPv4
received = tcp_client(ipv4, PORT, MESSAGE)
if not received == MESSAGE:
raise
dut.expect(MESSAGE)
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_tcp_server_ipv6(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
time.sleep(1)
interface = get_my_interface_by_dest_ip(ipv4)
# test IPv6
received = tcp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
if not received == MESSAGE:
raise
dut.expect(MESSAGE)

Wyświetl plik

@ -16,7 +16,7 @@ There are many host-side tools which can be used to interact with the UDP/TCP se
One command line tool is [netcat](http://netcat.sourceforge.net) which can send and receive many kinds of packets.
Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands.
In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples.
Ref to the [upper level README](../README.md#host-tools) for more information.
### Send UDP packet via netcat
```
@ -33,16 +33,6 @@ echo "Hello from PC" | nc -w1 -u 192.168.0.167 3333
nc -u -l 192.168.0.167 3333
```
### Python scripts
Script example_test.py could be used as a counter part to the udp-client application, ip protocol name (IPv4 or IPv6) shall be stated as argument. Example:
```
python example_test.py IPv4
```
Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported;
please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`.
## Hardware Required
This example can be run on any commonly available ESP32 development board.

Wyświetl plik

@ -1,138 +0,0 @@
# This example code is in the Public Domain (or CC0 licensed, at your option.)
# Unless required by applicable law or agreed to in writing, this
# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.
# -*- coding: utf-8 -*-
import os
import re
import socket
import sys
from threading import Event, Thread
import netifaces
import ttfw_idf
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
from tiny_test_fw.DUT import ExpectTimeout
# ----------- Config ----------
PORT = 3333
# -------------------------------
class UdpServer:
def __init__(self, port, family_addr, persist=False):
self.port = port
self.family_addr = family_addr
self.socket = socket.socket(family_addr, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.settimeout(60.0)
self.shutdown = Event()
self.persist = persist
def __enter__(self):
try:
self.socket.bind(('', self.port))
except socket.error as e:
print('Bind failed:{}'.format(e))
raise
print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr))
self.server_thread = Thread(target=self.run_server)
self.server_thread.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.persist:
sock = socket.socket(self.family_addr, socket.SOCK_DGRAM)
sock.sendto(b'Stop', ('localhost', self.port))
sock.close()
self.shutdown.set()
self.server_thread.join()
self.socket.close()
def run_server(self):
while not self.shutdown.is_set():
try:
data, addr = self.socket.recvfrom(1024)
print(addr)
if not data:
return
data = data.decode()
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
reply = 'OK: ' + data
self.socket.sendto(reply.encode(), addr)
except socket.error as e:
print('Running server failed:{}'.format(e))
raise
if not self.persist:
break
@ttfw_idf.idf_example_test(env_tag='wifi_router')
def test_examples_protocol_socket_udpclient(env, extra_data):
"""
steps:
1. join AP
2. have the board connect to the server
3. send and receive data
"""
dut1 = env.get_dut('udp_client', 'examples/protocols/sockets/udp_client', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'udp_client.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('udp_client_bin_size', '{}KB'.format(bin_size // 1024))
# start test
dut1.start_app()
if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
dut1.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut1.write(f'{ap_ssid} {ap_password}')
ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
my_interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
with UdpServer(PORT, socket.AF_INET):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(3):
try:
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))
break
except ExpectTimeout:
pass
else:
raise ValueError('Failed to send/recv udp packets.')
# test IPv6
with UdpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(3):
try:
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))
break
except ExpectTimeout:
pass
else:
raise ValueError('Failed to send/recv udp packets.')
if __name__ == '__main__':
if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided:
# Usage: example_test.py <IPv4|IPv6>
family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET
with UdpServer(PORT, family_addr, persist=True) as s:
print(input('Press Enter stop the server...'))
else:
test_examples_protocol_socket_udpclient()

Wyświetl plik

@ -0,0 +1,85 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import socket
import pytest
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
get_my_interface_by_dest_ip)
from pexpect.exceptions import TIMEOUT
from pytest_embedded import Dut
try:
from run_udp_server import UdpServer
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from run_udp_server import UdpServer
PORT = 3333
MAX_RETRIES = 3
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_udp_client_ipv4(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print(f'Connected with IPv4={ipv4}')
# test IPv4
with UdpServer(PORT, socket.AF_INET):
server_ip = get_host_ip4_by_dest_ip(ipv4)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(MAX_RETRIES):
try:
dut.write(server_ip)
dut.expect('OK: Message from ESP32')
break
except TIMEOUT:
pass
else:
raise ValueError('Failed to send/recv udp packets.')
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_udp_client_ipv6(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
interface = get_my_interface_by_dest_ip(ipv4)
# test IPv6
with UdpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(MAX_RETRIES):
try:
dut.write(server_ip)
dut.expect('OK: Message from ESP32')
break
except TIMEOUT:
pass
else:
raise ValueError('Failed to send/recv udpv6 packets.')

Wyświetl plik

@ -17,7 +17,7 @@ One command line tool is [netcat](http://netcat.sourceforge.net) which can send
Note: please replace `192.168.0.167 3333` with desired IPV4/IPV6 address (displayed in monitor console) and port number in the following commands.
If want to use this RECVINFO function, please enable LWIP_NETBUF_RECVINFO in menuconfig,this function can only resolve the destination address of IPV4.
In addition to those tools, simple Python scripts can be found under sockets/scripts directory. Every script is designed to interact with one of the examples.
Ref to the [upper level README](../README.md#host-tools) for more information.
### Send UDP packet via netcat
```
@ -34,16 +34,6 @@ echo "Hello from PC" | nc -w1 -u 192.168.0.167 3333
nc -u 192.168.0.167 3333
```
### Python scripts
Script example_test.py could be used as a counter part to the udp-server application,
IP address and the message to be send to the server shall be stated as arguments. Example:
```
python example_test.py 192.168.0.167 Message
```
Note that this script is used in automated tests, as well, so the IDF test framework packages need to be imported;
please add `$IDF_PATH/tools/ci/python_packages` to `PYTHONPATH`.
## Hardware Required
This example can be run on any commonly available ESP32 development board.

Wyświetl plik

@ -1,112 +0,0 @@
# This example code is in the Public Domain (or CC0 licensed, at your option.)
# Unless required by applicable law or agreed to in writing, this
# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import os
import re
import socket
import sys
import ttfw_idf
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
# ----------- Config ----------
PORT = 3333
# -------------------------------
def udp_client(address, payload):
for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC,
socket.SOCK_DGRAM, 0, socket.AI_PASSIVE):
family_addr, socktype, proto, canonname, addr = res
try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM)
sock.settimeout(20.0)
except socket.error as msg:
print('Could not create socket')
print(os.strerror(msg.errno))
raise
try:
sock.sendto(payload.encode(), addr)
reply, addr = sock.recvfrom(128)
if not reply:
return
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
except socket.timeout:
print('Socket operation timeout')
return str(None)
except socket.error as msg:
print('Error while sending or receiving data from the socket')
print(os.strerror(msg.errno))
sock.close()
raise
return reply.decode()
@ttfw_idf.idf_example_test(env_tag='wifi_router')
def test_examples_protocol_socket_udpserver(env, extra_data):
MESSAGE = 'Data to ESP'
MAX_RETRIES = 3
"""
steps:
1. join AP
2. have the board connect to the server
3. send and receive data
"""
dut1 = env.get_dut('udp_server', 'examples/protocols/sockets/udp_server', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'udp_server.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('udp_server_bin_size', '{}KB'.format(bin_size // 1024))
# start test
dut1.start_app()
if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'):
dut1.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut1.write(f'{ap_ssid} {ap_password}')
ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0]
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
dut1.expect(re.compile(r'Waiting for data'), timeout=10)
interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv4...')
received = udp_client(ipv4, MESSAGE)
if received == MESSAGE:
print('OK')
break
else:
raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
dut1.expect(MESSAGE)
# test IPv6
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv6...')
received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE)
if received == MESSAGE:
print('OK')
break
else:
raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
dut1.expect(MESSAGE)
if __name__ == '__main__':
if sys.argv[2:]: # if two arguments provided:
# Usage: example_test.py <server_address> <message_to_send_to_server>
udp_client(sys.argv[1], sys.argv[2])
else: # otherwise run standard example test as in the CI
test_examples_protocol_socket_udpserver()

Wyświetl plik

@ -0,0 +1,77 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import pytest
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
from pytest_embedded import Dut
try:
from run_udp_client import udp_client
except ImportError:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from run_udp_client import udp_client
PORT = 3333
MESSAGE = 'Data to ESP'
MAX_RETRIES = 3
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_udp_server_ipv4(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print(f'Connected with IPv4={ipv4}')
# test IPv4
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv4...')
received = udp_client(ipv4, PORT, MESSAGE)
if received == MESSAGE:
print('OK')
break
else:
raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
dut.expect(MESSAGE)
@pytest.mark.esp32
@pytest.mark.wifi_router
def test_examples_udp_server_ipv6(dut: Dut) -> None:
# Parse IP address of STA
logging.info('Waiting to connect with AP')
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
env_name = 'wifi_router'
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
interface = get_my_interface_by_dest_ip(ipv4)
# test IPv6
for _ in range(MAX_RETRIES):
print('Testing UDP on IPv6...')
received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
if received == MESSAGE:
print('OK')
break
else:
raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
dut.expect(MESSAGE)

Wyświetl plik

@ -4,7 +4,7 @@
import logging
import os
import socket
from typing import Any
from typing import Any, List
import netifaces
import yaml
@ -26,10 +26,23 @@ $IDF_PATH/EnvConfig.yml:
def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
return host_ip
if ip_type == netifaces.AF_INET:
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
return host_ip
elif ip_type == netifaces.AF_INET6:
ip6_addrs: List[str] = []
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
# prefer to use link local address due to example settings
if host_ip.startswith('FE80::'):
ip6_addrs.insert(0, host_ip)
else:
ip6_addrs.append(host_ip)
if ip6_addrs:
return ip6_addrs[0]
return ''
@ -45,6 +58,17 @@ def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str:
return host_ip
def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str:
addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM)
s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s1.connect(addr_info[0][-1])
host_ip = s1.getsockname()[0]
s1.close()
assert isinstance(host_ip, str)
print(f'Using host ip: {host_ip}')
return host_ip
def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
my_ip = get_host_ip4_by_dest_ip(dest_ip)
interfaces = netifaces.interfaces()