Porównaj commity

...

41 Commity

Autor SHA1 Wiadomość Data
Marceau Fillon 951ff7a5a9
Merge a973883985 into 45ead11f96 2024-04-22 20:03:50 +02:00
Damien George 45ead11f96 ssl: Use "from tls import *" to be compatible with axtls.
axtls doesn't define all the CERT_xxx constants, nor the MBEDTLS_VERSION
constant.

This change means that `tls.SSLContext` is imported into the module, but
that's subsequently overridden by the class definition in this module.

Signed-off-by: Damien George <damien@micropython.org>
2024-03-28 17:44:37 +11:00
iabdalkader 661efa48f0 senml: Use the updated cbor2 API.
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
2024-03-19 17:29:22 +11:00
iabdalkader 8ee876dcd6 cbor2: Deprecate decoder and encoder modules.
Deprecate decoder and encoder modules to maintain compatibility with the
CPython cbor2 module.

Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
2024-03-19 17:28:35 +11:00
Jim Mussared 5c7e3fc0bc json: Move to unix-ffi.
It requires the unix pcre-based re module.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2024-03-19 17:23:07 +11:00
Damien George 23df50d0ea unix-ffi: Remove "unix_ffi" argument from require().
And describe how to use `add_library()` instead.

Signed-off-by: Damien George <damien@micropython.org>
2024-03-17 13:22:36 +11:00
Damien George ffb07dbce5 gzip: Fix recursion error in open() function.
And give the `mode` parameter a default, matching CPython.

Signed-off-by: Damien George <damien@micropython.org>
2024-02-29 14:54:24 +11:00
Angus Gratton 224246531e lora-sx126x: Clean up some struct formatting.
Changes are cosmetic - and maybe very minor code size - but not functional.
_reg_read() was calling struct.packinto() with an incorrect number of
arguments but it seems like MicroPython didn't mind, as result is correct
for both versions.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:38 +11:00
Angus Gratton 35bb7952ba lora-sx126x: Fix syncword setting.
Fixes issue #796.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:08 +11:00
Angus Gratton 546284817a lora-sx127x: Implement missing syncword support.
This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:06 +11:00
Angus Gratton ad6ab5a78c lora-sync: Fix race with fast or failed send().
If send completes before the first call to poll_send(), the driver could
get stuck in _sync_wait(). This had much less impact before rp2 port went
tickless, as _sync_wait(will_irq=True) calls machine.idle() which may not
wake very frequently on a tickless port.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:57:50 +11:00
Angus Gratton b712103519 lora-sx126x: Fix invalid default configuration after reset.
According to the docs, only freq_khz was needed for working output.
However:

- Without output_power setting, no output from SX1262 antenna (theory:
  output routed to the SX1261 antenna).

- SF,BW,etc. settings were different from the SX127x power on defaults, so
  modems with an identical configuration were unable to communicate.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:44:57 +11:00
Angus Gratton 4cc67065dd tools/ci.sh: Add unix-ffi library when testing unix-ffi subdirectory.
Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-19 23:21:04 +11:00
ubi de feo 8058b2935b tarfile-write: Fix permissions when adding to archive.
Signed-off-by: ubi de feo <me@ubidefeo.com>
2024-02-09 10:43:05 +11:00
Carlosgg 56f514f569 aiohttp: Fix binary data treatment.
- Fix binary data `Content-type` header and data `Content-Length`
  calculation.

- Fix query length when data is included.

- Fix `json` and `text` methods of `ClientResponse` to read
  `Content-Length` size

Signed-off-by: Carlos Gil <carlosgilglez@gmail.com>
2024-02-08 19:02:26 +11:00
Adam Knowles ddb1a27957 hmac: Fix passing in a string for digestmod argument.
The built-in `hashlib` module does not have a `.new` method (although the
Python version in this repository does).
2024-02-07 12:45:03 +11:00
Felix Dörre 35d41dbb0e ssl: Restructure micropython SSL interface to a new tls module.
MicroPython now supplies SSL/TLS functionality in a new built-in `tls`
module.  The `ssl` module is now implemented purely in Python, in this
repository.  Other libraries are updated to work with this scheme.

Signed-off-by: Felix Dörre <felix@dogcraft.de>
2024-02-07 12:12:13 +11:00
Felix Dörre 803452a1ac umqtt.simple: Simplify check for user being unused.
There don't seem to be any MQTT implementations that expect an empty
username (instead of the field missing), so the check for unused `user` can
be simplified.

Signed-off-by: Felix Dörre <felix@dogcraft.de>
2024-02-07 12:12:09 +11:00
Carlosgg 7cdf708815 aiohttp: Add new aiohttp package.
Implement `aiohttp` with `ClientSession`, websockets and `SSLContext`
support.

Only client is implemented and API is mostly compatible with CPython
`aiohttp`.

Signed-off-by: Carlos Gil <carlosgilglez@gmail.com>
2023-12-20 16:26:04 +11:00
Bhavesh Kakwani 57ce3ba95c aioble: Fix advertising variable name to use us not ms. 2023-12-20 15:22:21 +11:00
Mark Blakeney 9ceda53180 uaiohttpclient: Update example client code.
Signed-off-by: Mark Blakeney <mark.blakeney@bullet-systems.net>
2023-12-20 14:56:09 +11:00
Mark Blakeney 05efdd03a7 uaiohttpclient: Update "yield from" to "await".
Signed-off-by: Mark Blakeney <mark.blakeney@bullet-systems.net>
2023-12-20 14:56:04 +11:00
Mark Blakeney 9d09cdd4af uaiohttpclient: Make flake8 inspired improvements.
Signed-off-by: Mark Blakeney <mark.blakeney@bullet-systems.net>
2023-12-20 14:56:00 +11:00
Mark Blakeney 149226d3f7 uaiohttpclient: Fix hard coded port 80.
Signed-off-by: Mark Blakeney <mark.blakeney@bullet-systems.net>
2023-12-20 14:55:51 +11:00
scivision ae8ea8d113 os-path: Implement os.path.isfile().
Signed-off-by: Michael Hirsch <michael@scivision.dev>
2023-12-20 14:46:57 +11:00
Andrew Leech f672baa92b aiorepl: Add support for raw mode (ctrl-a).
Provides support for mpremote features like cp and mount.

Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>
2023-12-20 12:35:40 +11:00
Andrew Leech 10c9281dad aiorepl: Add cursor left/right support.
Allows modifying current line, adding/deleting characters in the middle
etc.  Includes home/end keys to move to start/end of current line.

Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>
2023-12-20 12:33:43 +11:00
Andrew Leech d41851ca72 aiorepl: Add support for paste mode (ctrl-e).
Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>
2023-12-20 12:33:19 +11:00
Andrew Leech e051a120bc aiorepl: Update import of asyncio.
Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>
2023-12-20 12:33:16 +11:00
Yu Ting 41aa257a31 base64: Implement custom maketrans and translate methods.
Re-implemented bytes.maketrans() and bytes.translate() as there are no such
functions in MicroPython.
2023-12-20 12:26:13 +11:00
Matt Trentini 340243e205 time: Add README to explain the purpose of the time extension library.
Signed-off-by: Matt Trentini <matt.trentini@gmail.com>
2023-12-20 12:05:35 +11:00
Jim Mussared 83f3991f41 lcd160cr: Remove support for options in manifest.
This is the last remaining use of the "options" feature. Nothing in the
main repo which `require()`'s this package sets it.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-12-20 12:01:22 +11:00
Jim Mussared cee0945f1c all: Replace "black" with "ruff format".
- Add config for [tool.ruff.format] to pyproject.toml.
- Update pre-commit to run both ruff and ruff-format.
- Update a small number of files that change with ruff's rules.
- Update CI.
- Simplify codeformat.py just forward directly to "ruff format"

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-12-20 11:56:24 +11:00
Jim Mussared ad0a2590cc tools/verifygitlog.py: Add git commit message checking.
This adds verifygitlog.py from the main repo, adds it to GitHub workflows,
and also pre-commit.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-12-20 11:46:40 +11:00
Christian Marangi d8e163bb5f unix-ffi/re: Convert to PCRE2.
PCRE is marked as EOL and won't receive any new security update.

Convert the re module to PCRE2 API to enforce security.  Additional
dependency is now needed with uctypes due to changes in how PCRE2 return
the match_data in a pointer and require special handling.

The converted module is tested with the test_re.py with no regression.

Signed-off-by: Christian Marangi <ansuelsmth@gmail.com>
2023-11-01 10:14:36 +11:00
Jim Mussared 0620d02290 .github/workflows/ruff.yml: Pin to 0.1.0.
The `--format` flag was changed to `--output-format` in the recent update.

Pin to this version to prevent further updates from breaking (e.g. through new rules or other changes).

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-10-26 11:48:26 +11:00
Brian Whitman e025c843b6 requests: Fix detection of iterators in chunked data requests.
Chunked detection does not work as generators never have an `__iter__`
attribute.  They do have `__next__`.

Example that now works with this commit:

    def read_in_chunks(file_object, chunk_size=4096):
        while True:
            data = file_object.read(chunk_size)
            if not data:
                break
            yield data

    file = open(filename, "rb")
    r = requests.post(url, data=read_in_chunks(file))
2023-10-05 10:42:14 +11:00
Jim Mussared 46748d2817 aioble/server.py: Allow BufferedCharacteristic to support all ops.
Previously a BufferedCharacteristic could only be read by the client, where
it should have been writeable. This makes it support all ops (read / write
/ write-with-response, etc).

Adds a test to check the max_len and append functionality of
BufferedCharacteristic.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-10-04 14:47:38 +11:00
Jim Mussared e5ba864470 aioble/server.py: Add data arg for indicate.
In micropython/micropython#11239 we added support for passing data to
gatts_indicate (to make it match gatts_notify).

This adds the same to aioble.

Also update the documentation to mention this (and fix some mistakes and
add a few more examples).

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-10-04 14:37:26 +11:00
Matthias Urlichs 55d1d23d6f __future__: Add "annotations".
MicroPython ignores types anyway.
2023-10-04 12:53:52 +11:00
Marceau Fillon a973883985 uaescmac: New module for aes cypher and aes-cmac algorithm
New module to provide aes cypher and aes-cmac algorithm.
This module is based on a pure python solution and adapted
to match the other micropython hashers methods.
This type of algorithm is used to compute the database hash
applied to Bluetooth services definition.

Signed-off-by: Marceau Fillon <marceau.fillon@planetinnovation.com.au>
2021-09-09 10:09:55 +10:00
116 zmienionych plików z 3117 dodań i 402 usunięć

Wyświetl plik

@ -1,16 +0,0 @@
name: Check code formatting
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- name: Install packages
run: source tools/ci.sh && ci_code_formatting_setup
- name: Run code formatting
run: source tools/ci.sh && ci_code_formatting_run
- name: Check code formatting
run: git diff --exit-code

Wyświetl plik

@ -0,0 +1,18 @@
name: Check commit message formatting
on: [push, pull_request]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: '100'
- uses: actions/setup-python@v4
- name: Check commit message formatting
run: source tools/ci.sh && ci_commit_formatting_run

Wyświetl plik

@ -1,10 +1,11 @@
# https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
name: Python code lint with ruff
name: Python code lint and formatting with ruff
on: [push, pull_request]
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: pip install --user ruff
- run: ruff --format=github .
- uses: actions/checkout@v4
- run: pip install --user ruff==0.1.2
- run: ruff check --output-format=github .
- run: ruff format --diff .

Wyświetl plik

@ -1,11 +1,14 @@
repos:
- repo: local
hooks:
- id: codeformat
name: MicroPython codeformat.py for changed files
entry: tools/codeformat.py -v -f
- id: verifygitlog
name: MicroPython git commit message format checker
entry: tools/verifygitlog.py --check-file --ignore-rebase
language: python
verbose: true
stages: [commit-msg]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.280
rev: v0.1.2
hooks:
- id: ruff
id: ruff-format

Wyświetl plik

@ -21,7 +21,7 @@ To use this library, you need to import the library and then start the REPL task
For example, in main.py:
```py
import uasyncio as asyncio
import asyncio
import aiorepl
async def demo():

Wyświetl plik

@ -5,7 +5,7 @@ from micropython import const
import re
import sys
import time
import uasyncio as asyncio
import asyncio
# Import statement (needs to be global, and does not return).
_RE_IMPORT = re.compile("^import ([^ ]+)( as ([^ ]+))?")
@ -19,6 +19,13 @@ _RE_ASSIGN = re.compile("[^=]=[^=]")
_HISTORY_LIMIT = const(5 + 1)
CHAR_CTRL_A = const(1)
CHAR_CTRL_B = const(2)
CHAR_CTRL_C = const(3)
CHAR_CTRL_D = const(4)
CHAR_CTRL_E = const(5)
async def execute(code, g, s):
if not code.strip():
return
@ -39,13 +46,11 @@ async def __code():
{}
__exec_task = asyncio.create_task(__code())
""".format(
code
)
""".format(code)
async def kbd_intr_task(exec_task, s):
while True:
if ord(await s.read(1)) == 0x03:
if ord(await s.read(1)) == CHAR_CTRL_C:
exec_task.cancel()
return
@ -104,7 +109,9 @@ async def task(g=None, prompt="--> "):
while True:
hist_b = 0 # How far back in the history are we currently.
sys.stdout.write(prompt)
cmd = ""
cmd: str = ""
paste = False
curs = 0 # cursor offset from end of cmd buffer
while True:
b = await s.read(1)
pc = c # save previous character
@ -114,11 +121,19 @@ async def task(g=None, prompt="--> "):
if c < 0x20 or c > 0x7E:
if c == 0x0A:
# LF
if paste:
sys.stdout.write(b)
cmd += b
continue
# If the previous character was also LF, and was less
# than 20 ms ago, this was likely due to CRLF->LFLF
# conversion, so ignore this linefeed.
if pc == 0x0A and time.ticks_diff(t, pt) < 20:
continue
if curs:
# move cursor to end of the line
sys.stdout.write("\x1B[{}C".format(curs))
curs = 0
sys.stdout.write("\n")
if cmd:
# Push current command.
@ -135,31 +150,45 @@ async def task(g=None, prompt="--> "):
elif c == 0x08 or c == 0x7F:
# Backspace.
if cmd:
cmd = cmd[:-1]
sys.stdout.write("\x08 \x08")
elif c == 0x02:
# Ctrl-B
if curs:
cmd = "".join((cmd[: -curs - 1], cmd[-curs:]))
sys.stdout.write(
"\x08\x1B[K"
) # move cursor back, erase to end of line
sys.stdout.write(cmd[-curs:]) # redraw line
sys.stdout.write("\x1B[{}D".format(curs)) # reset cursor location
else:
cmd = cmd[:-1]
sys.stdout.write("\x08 \x08")
elif c == CHAR_CTRL_A:
await raw_repl(s, g)
break
elif c == CHAR_CTRL_B:
continue
elif c == 0x03:
# Ctrl-C
if pc == 0x03 and time.ticks_diff(t, pt) < 20:
# Two very quick Ctrl-C (faster than a human
# typing) likely means mpremote trying to
# escape.
asyncio.new_event_loop()
return
elif c == CHAR_CTRL_C:
if paste:
break
sys.stdout.write("\n")
break
elif c == 0x04:
# Ctrl-D
elif c == CHAR_CTRL_D:
if paste:
result = await execute(cmd, g, s)
if result is not None:
sys.stdout.write(repr(result))
sys.stdout.write("\n")
break
sys.stdout.write("\n")
# Shutdown asyncio.
asyncio.new_event_loop()
return
elif c == CHAR_CTRL_E:
sys.stdout.write("paste mode; Ctrl-C to cancel, Ctrl-D to finish\n===\n")
paste = True
elif c == 0x1B:
# Start of escape sequence.
key = await s.read(2)
if key in ("[A", "[B"):
if key in ("[A", "[B"): # up, down
# Stash the current command.
hist[(hist_i - hist_b) % _HISTORY_LIMIT] = cmd
# Clear current command.
@ -175,12 +204,122 @@ async def task(g=None, prompt="--> "):
# Update current command.
cmd = hist[(hist_i - hist_b) % _HISTORY_LIMIT]
sys.stdout.write(cmd)
elif key == "[D": # left
if curs < len(cmd) - 1:
curs += 1
sys.stdout.write("\x1B")
sys.stdout.write(key)
elif key == "[C": # right
if curs:
curs -= 1
sys.stdout.write("\x1B")
sys.stdout.write(key)
elif key == "[H": # home
pcurs = curs
curs = len(cmd)
sys.stdout.write("\x1B[{}D".format(curs - pcurs)) # move cursor left
elif key == "[F": # end
pcurs = curs
curs = 0
sys.stdout.write("\x1B[{}C".format(pcurs)) # move cursor right
else:
# sys.stdout.write("\\x")
# sys.stdout.write(hex(c))
pass
else:
sys.stdout.write(b)
cmd += b
if curs:
# inserting into middle of line
cmd = "".join((cmd[:-curs], b, cmd[-curs:]))
sys.stdout.write(cmd[-curs - 1 :]) # redraw line to end
sys.stdout.write("\x1B[{}D".format(curs)) # reset cursor location
else:
sys.stdout.write(b)
cmd += b
finally:
micropython.kbd_intr(3)
async def raw_paste(s, g, window=512):
sys.stdout.write("R\x01") # supported
sys.stdout.write(bytearray([window & 0xFF, window >> 8, 0x01]).decode())
eof = False
idx = 0
buff = bytearray(window)
file = b""
while not eof:
for idx in range(window):
b = await s.read(1)
c = ord(b)
if c == CHAR_CTRL_C or c == CHAR_CTRL_D:
# end of file
sys.stdout.write(chr(CHAR_CTRL_D))
if c == CHAR_CTRL_C:
raise KeyboardInterrupt
file += buff[:idx]
eof = True
break
buff[idx] = c
if not eof:
file += buff
sys.stdout.write("\x01") # indicate window available to host
return file
async def raw_repl(s: asyncio.StreamReader, g: dict):
heading = "raw REPL; CTRL-B to exit\n"
line = ""
sys.stdout.write(heading)
while True:
line = ""
sys.stdout.write(">")
while True:
b = await s.read(1)
c = ord(b)
if c == CHAR_CTRL_A:
rline = line
line = ""
if len(rline) == 2 and ord(rline[0]) == CHAR_CTRL_E:
if rline[1] == "A":
line = await raw_paste(s, g)
break
else:
# reset raw REPL
sys.stdout.write(heading)
sys.stdout.write(">")
continue
elif c == CHAR_CTRL_B:
# exit raw REPL
sys.stdout.write("\n")
return 0
elif c == CHAR_CTRL_C:
# clear line
line = ""
elif c == CHAR_CTRL_D:
# entry finished
# indicate reception of command
sys.stdout.write("OK")
break
else:
# let through any other raw 8-bit value
line += b
if len(line) == 0:
# Normally used to trigger soft-reset but stay in raw mode.
# Fake it for aiorepl / mpremote.
sys.stdout.write("Ignored: soft reboot\n")
sys.stdout.write(heading)
try:
result = exec(line, g)
if result is not None:
sys.stdout.write(repr(result))
sys.stdout.write(chr(CHAR_CTRL_D))
except Exception as ex:
print(line)
sys.stdout.write(chr(CHAR_CTRL_D))
sys.print_exception(ex, sys.stdout)
sys.stdout.write(chr(CHAR_CTRL_D))

Wyświetl plik

@ -1,5 +1,5 @@
metadata(
version="0.1.1",
version="0.2.0",
description="Provides an asynchronous REPL that can run concurrently with an asyncio, also allowing await expressions.",
)

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.3.0")
metadata(version="0.4.1")
require("aioble-core")

Wyświetl plik

@ -70,7 +70,7 @@ Alternatively, install the `aioble` package, which will install everything.
Usage
-----
Passive scan for nearby devices for 5 seconds: (Observer)
#### Passive scan for nearby devices for 5 seconds: (Observer)
```py
async with aioble.scan(duration_ms=5000) as scanner:
@ -87,7 +87,7 @@ async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, act
print(result, result.name(), result.rssi, result.services())
```
Connect to a peripheral device: (Central)
#### Connect to a peripheral device: (Central)
```py
# Either from scan result
@ -101,14 +101,14 @@ except asyncio.TimeoutError:
print('Timeout')
```
Register services and wait for connection: (Peripheral, Server)
#### Register services and wait for connection: (Peripheral, Server)
```py
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
_GENERIC_THERMOMETER = const(768)
_ADV_INTERVAL_MS = const(250000)
_ADV_INTERVAL_US = const(250000)
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_char = aioble.Characteristic(temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True)
@ -117,7 +117,7 @@ aioble.register_services(temp_service)
while True:
connection = await aioble.advertise(
_ADV_INTERVAL_MS,
_ADV_INTERVAL_US,
name="temp-sense",
services=[_ENV_SENSE_UUID],
appearance=_GENERIC_THERMOMETER,
@ -126,30 +126,95 @@ while True:
print("Connection from", device)
```
Update characteristic value: (Server)
#### Update characteristic value: (Server)
```py
# Write the local value.
temp_char.write(b'data')
temp_char.notify(b'optional data')
await temp_char.indicate(timeout_ms=2000)
```
Query the value of a characteristic: (Client)
```py
# Write the local value and notify/indicate subscribers.
temp_char.write(b'data', send_update=True)
```
#### Send notifications: (Server)
```py
# Notify with the current value.
temp_char.notify(connection)
```
```py
# Notify with a custom value.
temp_char.notify(connection, b'optional data')
```
#### Send indications: (Server)
```py
# Indicate with current value.
await temp_char.indicate(connection, timeout_ms=2000)
```
```py
# Indicate with custom value.
await temp_char.indicate(connection, b'optional data', timeout_ms=2000)
```
This will raise `GattError` if the indication is not acknowledged.
#### Wait for a write from the client: (Server)
```py
# Normal characteristic, returns the connection that did the write.
connection = await char.written(timeout_ms=2000)
```
```py
# Characteristic with capture enabled, also returns the value.
char = Characteristic(..., capture=True)
connection, data = await char.written(timeout_ms=2000)
```
#### Query the value of a characteristic: (Client)
```py
temp_service = await connection.service(_ENV_SENSE_UUID)
temp_char = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
data = await temp_char.read(timeout_ms=1000)
```
#### Wait for a notification/indication: (Client)
```py
# Notification
data = await temp_char.notified(timeout_ms=1000)
```
```py
# Indication
data = await temp_char.indicated(timeout_ms=1000)
```
#### Subscribe to a characteristic: (Client)
```py
# Subscribe for notification.
await temp_char.subscribe(notify=True)
while True:
data = await temp_char.notified()
```
Open L2CAP channels: (Listener)
```py
# Subscribe for indication.
await temp_char.subscribe(indicate=True)
while True:
data = await temp_char.indicated()
```
#### Open L2CAP channels: (Listener)
```py
channel = await connection.l2cap_accept(_L2CAP_PSN, _L2CAP_MTU)
@ -158,7 +223,7 @@ n = channel.recvinto(buf)
channel.send(b'response')
```
Open L2CAP channels: (Initiator)
#### Open L2CAP channels: (Initiator)
```py
channel = await connection.l2cap_connect(_L2CAP_PSN, _L2CAP_MTU)

Wyświetl plik

@ -257,7 +257,7 @@ class Characteristic(BaseCharacteristic):
raise ValueError("Not supported")
ble.gatts_notify(connection._conn_handle, self._value_handle, data)
async def indicate(self, connection, timeout_ms=1000):
async def indicate(self, connection, data=None, timeout_ms=1000):
if not (self.flags & _FLAG_INDICATE):
raise ValueError("Not supported")
if self._indicate_connection is not None:
@ -270,7 +270,7 @@ class Characteristic(BaseCharacteristic):
try:
with connection.timeout(timeout_ms):
ble.gatts_indicate(connection._conn_handle, self._value_handle)
ble.gatts_indicate(connection._conn_handle, self._value_handle, data)
await self._indicate_event.wait()
if self._indicate_status != 0:
raise GattError(self._indicate_status)
@ -290,8 +290,8 @@ class Characteristic(BaseCharacteristic):
class BufferedCharacteristic(Characteristic):
def __init__(self, service, uuid, max_len=20, append=False):
super().__init__(service, uuid, read=True)
def __init__(self, *args, max_len=20, append=False, **kwargs):
super().__init__(*args, **kwargs)
self._max_len = max_len
self._append = append

Wyświetl plik

@ -3,7 +3,7 @@
# code. This allows (for development purposes) all the files to live in the
# one directory.
metadata(version="0.3.1")
metadata(version="0.4.1")
# Default installation gives you everything. Install the individual
# components (or a combination of them) if you want a more minimal install.

Wyświetl plik

@ -0,0 +1,139 @@
# Test characteristic read/write/notify from both GATTS and GATTC.
import sys
sys.path.append("")
from micropython import const
import time, machine
import uasyncio as asyncio
import aioble
import bluetooth
TIMEOUT_MS = 5000
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR1_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR2_UUID = bluetooth.UUID("00000000-1111-2222-3333-555555555555")
CHAR3_UUID = bluetooth.UUID("00000000-1111-2222-3333-666666666666")
# Acting in peripheral role.
async def instance0_task():
service = aioble.Service(SERVICE_UUID)
characteristic1 = aioble.BufferedCharacteristic(service, CHAR1_UUID, write=True)
characteristic2 = aioble.BufferedCharacteristic(service, CHAR2_UUID, write=True, max_len=40)
characteristic3 = aioble.BufferedCharacteristic(
service, CHAR3_UUID, write=True, max_len=80, append=True
)
aioble.register_services(service)
multitest.globals(BDADDR=aioble.config("mac"))
multitest.next()
# Wait for central to connect to us.
print("advertise")
connection = await aioble.advertise(
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
)
print("connected")
# The first will just see the second write (truncated).
await characteristic1.written(timeout_ms=TIMEOUT_MS)
await characteristic1.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic1.read())
# The second will just see the second write (still truncated because MTU
# exchange hasn't happened).
await characteristic2.written(timeout_ms=TIMEOUT_MS)
await characteristic2.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic2.read())
# MTU exchange should happen here.
# The second will now see the full second write.
await characteristic2.written(timeout_ms=TIMEOUT_MS)
await characteristic2.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic2.read())
# The third will see the two full writes concatenated.
await characteristic3.written(timeout_ms=TIMEOUT_MS)
await characteristic3.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic3.read())
# Wait for the central to disconnect.
await connection.disconnected(timeout_ms=TIMEOUT_MS)
print("disconnected")
def instance0():
try:
asyncio.run(instance0_task())
finally:
aioble.stop()
# Acting in central role.
async def instance1_task():
multitest.next()
# Connect to peripheral and then disconnect.
print("connect")
device = aioble.Device(*BDADDR)
connection = await device.connect(timeout_ms=TIMEOUT_MS)
# Discover characteristics.
service = await connection.service(SERVICE_UUID)
print("service", service.uuid)
characteristic1 = await service.characteristic(CHAR1_UUID)
print("characteristic1", characteristic1.uuid)
characteristic2 = await service.characteristic(CHAR2_UUID)
print("characteristic2", characteristic2.uuid)
characteristic3 = await service.characteristic(CHAR3_UUID)
print("characteristic3", characteristic3.uuid)
# Write to each characteristic twice, with a long enough value to trigger
# truncation.
print("write1")
await characteristic1.write(
"central1-aaaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic1.write(
"central1-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("write2a")
await characteristic2.write(
"central2a-aaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic2.write(
"central2a-bbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("exchange mtu")
await connection.exchange_mtu(100)
print("write2b")
await characteristic2.write(
"central2b-aaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic2.write(
"central2b-bbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("write3")
await characteristic3.write(
"central3-aaaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic3.write(
"central3-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
# Disconnect from peripheral.
print("disconnect")
await connection.disconnect(timeout_ms=TIMEOUT_MS)
print("disconnected")
def instance1():
try:
asyncio.run(instance1_task())
finally:
aioble.stop()

Wyświetl plik

@ -0,0 +1,21 @@
--- instance0 ---
advertise
connected
written b'central1-bbbbbbbbbbb'
written b'central2a-bbbbbbbbbb'
written b'central2b-bbbbbbbbbbbbbbbbbbbbbbbbbbbb'
written b'central3-aaaaaaaaaaaaaaaaaaaaaaaaaaaaacentral3-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
disconnected
--- instance1 ---
connect
service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
characteristic1 UUID('00000000-1111-2222-3333-444444444444')
characteristic2 UUID('00000000-1111-2222-3333-555555555555')
characteristic3 UUID('00000000-1111-2222-3333-666666666666')
write1
write2a
exchange mtu
write2b
write3
disconnect
disconnected

Wyświetl plik

@ -1,10 +1,11 @@
metadata(
version="0.1.0",
version="0.2.0",
description="Common networking packages for all network-capable deployments of MicroPython.",
)
require("mip")
require("ntptime")
require("ssl")
require("requests")
require("webrepl")

Wyświetl plik

@ -1,8 +1,3 @@
metadata(description="LCD160CR driver.", version="0.1.0")
options.defaults(test=False)
module("lcd160cr.py", opt=3)
if options.test:
module("lcd160cr_test.py", opt=3)

Wyświetl plik

@ -163,6 +163,9 @@ class _SX126x(BaseModem):
# 0x02 is 40us, default value appears undocumented but this is the SX1276 default
self._ramp_val = 0x02
# Configure the SX126x at least once after reset
self._configured = False
if reset:
# If the caller supplies a reset pin argument, reset the radio
reset.init(Pin.OUT, value=0)
@ -383,24 +386,24 @@ class _SX126x(BaseModem):
# see
# https://www.thethingsnetwork.org/forum/t/should-private-lorawan-networks-use-a-different-sync-word/34496/15
syncword = 0x0404 + ((syncword & 0x0F) << 4) + ((syncword & 0xF0) << 8)
self._cmd(">BBH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
self._cmd(">BHH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
if "output_power" in lora_cfg:
if not self._configured or any(
key in lora_cfg for key in ("output_power", "pa_ramp_us", "tx_ant")
):
pa_config_args, self._output_power = self._get_pa_tx_params(
lora_cfg["output_power"], lora_cfg.get("tx_ant", None)
lora_cfg.get("output_power", self._output_power), lora_cfg.get("tx_ant", None)
)
self._cmd("BBBBB", _CMD_SET_PA_CONFIG, *pa_config_args)
if "pa_ramp_us" in lora_cfg:
self._ramp_val = self._get_pa_ramp_val(
lora_cfg, [10, 20, 40, 80, 200, 800, 1700, 3400]
)
if "pa_ramp_us" in lora_cfg:
self._ramp_val = self._get_pa_ramp_val(
lora_cfg, [10, 20, 40, 80, 200, 800, 1700, 3400]
)
if "output_power" in lora_cfg or "pa_ramp_us" in lora_cfg:
# Only send the SetTxParams command if power level or PA ramp time have changed
self._cmd("BBB", _CMD_SET_TX_PARAMS, self._output_power, self._ramp_val)
if any(key in lora_cfg for key in ("sf", "bw", "coding_rate")):
if not self._configured or any(key in lora_cfg for key in ("sf", "bw", "coding_rate")):
if "sf" in lora_cfg:
self._sf = lora_cfg["sf"]
if self._sf < _CFG_SF_MIN or self._sf > _CFG_SF_MAX:
@ -441,6 +444,7 @@ class _SX126x(BaseModem):
self._reg_write(_REG_RX_GAIN, 0x96 if lora_cfg["rx_boost"] else 0x94)
self._check_error()
self._configured = True
def _invert_workaround(self, enable):
# Apply workaround for DS 15.4 Optimizing the Inverted IQ Operation
@ -465,7 +469,7 @@ class _SX126x(BaseModem):
# See DS 13.1.12 Calibrate Function
# calibParam 0xFE means to calibrate all blocks.
self._cmd("<BB", _CMD_CALIBRATE, 0xFE)
self._cmd("BB", _CMD_CALIBRATE, 0xFE)
time.sleep_us(_CALIBRATE_TYPICAL_TIME_US)
@ -541,7 +545,7 @@ class _SX126x(BaseModem):
else:
timeout = 0 # Single receive mode, no timeout
self._cmd(">BBH", _CMD_SET_RX, timeout >> 16, timeout)
self._cmd(">BBH", _CMD_SET_RX, timeout >> 16, timeout) # 24 bits
return self._dio1
@ -725,10 +729,10 @@ class _SX126x(BaseModem):
return res
def _reg_read(self, addr):
return self._cmd("BBBB", _CMD_READ_REGISTER, addr >> 8, addr & 0xFF, n_read=1)[0]
return self._cmd(">BHB", _CMD_READ_REGISTER, addr, 0, n_read=1)[0]
def _reg_write(self, addr, val):
return self._cmd("BBBB", _CMD_WRITE_REGISTER, addr >> 8, addr & 0xFF, val & 0xFF)
return self._cmd(">BHB", _CMD_WRITE_REGISTER, addr, val & 0xFF)
class _SX1262(_SX126x):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.1")
metadata(version="0.1.2")
require("lora")
package("lora")

Wyświetl plik

@ -519,6 +519,9 @@ class _SX127x(BaseModem):
self._reg_update(_REG_MODEM_CONFIG3, update_mask, modem_config3)
if "syncword" in lora_cfg:
self._reg_write(_REG_SYNC_WORD, lora_cfg["syncword"])
def _reg_write(self, reg, value):
self._cs(0)
if isinstance(value, int):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.1.1")
require("lora")
package("lora")

Wyświetl plik

@ -42,8 +42,8 @@ class SyncModem:
tx = True
while tx is True:
tx = self.poll_send()
self._sync_wait(will_irq)
tx = self.poll_send()
return tx
def recv(self, timeout_ms=None, rx_length=0xFF, rx_packet=None):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.1.1")
require("lora")
package("lora")

Wyświetl plik

@ -37,10 +37,11 @@ class BaseModem:
self._ant_sw = ant_sw
self._irq_callback = None
# Common configuration settings that need to be tracked by all modem drivers
# (Note that subclasses may set these to other values in their constructors, to match
# the power-on-reset configuration of a particular modem.)
# Common configuration settings that need to be tracked by all modem drivers.
#
# Where modem hardware sets different values after reset, the driver should
# set them back to these defaults (if not provided by the user), so that
# behaviour remains consistent between different modems using the same driver.
self._rf_freq_hz = 0 # Needs to be set via configure()
self._sf = 7 # Spreading factor
self._bw_hz = 125000 # Reset value

Wyświetl plik

@ -26,7 +26,7 @@ THE SOFTWARE.
from senml import *
import time
from cbor2 import decoder
import cbor2
pack = SenmlPack("device_name")
@ -38,5 +38,5 @@ while True:
cbor_val = pack.to_cbor()
print(cbor_val)
print(cbor_val.hex())
print(decoder.loads(cbor_val)) # convert to string again so we can print it.
print(cbor2.loads(cbor_val)) # convert to string again so we can print it.
time.sleep(1)

Wyświetl plik

@ -1,6 +1,6 @@
metadata(
description="SenML serialisation for MicroPython.",
version="0.1.0",
version="0.1.1",
pypi_publish="micropython-senml",
)

Wyświetl plik

@ -27,8 +27,7 @@ THE SOFTWARE.
from senml.senml_record import SenmlRecord
from senml.senml_base import SenmlBase
import json
from cbor2 import encoder
from cbor2 import decoder
import cbor2
class SenmlPackIterator:
@ -278,7 +277,7 @@ class SenmlPack(SenmlBase):
:param data: a byte array.
:return: None
"""
records = decoder.loads(data) # load the raw senml data
records = cbor2.loads(data) # load the raw senml data
naming_map = {
"bn": -2,
"bt": -3,
@ -320,7 +319,7 @@ class SenmlPack(SenmlBase):
}
converted = []
self._build_rec_dict(naming_map, converted)
return encoder.dumps(converted)
return cbor2.dumps(converted)
def add(self, item):
"""

Wyświetl plik

@ -1,31 +1,16 @@
#
# uaiohttpclient - fetch URL passed as command line argument.
#
import sys
import uasyncio as asyncio
import uaiohttpclient as aiohttp
def print_stream(resp):
print((yield from resp.read()))
return
while True:
line = yield from resp.readline()
if not line:
break
print(line.rstrip())
def run(url):
resp = yield from aiohttp.request("GET", url)
async def run(url):
resp = await aiohttp.request("GET", url)
print(resp)
yield from print_stream(resp)
print(await resp.read())
import sys
import logging
logging.basicConfig(level=logging.INFO)
url = sys.argv[1]
loop = asyncio.get_event_loop()
loop.run_until_complete(run(url))
loop.close()
asyncio.run(run(url))

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="HTTP client module for MicroPython uasyncio module", version="0.5.1")
metadata(description="HTTP client module for MicroPython uasyncio module", version="0.5.2")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -5,8 +5,8 @@ class ClientResponse:
def __init__(self, reader):
self.content = reader
def read(self, sz=-1):
return (yield from self.content.read(sz))
async def read(self, sz=-1):
return await self.content.read(sz)
def __repr__(self):
return "<ClientResponse %d %s>" % (self.status, self.headers)
@ -17,22 +17,22 @@ class ChunkedClientResponse(ClientResponse):
self.content = reader
self.chunk_size = 0
def read(self, sz=4 * 1024 * 1024):
async def read(self, sz=4 * 1024 * 1024):
if self.chunk_size == 0:
l = yield from self.content.readline()
line = await self.content.readline()
# print("chunk line:", l)
l = l.split(b";", 1)[0]
self.chunk_size = int(l, 16)
line = line.split(b";", 1)[0]
self.chunk_size = int(line, 16)
# print("chunk size:", self.chunk_size)
if self.chunk_size == 0:
# End of message
sep = yield from self.content.read(2)
sep = await self.content.read(2)
assert sep == b"\r\n"
return b""
data = yield from self.content.read(min(sz, self.chunk_size))
data = await self.content.read(min(sz, self.chunk_size))
self.chunk_size -= len(data)
if self.chunk_size == 0:
sep = yield from self.content.read(2)
sep = await self.content.read(2)
assert sep == b"\r\n"
return data
@ -40,40 +40,46 @@ class ChunkedClientResponse(ClientResponse):
return "<ChunkedClientResponse %d %s>" % (self.status, self.headers)
def request_raw(method, url):
async def request_raw(method, url):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if ":" in host:
host, port = host.split(":")
port = int(port)
else:
port = 80
if proto != "http:":
raise ValueError("Unsupported protocol: " + proto)
reader, writer = yield from asyncio.open_connection(host, 80)
# Use protocol 1.0, because 1.1 always allows to use chunked transfer-encoding
# But explicitly set Connection: close, even though this should be default for 1.0,
# because some servers misbehave w/o it.
reader, writer = await asyncio.open_connection(host, port)
# Use protocol 1.0, because 1.1 always allows to use chunked
# transfer-encoding But explicitly set Connection: close, even
# though this should be default for 1.0, because some servers
# misbehave w/o it.
query = "%s /%s HTTP/1.0\r\nHost: %s\r\nConnection: close\r\nUser-Agent: compat\r\n\r\n" % (
method,
path,
host,
)
yield from writer.awrite(query.encode("latin-1"))
# yield from writer.aclose()
await writer.awrite(query.encode("latin-1"))
return reader
def request(method, url):
async def request(method, url):
redir_cnt = 0
redir_url = None
while redir_cnt < 2:
reader = yield from request_raw(method, url)
reader = await request_raw(method, url)
headers = []
sline = yield from reader.readline()
sline = await reader.readline()
sline = sline.split(None, 2)
status = int(sline[1])
chunked = False
while True:
line = yield from reader.readline()
line = await reader.readline()
if not line or line == b"\r\n":
break
headers.append(line)
@ -85,7 +91,7 @@ def request(method, url):
if 301 <= status <= 303:
redir_cnt += 1
yield from reader.aclose()
await reader.aclose()
continue
break

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="Lightweight MQTT client for MicroPython.", version="1.3.4")
metadata(description="Lightweight MQTT client for MicroPython.", version="1.4.0")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -16,8 +16,7 @@ class MQTTClient:
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
ssl=None,
):
if port == 0:
port = 8883 if ssl else 1883
@ -26,7 +25,6 @@ class MQTTClient:
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
@ -67,15 +65,13 @@ class MQTTClient:
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
self.sock = self.ssl.wrap_socket(self.sock, server_hostname=self.server)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
if self.user:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
@ -101,7 +97,7 @@ class MQTTClient:
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
if self.user:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.6.0")
metadata(version="0.7.0")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -12,7 +12,7 @@ def urlopen(url, data=None, method="GET"):
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
import tls
port = 443
else:
@ -29,7 +29,9 @@ def urlopen(url, data=None, method="GET"):
try:
s.connect(ai[-1])
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(method)
s.write(b" /")

Wyświetl plik

@ -61,8 +61,10 @@ ignore = [
"F401",
"F403",
"F405",
"E501",
"F541",
"F841",
"ISC001",
"ISC003", # micropython does not support implicit concatenation of f-strings
"PIE810", # micropython does not support passing tuples to .startswith or .endswith
"PLC1901",
@ -74,8 +76,9 @@ ignore = [
"PLW2901",
"RUF012",
"RUF100",
"W191",
]
line-length = 260
line-length = 99
target-version = "py37"
[tool.ruff.mccabe]
@ -97,3 +100,5 @@ max-statements = 166
# ble multitests are evaluated with some names pre-defined
"micropython/bluetooth/aioble/multitests/*" = ["F821"]
[tool.ruff.format]

Wyświetl plik

@ -0,0 +1,32 @@
aiohttp is an HTTP client module for MicroPython asyncio module,
with API mostly compatible with CPython [aiohttp](https://github.com/aio-libs/aiohttp)
module.
> [!NOTE]
> Only client is implemented.
See `examples/client.py`
```py
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://micropython.org') as response:
print("Status:", response.status)
print("Content-Type:", response.headers['Content-Type'])
html = await response.text()
print("Body:", html[:15], "...")
asyncio.run(main())
```
```
$ micropython examples/client.py
Status: 200
Content-Type: text/html; charset=utf-8
Body: <!DOCTYPE html> ...
```

Wyświetl plik

@ -0,0 +1,269 @@
# MicroPython aiohttp library
# MIT license; Copyright (c) 2023 Carlos Gil
import asyncio
import json as _json
from .aiohttp_ws import (
_WSRequestContextManager,
ClientWebSocketResponse,
WebSocketClient,
WSMsgType,
)
HttpVersion10 = "HTTP/1.0"
HttpVersion11 = "HTTP/1.1"
class ClientResponse:
def __init__(self, reader):
self.content = reader
def _decode(self, data):
c_encoding = self.headers.get("Content-Encoding")
if c_encoding in ("gzip", "deflate", "gzip,deflate"):
try:
import deflate, io
if c_encoding == "deflate":
with deflate.DeflateIO(io.BytesIO(data), deflate.ZLIB) as d:
return d.read()
elif c_encoding == "gzip":
with deflate.DeflateIO(io.BytesIO(data), deflate.GZIP, 15) as d:
return d.read()
except ImportError:
print("WARNING: deflate module required")
return data
async def read(self, sz=-1):
return self._decode(await self.content.read(sz))
async def text(self, encoding="utf-8"):
return (await self.read(int(self.headers.get("Content-Length", -1)))).decode(encoding)
async def json(self):
return _json.loads(await self.read(int(self.headers.get("Content-Length", -1))))
def __repr__(self):
return "<ClientResponse %d %s>" % (self.status, self.headers)
class ChunkedClientResponse(ClientResponse):
def __init__(self, reader):
self.content = reader
self.chunk_size = 0
async def read(self, sz=4 * 1024 * 1024):
if self.chunk_size == 0:
l = await self.content.readline()
l = l.split(b";", 1)[0]
self.chunk_size = int(l, 16)
if self.chunk_size == 0:
# End of message
sep = await self.content.read(2)
assert sep == b"\r\n"
return b""
data = await self.content.read(min(sz, self.chunk_size))
self.chunk_size -= len(data)
if self.chunk_size == 0:
sep = await self.content.read(2)
assert sep == b"\r\n"
return self._decode(data)
def __repr__(self):
return "<ChunkedClientResponse %d %s>" % (self.status, self.headers)
class _RequestContextManager:
def __init__(self, client, request_co):
self.reqco = request_co
self.client = client
async def __aenter__(self):
return await self.reqco
async def __aexit__(self, *args):
await self.client._reader.aclose()
return await asyncio.sleep(0)
class ClientSession:
def __init__(self, base_url="", headers={}, version=HttpVersion10):
self._reader = None
self._base_url = base_url
self._base_headers = {"Connection": "close", "User-Agent": "compat"}
self._base_headers.update(**headers)
self._http_version = version
async def __aenter__(self):
return self
async def __aexit__(self, *args):
return await asyncio.sleep(0)
# TODO: Implement timeouts
async def _request(self, method, url, data=None, json=None, ssl=None, params=None, headers={}):
redir_cnt = 0
redir_url = None
while redir_cnt < 2:
reader = await self.request_raw(method, url, data, json, ssl, params, headers)
_headers = []
sline = await reader.readline()
sline = sline.split(None, 2)
status = int(sline[1])
chunked = False
while True:
line = await reader.readline()
if not line or line == b"\r\n":
break
_headers.append(line)
if line.startswith(b"Transfer-Encoding:"):
if b"chunked" in line:
chunked = True
elif line.startswith(b"Location:"):
url = line.rstrip().split(None, 1)[1].decode()
if 301 <= status <= 303:
redir_cnt += 1
await reader.aclose()
continue
break
if chunked:
resp = ChunkedClientResponse(reader)
else:
resp = ClientResponse(reader)
resp.status = status
resp.headers = _headers
resp.url = url
if params:
resp.url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
try:
resp.headers = {
val.split(":", 1)[0]: val.split(":", 1)[-1].strip()
for val in [hed.decode().strip() for hed in _headers]
}
except Exception:
pass
self._reader = reader
return resp
async def request_raw(
self,
method,
url,
data=None,
json=None,
ssl=None,
params=None,
headers={},
is_handshake=False,
version=None,
):
if json and isinstance(json, dict):
data = _json.dumps(json)
if data is not None and method == "GET":
method = "POST"
if params:
url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
port = 443
if ssl is None:
ssl = True
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
reader, writer = await asyncio.open_connection(host, port, ssl=ssl)
# Use protocol 1.0, because 1.1 always allows to use chunked transfer-encoding
# But explicitly set Connection: close, even though this should be default for 1.0,
# because some servers misbehave w/o it.
if version is None:
version = self._http_version
if "Host" not in headers:
headers.update(Host=host)
if not data:
query = b"%s /%s %s\r\n%s\r\n" % (
method,
path,
version,
"\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n" if headers else "",
)
else:
if json:
headers.update(**{"Content-Type": "application/json"})
if isinstance(data, bytes):
headers.update(**{"Content-Type": "application/octet-stream"})
else:
data = data.encode()
headers.update(**{"Content-Length": len(data)})
query = b"""%s /%s %s\r\n%s\r\n%s""" % (
method,
path,
version,
"\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n",
data,
)
if not is_handshake:
await writer.awrite(query)
return reader
else:
await writer.awrite(query)
return reader, writer
def request(self, method, url, data=None, json=None, ssl=None, params=None, headers={}):
return _RequestContextManager(
self,
self._request(
method,
self._base_url + url,
data=data,
json=json,
ssl=ssl,
params=params,
headers=dict(**self._base_headers, **headers),
),
)
def get(self, url, **kwargs):
return self.request("GET", url, **kwargs)
def post(self, url, **kwargs):
return self.request("POST", url, **kwargs)
def put(self, url, **kwargs):
return self.request("PUT", url, **kwargs)
def patch(self, url, **kwargs):
return self.request("PATCH", url, **kwargs)
def delete(self, url, **kwargs):
return self.request("DELETE", url, **kwargs)
def head(self, url, **kwargs):
return self.request("HEAD", url, **kwargs)
def options(self, url, **kwargs):
return self.request("OPTIONS", url, **kwargs)
def ws_connect(self, url, ssl=None):
return _WSRequestContextManager(self, self._ws_connect(url, ssl=ssl))
async def _ws_connect(self, url, ssl=None):
ws_client = WebSocketClient(None)
await ws_client.connect(url, ssl=ssl, handshake_request=self.request_raw)
self._reader = ws_client.reader
return ClientWebSocketResponse(ws_client)

Wyświetl plik

@ -0,0 +1,269 @@
# MicroPython aiohttp library
# MIT license; Copyright (c) 2023 Carlos Gil
# adapted from https://github.com/danni/uwebsockets
# and https://github.com/miguelgrinberg/microdot/blob/main/src/microdot_asyncio_websocket.py
import asyncio
import random
import json as _json
import binascii
import re
import struct
from collections import namedtuple
URL_RE = re.compile(r"(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?")
URI = namedtuple("URI", ("protocol", "hostname", "port", "path")) # noqa: PYI024
def urlparse(uri):
"""Parse ws:// URLs"""
match = URL_RE.match(uri)
if match:
protocol = match.group(1)
host = match.group(2)
port = match.group(3)
path = match.group(4)
if protocol == "wss":
if port is None:
port = 443
elif protocol == "ws":
if port is None:
port = 80
else:
raise ValueError("Scheme {} is invalid".format(protocol))
return URI(protocol, host, int(port), path)
class WebSocketMessage:
def __init__(self, opcode, data):
self.type = opcode
self.data = data
class WSMsgType:
TEXT = 1
BINARY = 2
ERROR = 258
class WebSocketClient:
CONT = 0
TEXT = 1
BINARY = 2
CLOSE = 8
PING = 9
PONG = 10
def __init__(self, params):
self.params = params
self.closed = False
self.reader = None
self.writer = None
async def connect(self, uri, ssl=None, handshake_request=None):
uri = urlparse(uri)
assert uri
if uri.protocol == "wss":
if not ssl:
ssl = True
await self.handshake(uri, ssl, handshake_request)
@classmethod
def _parse_frame_header(cls, header):
byte1, byte2 = struct.unpack("!BB", header)
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
fin = bool(byte1 & 0x80)
opcode = byte1 & 0x0F
# Byte 2: MASK(1) LENGTH(7)
mask = bool(byte2 & (1 << 7))
length = byte2 & 0x7F
return fin, opcode, mask, length
def _process_websocket_frame(self, opcode, payload):
if opcode == self.TEXT:
payload = payload.decode()
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
# raise OSError(32, "Websocket connection closed")
return opcode, payload
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
return None, None
return None, payload
@classmethod
def _encode_websocket_frame(cls, opcode, payload):
if opcode == cls.TEXT:
payload = payload.encode()
length = len(payload)
fin = mask = True
# Frame header
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
byte1 = 0x80 if fin else 0
byte1 |= opcode
# Byte 2: MASK(1) LENGTH(7)
byte2 = 0x80 if mask else 0
if length < 126: # 126 is magic value to use 2-byte length header
byte2 |= length
frame = struct.pack("!BB", byte1, byte2)
elif length < (1 << 16): # Length fits in 2-bytes
byte2 |= 126 # Magic code
frame = struct.pack("!BBH", byte1, byte2, length)
elif length < (1 << 64):
byte2 |= 127 # Magic code
frame = struct.pack("!BBQ", byte1, byte2, length)
else:
raise ValueError
# Mask is 4 bytes
mask_bits = struct.pack("!I", random.getrandbits(32))
frame += mask_bits
payload = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(payload))
return frame + payload
async def handshake(self, uri, ssl, req):
headers = {}
_http_proto = "http" if uri.protocol != "wss" else "https"
url = f"{_http_proto}://{uri.hostname}:{uri.port}{uri.path or '/'}"
key = binascii.b2a_base64(bytes(random.getrandbits(8) for _ in range(16)))[:-1]
headers["Host"] = f"{uri.hostname}:{uri.port}"
headers["Connection"] = "Upgrade"
headers["Upgrade"] = "websocket"
headers["Sec-WebSocket-Key"] = key
headers["Sec-WebSocket-Version"] = "13"
headers["Origin"] = f"{_http_proto}://{uri.hostname}:{uri.port}"
self.reader, self.writer = await req(
"GET",
url,
ssl=ssl,
headers=headers,
is_handshake=True,
version="HTTP/1.1",
)
header = await self.reader.readline()
header = header[:-2]
assert header.startswith(b"HTTP/1.1 101 "), header
while header:
header = await self.reader.readline()
header = header[:-2]
async def receive(self):
while True:
opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
if send_opcode: # pragma: no cover
await self.send(data, send_opcode)
if opcode == self.CLOSE:
self.closed = True
return opcode, data
elif data: # pragma: no branch
return opcode, data
async def send(self, data, opcode=None):
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY), data
)
self.writer.write(frame)
await self.writer.drain()
async def close(self):
if not self.closed: # pragma: no cover
self.closed = True
await self.send(b"", self.CLOSE)
async def _read_frame(self):
header = await self.reader.read(2)
if len(header) != 2: # pragma: no cover
# raise OSError(32, "Websocket connection closed")
opcode = self.CLOSE
payload = b""
return opcode, payload
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == 126: # Magic number, length header is 2 bytes
(length,) = struct.unpack("!H", await self.reader.read(2))
elif length == 127: # Magic number, length header is 8 bytes
(length,) = struct.unpack("!Q", await self.reader.read(8))
if has_mask: # pragma: no cover
mask = await self.reader.read(4)
payload = await self.reader.read(length)
if has_mask: # pragma: no cover
payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
return opcode, payload
class ClientWebSocketResponse:
def __init__(self, wsclient):
self.ws = wsclient
def __aiter__(self):
return self
async def __anext__(self):
msg = WebSocketMessage(*await self.ws.receive())
# print(msg.data, msg.type) # DEBUG
if (not msg.data and msg.type == self.ws.CLOSE) or self.ws.closed:
raise StopAsyncIteration
return msg
async def close(self):
await self.ws.close()
async def send_str(self, data):
if not isinstance(data, str):
raise TypeError("data argument must be str (%r)" % type(data))
await self.ws.send(data)
async def send_bytes(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError("data argument must be byte-ish (%r)" % type(data))
await self.ws.send(data)
async def send_json(self, data):
await self.send_str(_json.dumps(data))
async def receive_str(self):
msg = WebSocketMessage(*await self.ws.receive())
if msg.type != self.ws.TEXT:
raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
return msg.data
async def receive_bytes(self):
msg = WebSocketMessage(*await self.ws.receive())
if msg.type != self.ws.BINARY:
raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
return msg.data
async def receive_json(self):
data = await self.receive_str()
return _json.loads(data)
class _WSRequestContextManager:
def __init__(self, client, request_co):
self.reqco = request_co
self.client = client
async def __aenter__(self):
return await self.reqco
async def __aexit__(self, *args):
await self.client._reader.aclose()
return await asyncio.sleep(0)

Wyświetl plik

@ -0,0 +1,18 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://micropython.org") as response:
print("Status:", response.status)
print("Content-Type:", response.headers["Content-Type"])
html = await response.text()
print("Body:", html[:15], "...")
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,20 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
headers = {"Accept-Encoding": "gzip,deflate"}
async def main():
async with aiohttp.ClientSession(headers=headers, version=aiohttp.HttpVersion11) as session:
async with session.get("http://micropython.org") as response:
print("Status:", response.status)
print("Content-Type:", response.headers["Content-Type"])
print(response.headers)
html = await response.text()
print(html)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,29 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
URL = sys.argv.pop()
if not URL.startswith("http"):
URL = "http://micropython.org"
print(URL)
async def fetch(client):
async with client.get(URL) as resp:
assert resp.status == 200
return await resp.text()
async def main():
async with aiohttp.ClientSession() as client:
html = await fetch(client)
print(html)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,18 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
headers = {"Authorization": "Basic bG9naW46cGFzcw=="}
async def main():
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get("http://httpbin.org/headers") as r:
json_body = await r.json()
print(json_body)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,25 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession("http://httpbin.org") as session:
async with session.get("/get") as resp:
assert resp.status == 200
rget = await resp.text()
print(f"GET: {rget}")
async with session.post("/post", json={"foo": "bar"}) as resp:
assert resp.status == 200
rpost = await resp.text()
print(f"POST: {rpost}")
async with session.put("/put", data=b"data") as resp:
assert resp.status == 200
rput = await resp.json()
print("PUT: ", rput)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,20 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
params = {"key1": "value1", "key2": "value2"}
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/get", params=params) as response:
expect = "http://httpbin.org/get?key1=value1&key2=value2"
assert str(response.url) == expect, f"{response.url} != {expect}"
html = await response.text()
print(html)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,44 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
try:
URL = sys.argv[1] # expects a websocket echo server
except Exception:
URL = "ws://echo.websocket.events"
sslctx = False
if URL.startswith("wss:"):
try:
import ssl
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
sslctx.verify_mode = ssl.CERT_NONE
except Exception:
pass
async def ws_test_echo(session):
async with session.ws_connect(URL, ssl=sslctx) as ws:
await ws.send_str("hello world!\r\n")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
if "close" in msg.data:
break
await ws.send_str("close\r\n")
await ws.close()
async def main():
async with aiohttp.ClientSession() as session:
await ws_test_echo(session)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,53 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
try:
URL = sys.argv[1] # expects a websocket echo server
READ_BANNER = False
except Exception:
URL = "ws://echo.websocket.events"
READ_BANNER = True
sslctx = False
if URL.startswith("wss:"):
try:
import ssl
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
sslctx.verify_mode = ssl.CERT_NONE
except Exception:
pass
async def ws_test_echo(session):
async with session.ws_connect(URL, ssl=sslctx) as ws:
if READ_BANNER:
print(await ws.receive_str())
try:
while True:
await ws.send_str(f"{input('>>> ')}\r\n")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data, end="")
break
except KeyboardInterrupt:
pass
finally:
await ws.close()
async def main():
async with aiohttp.ClientSession() as session:
await ws_test_echo(session)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,7 @@
metadata(
description="HTTP client module for MicroPython asyncio module",
version="0.0.2",
pypi="aiohttp",
)
package("aiohttp")

Wyświetl plik

@ -24,5 +24,10 @@ THE SOFTWARE.
"""
from . import decoder
from . import encoder
from ._decoder import CBORDecoder
from ._decoder import load
from ._decoder import loads
from ._encoder import CBOREncoder
from ._encoder import dump
from ._encoder import dumps

Wyświetl plik

@ -210,8 +210,9 @@ class CBORDecoder(object):
data = self.fp.read(amount)
if len(data) < amount:
raise CBORDecodeError(
"premature end of stream (expected to read {} bytes, got {} "
"instead)".format(amount, len(data))
"premature end of stream (expected to read {} bytes, got {} instead)".format(
amount, len(data)
)
)
return data

Wyświetl plik

@ -24,16 +24,15 @@ THE SOFTWARE.
"""
from cbor2 import encoder
from cbor2 import decoder
import cbor2
input = [
{"bn": "urn:dev:ow:10e2073a01080063", "u": "Cel", "t": 1.276020076e09, "v": 23.5},
{"u": "Cel", "t": 1.276020091e09, "v": 23.6},
]
data = encoder.dumps(input)
data = cbor2.dumps(input)
print(data)
print(data.hex())
text = decoder.loads(data)
text = cbor2.loads(data)
print(text)

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0", pypi="cbor2")
metadata(version="1.0.0", pypi="cbor2")
package("cbor2")

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.8.0", pypi="requests")
metadata(version="0.9.0", pypi="requests")
package("requests")

Wyświetl plik

@ -45,7 +45,7 @@ def request(
parse_headers=True,
):
redirect = None # redirection url, None means no redirection
chunked_data = data and getattr(data, "__iter__", None) and not getattr(data, "__len__", None)
chunked_data = data and getattr(data, "__next__", None) and not getattr(data, "__len__", None)
if auth is not None:
import ubinascii
@ -63,7 +63,7 @@ def request(
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
import tls
port = 443
else:
@ -90,7 +90,9 @@ def request(
try:
s.connect(ai[-1])
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if "Host" not in headers:
s.write(b"Host: %s\r\n" % host)

Wyświetl plik

@ -0,0 +1,11 @@
uaesmac is a module containing an aes cypher and aes-cmac hash algorithm,
it was adapted to Micropython based on the work from https://github.com/secworks/aes/blob/master/src/model/python/aes.py
and https://github.com/secworks/cmac/blob/master/src/model/python/cmac.py .
It was adapted to match the same functions as other hashers, with "update"
and "digest" methods.
Keys and messages to test the digest, cipher and decipher methods were taken
from https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core128.pdf ,
https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core256.pdf and
https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_CMAC.pdf .

Wyświetl plik

@ -0,0 +1,6 @@
srctype = micropython-lib
type = module
version = 0.0
author = Marceau Fillon
desc = Module containing an aes cypher and aes-cmac hash algorithm.
long_desc = README

Wyświetl plik

@ -0,0 +1,160 @@
import unittest
from uaes import AesCypher
nist_aes128_key = (0x2b7e1516, 0x28aed2a6, 0xabf71588, 0x09cf4f3c)
nist_aes256_key = (0x603deb10, 0x15ca71be, 0x2b73aef0, 0x857d7781,
0x1f352c07, 0x3b6108d7, 0x2d9810a3, 0x0914dff4)
class TestAes(unittest.TestCase):
def setUp(self):
self.cypher = AesCypher(verbose=False, dump_vars=False)
class TestAesMixColumns():
def test_mixcolumns(self):
print("Test of _mixcolumns and inverse _mixcolumns:")
mixresult = self._mixcolumns(nist_aes128_key)
inv_mixresult = self._inv__mixcolumns(mixresult)
print("Test of _mixw ochi _inv__mixw:")
testw = 0xdb135345
expw = 0x8e4da1bc
mixresult = self._mixw(testw)
inv_mixresult = self._inv__mixw(mixresult)
print("Testword: 0x%08x" % testw)
print("expexted: 0x%08x" % expw)
print("_mixword: 0x%08x" % mixresult)
print("inv_mixword: 0x%08x" % inv_mixresult)
class TestAesCypher128Bits(TestAes):
"""
Plain text and expected values come from: https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core128.pdf
"""
def _test_aes_cypher_128_bits(self, plain_text, expected):
result = self.cypher.aes_encipher_block(nist_aes128_key, plain_text)
self.assertEqual(result, expected)
def test_aes_1(self):
plain_text = (0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)
expected = (0x3ad77bb4, 0x0d7a3660, 0xa89ecaf3, 0x2466ef97)
self._test_aes_cypher_128_bits(plain_text, expected)
def test_aes_2(self):
plain_text = (0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51)
expected = (0xf5d3d585, 0x03b9699d, 0xe785895a, 0x96fdbaaf)
self._test_aes_cypher_128_bits(plain_text, expected)
def test_aes_3(self):
plain_text = (0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef)
expected = (0x43b1cd7f, 0x598ece23, 0x881b00e3, 0xed030688)
self._test_aes_cypher_128_bits(plain_text, expected)
def test_aes_4(self):
plain_text = (0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)
expected = (0x7b0c785e, 0x27e8ad3f, 0x82232071, 0x04725dd4)
self._test_aes_cypher_128_bits(plain_text, expected)
class TestAesCypher256Bits(TestAes):
"""
Plain text and expected values come from: https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core256.pdf
"""
def _test_aes_cypher_256_bits(self, plain_text, expected):
result = self.cypher.aes_encipher_block(nist_aes256_key, plain_text)
self.assertEqual(result, expected)
def test_aes_1(self):
plain_text = (0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)
expected = (0xf3eed1bd, 0xb5d2a03c, 0x064b5a7e, 0x3db181f8)
self._test_aes_cypher_256_bits(plain_text, expected)
def test_aes_2(self):
plain_text = (0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51)
expected = (0x591ccb10, 0xd410ed26, 0xdc5ba74a, 0x31362870)
self._test_aes_cypher_256_bits(plain_text, expected)
def test_aes_3(self):
plain_text = (0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef)
expected = (0xb6ed21b9, 0x9ca6f4f9, 0xf153e7b1, 0xbeafed1d)
self._test_aes_cypher_256_bits(plain_text, expected)
def test_aes_4(self):
plain_text = (0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)
expected = (0x23304b7a, 0x39f9f3ff, 0x067d8d8f, 0x9e24ecc7)
self._test_aes_cypher_256_bits(plain_text, expected)
class TestAesDecypher128Bits(TestAes):
"""
Plain text and expected values come from: https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core128.pdf
"""
def _test_aes_decypher_128_bits(self, cipher_text, expected):
result = self.cypher.aes_decipher_block(nist_aes128_key, cipher_text)
self.assertEqual(result, expected)
def test_aes_1(self):
plain_text = (0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)
cipher_text = (0x3ad77bb4, 0x0d7a3660, 0xa89ecaf3, 0x2466ef97)
self._test_aes_decypher_128_bits(cipher_text, plain_text)
def test_aes_2(self):
plain_text = (0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51)
cipher_text = (0xf5d3d585, 0x03b9699d, 0xe785895a, 0x96fdbaaf)
self._test_aes_decypher_128_bits(cipher_text, plain_text)
def test_aes_3(self):
plain_text = (0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef)
cipher_text = (0x43b1cd7f, 0x598ece23, 0x881b00e3, 0xed030688)
self._test_aes_decypher_128_bits(cipher_text, plain_text)
def test_aes_4(self):
plain_text = (0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)
cipher_text = (0x7b0c785e, 0x27e8ad3f, 0x82232071, 0x04725dd4)
self._test_aes_decypher_128_bits(cipher_text, plain_text)
class TestAesDecypher256Bits(TestAes):
"""
Plain text and expected values come from: https://csrc.nist.gov/CSRC/media/Projects/Cryptographic-Standards-and-Guidelines/documents/examples/AES_Core256.pdf
"""
def _test_aes_decypher_256_bits(self, cipher_text, expected):
result = self.cypher.aes_decipher_block(nist_aes256_key, cipher_text)
self.assertEqual(result, expected)
def test_aes_1(self):
plain_text = (0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)
cipher_text = (0xf3eed1bd, 0xb5d2a03c, 0x064b5a7e, 0x3db181f8)
self._test_aes_decypher_256_bits(cipher_text, plain_text)
def test_aes_2(self):
plain_text = (0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51)
cipher_text = (0x591ccb10, 0xd410ed26, 0xdc5ba74a, 0x31362870)
self._test_aes_decypher_256_bits(cipher_text, plain_text)
def test_aes_3(self):
plain_text = (0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef)
cipher_text = (0xb6ed21b9, 0x9ca6f4f9, 0xf153e7b1, 0xbeafed1d)
self._test_aes_decypher_256_bits(cipher_text, plain_text)
def test_aes_4(self):
plain_text = (0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)
cipher_text = (0x23304b7a, 0x39f9f3ff, 0x067d8d8f, 0x9e24ecc7)
self._test_aes_decypher_256_bits(cipher_text, plain_text)
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -0,0 +1,155 @@
import unittest
from ucmac import CmacHasher
import struct
test_sub_block_bytes = b'12345678'
test_sub_block_int = 305419896
test_block = (test_sub_block_int, test_sub_block_int, test_sub_block_int, test_sub_block_int)
class TestCmac(unittest.TestCase):
def setUp(self):
self.hasher = CmacHasher()
class TestCmacUpdate(TestCmac):
def test_update_empty_message(self):
self.hasher.update(b'12345')
self.assertEqual(b'12345', self.hasher.current_block)
self.assertEqual(self.hasher.message, [])
def test_update_message_enough_space(self):
self.hasher.current_block = b'12345'
self.hasher.update(b'678')
self.assertEqual(test_sub_block_bytes, self.hasher.current_block)
self.assertEqual(self.hasher.message, [])
def test_update_message_exact_space(self):
self.hasher.current_block = test_sub_block_bytes + test_sub_block_bytes
self.hasher.update(test_sub_block_bytes + test_sub_block_bytes)
self.assertEqual(b'', self.hasher.current_block)
self.assertEqual(self.hasher.message, [(test_sub_block_int,
test_sub_block_int,
test_sub_block_int,
test_sub_block_int
)])
def test_update_message_extra_bytes(self):
self.hasher.current_block = b'12345678123456781234567812'
self.hasher.update(b'34567812345')
self.assertEqual(b'12345', self.hasher.current_block)
self.assertEqual(self.hasher.message, [test_block])
def test_update_message_extra_block(self):
self.hasher.current_block = b'12345678123456781234567812'
self.hasher.update(b'3456781234567812345678123456781234567812345')
self.assertEqual(self.hasher.current_block, b'12345')
self.assertEqual(len(self.hasher.message), 2)
self.assertEqual(self.hasher.message, [test_block,
test_block])
class TestCmacXor(TestCmac):
def test_xor(self):
a = (0x00000000, 0x55555555, 0xaaaaaaaa, 0xff00ff00)
b = (0xdeadbeef, 0xaa00aa00, 0x55555555, 0xffffffff)
c = self.hasher.xor_words(a , b)
self.assertEqual((0xdeadbeef, 0xff55ff55, 0xffffffff, 0x00ff00ff), c)
class TestCmacSubkeyGenerator(TestCmac):
def test_cmac_subkey_gen(self):
"""
Test the subkey functionality by itself. Testvectors are
from the first examples in NISTs test case suite.
"""
nist_key128 = (0x2b7e1516, 0x28aed2a6, 0xabf71588, 0x09cf4f3c)
nist_exp_k1 = (0xfbeed618, 0x35713366, 0x7c85e08f, 0x7236a8de)
nist_exp_k2 = (0xf7ddac30, 0x6ae266cc, 0xf90bc11e, 0xe46d513b)
(K1, K2) = self.hasher.cmac_gen_subkeys(nist_key128)
self.assertEqual(K1, nist_exp_k1)
self.assertEqual(K2, nist_exp_k2)
class TestCmacDigest(unittest.TestCase):
def setUp(self):
key = (0x2b7e1516, 0x28aed2a6, 0xabf71588, 0x09cf4f3c)
self.hasher = CmacHasher(key)
def test_zero_length_message(self):
expected = (0xbb1d6929, 0xe9593728, 0x7fa37d12, 0x9b756746)
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
def test_message_1_block(self):
expected = (0x070a16b4, 0x6b4d4144, 0xf79bdd9d, 0xd04a287c)
self.hasher.message = [(0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)]
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
def test_message_one_and_quart_blocks(self):
expected = (0x7d85449e, 0xa6ea19c8, 0x23a7bf78, 0x837dfade)
self.hasher.message = [(0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)]
self.hasher.current_block = b"ae2d8a57"
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
def test_message_incomplete_last_block(self):
expected = (0x72675e5d, 0x1289f7f3, 0x96166b1d, 0xd5e38149)
self.hasher.message = [(0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a)]
self.hasher.current_block = b"ae2d8a"
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
def test_message_four_blocks(self):
print("128 bit key, four block message.")
expected = (0x51f0bebf, 0x7e3b9d92, 0xfc497417, 0x79363cfe)
self.hasher.message = [(0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a),
(0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51),
(0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef),
(0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)]
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
class TestCmacDigestKey256Bits(unittest.TestCase):
def setUp(self):
nist_key256 = (0x603deb10, 0x15ca71be, 0x2b73aef0, 0x857d7781,
0x1f352c07, 0x3b6108d7, 0x2d9810a3, 0x0914dff4)
self.hasher = CmacHasher(key=nist_key256)
def test_digest_zero_length_message(self):
expected = (0x028962f6, 0x1b7bf89e, 0xfc6b551f, 0x4667d983)
self.hasher.message = []
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
def test_digest_four_blocks_message(self):
expected = (0xe1992190, 0x549f6ed5, 0x696a2c05, 0x6c315410)
self.hasher.message = [(0x6bc1bee2, 0x2e409f96, 0xe93d7e11, 0x7393172a),
(0xae2d8a57, 0x1e03ac9c, 0x9eb76fac, 0x45af8e51),
(0x30c81c46, 0xa35ce411, 0xe5fbc119, 0x1a0a52ef),
(0xf69f2445, 0xdf4f9b17, 0xad2b417b, 0xe66c3710)]
digest = struct.unpack('<IIII', self.hasher.digest())
self.assertEqual(digest, expected)
class TestCmacPadding(TestCmac):
def test_padding_empty_block(self):
padded_block = self.hasher.pad_block(b'')
self.assertEqual(padded_block, (0x80000000, 0x00000000, 0x00000000, 0x00000000))
def test_padding_first_block_complete(self):
padded_block = self.hasher.pad_block(b'12345678')
self.assertEqual(padded_block, (0x12345678, 0x80000000, 0x00000000, 0x00000000))
def test_padding_first_block_incomplete(self):
padded_block = self.hasher.pad_block(b'1234567')
self.assertEqual(padded_block, (0x12345678, 0x00000000, 0x00000000, 0x00000000))
def test_padding_all_blocks_complete(self):
padded_block = self.hasher.pad_block(b'12345678123456781234567812345678')
self.assertEqual(padded_block, (0x12345678, 0x12345678, 0x12345678, 0x12345678))
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -0,0 +1,702 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#=======================================================================
# Author: Joachim Strömbergson
# Copyright (c) 2014, Secworks Sweden AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#=======================================================================
AES_128_ROUNDS = 10
AES_256_ROUNDS = 14
class AesCypher():
sbox = [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5,
0x30, 0x01, 0x67, 0x2b, 0xfe, 0xd7, 0xab, 0x76,
0xca, 0x82, 0xc9, 0x7d, 0xfa, 0x59, 0x47, 0xf0,
0xad, 0xd4, 0xa2, 0xaf, 0x9c, 0xa4, 0x72, 0xc0,
0xb7, 0xfd, 0x93, 0x26, 0x36, 0x3f, 0xf7, 0xcc,
0x34, 0xa5, 0xe5, 0xf1, 0x71, 0xd8, 0x31, 0x15,
0x04, 0xc7, 0x23, 0xc3, 0x18, 0x96, 0x05, 0x9a,
0x07, 0x12, 0x80, 0xe2, 0xeb, 0x27, 0xb2, 0x75,
0x09, 0x83, 0x2c, 0x1a, 0x1b, 0x6e, 0x5a, 0xa0,
0x52, 0x3b, 0xd6, 0xb3, 0x29, 0xe3, 0x2f, 0x84,
0x53, 0xd1, 0x00, 0xed, 0x20, 0xfc, 0xb1, 0x5b,
0x6a, 0xcb, 0xbe, 0x39, 0x4a, 0x4c, 0x58, 0xcf,
0xd0, 0xef, 0xaa, 0xfb, 0x43, 0x4d, 0x33, 0x85,
0x45, 0xf9, 0x02, 0x7f, 0x50, 0x3c, 0x9f, 0xa8,
0x51, 0xa3, 0x40, 0x8f, 0x92, 0x9d, 0x38, 0xf5,
0xbc, 0xb6, 0xda, 0x21, 0x10, 0xff, 0xf3, 0xd2,
0xcd, 0x0c, 0x13, 0xec, 0x5f, 0x97, 0x44, 0x17,
0xc4, 0xa7, 0x7e, 0x3d, 0x64, 0x5d, 0x19, 0x73,
0x60, 0x81, 0x4f, 0xdc, 0x22, 0x2a, 0x90, 0x88,
0x46, 0xee, 0xb8, 0x14, 0xde, 0x5e, 0x0b, 0xdb,
0xe0, 0x32, 0x3a, 0x0a, 0x49, 0x06, 0x24, 0x5c,
0xc2, 0xd3, 0xac, 0x62, 0x91, 0x95, 0xe4, 0x79,
0xe7, 0xc8, 0x37, 0x6d, 0x8d, 0xd5, 0x4e, 0xa9,
0x6c, 0x56, 0xf4, 0xea, 0x65, 0x7a, 0xae, 0x08,
0xba, 0x78, 0x25, 0x2e, 0x1c, 0xa6, 0xb4, 0xc6,
0xe8, 0xdd, 0x74, 0x1f, 0x4b, 0xbd, 0x8b, 0x8a,
0x70, 0x3e, 0xb5, 0x66, 0x48, 0x03, 0xf6, 0x0e,
0x61, 0x35, 0x57, 0xb9, 0x86, 0xc1, 0x1d, 0x9e,
0xe1, 0xf8, 0x98, 0x11, 0x69, 0xd9, 0x8e, 0x94,
0x9b, 0x1e, 0x87, 0xe9, 0xce, 0x55, 0x28, 0xdf,
0x8c, 0xa1, 0x89, 0x0d, 0xbf, 0xe6, 0x42, 0x68,
0x41, 0x99, 0x2d, 0x0f, 0xb0, 0x54, 0xbb, 0x16]
inv_sbox = [0x52, 0x09, 0x6a, 0xd5, 0x30, 0x36, 0xa5, 0x38,
0xbf, 0x40, 0xa3, 0x9e, 0x81, 0xf3, 0xd7, 0xfb,
0x7c, 0xe3, 0x39, 0x82, 0x9b, 0x2f, 0xff, 0x87,
0x34, 0x8e, 0x43, 0x44, 0xc4, 0xde, 0xe9, 0xcb,
0x54, 0x7b, 0x94, 0x32, 0xa6, 0xc2, 0x23, 0x3d,
0xee, 0x4c, 0x95, 0x0b, 0x42, 0xfa, 0xc3, 0x4e,
0x08, 0x2e, 0xa1, 0x66, 0x28, 0xd9, 0x24, 0xb2,
0x76, 0x5b, 0xa2, 0x49, 0x6d, 0x8b, 0xd1, 0x25,
0x72, 0xf8, 0xf6, 0x64, 0x86, 0x68, 0x98, 0x16,
0xd4, 0xa4, 0x5c, 0xcc, 0x5d, 0x65, 0xb6, 0x92,
0x6c, 0x70, 0x48, 0x50, 0xfd, 0xed, 0xb9, 0xda,
0x5e, 0x15, 0x46, 0x57, 0xa7, 0x8d, 0x9d, 0x84,
0x90, 0xd8, 0xab, 0x00, 0x8c, 0xbc, 0xd3, 0x0a,
0xf7, 0xe4, 0x58, 0x05, 0xb8, 0xb3, 0x45, 0x06,
0xd0, 0x2c, 0x1e, 0x8f, 0xca, 0x3f, 0x0f, 0x02,
0xc1, 0xaf, 0xbd, 0x03, 0x01, 0x13, 0x8a, 0x6b,
0x3a, 0x91, 0x11, 0x41, 0x4f, 0x67, 0xdc, 0xea,
0x97, 0xf2, 0xcf, 0xce, 0xf0, 0xb4, 0xe6, 0x73,
0x96, 0xac, 0x74, 0x22, 0xe7, 0xad, 0x35, 0x85,
0xe2, 0xf9, 0x37, 0xe8, 0x1c, 0x75, 0xdf, 0x6e,
0x47, 0xf1, 0x1a, 0x71, 0x1d, 0x29, 0xc5, 0x89,
0x6f, 0xb7, 0x62, 0x0e, 0xaa, 0x18, 0xbe, 0x1b,
0xfc, 0x56, 0x3e, 0x4b, 0xc6, 0xd2, 0x79, 0x20,
0x9a, 0xdb, 0xc0, 0xfe, 0x78, 0xcd, 0x5a, 0xf4,
0x1f, 0xdd, 0xa8, 0x33, 0x88, 0x07, 0xc7, 0x31,
0xb1, 0x12, 0x10, 0x59, 0x27, 0x80, 0xec, 0x5f,
0x60, 0x51, 0x7f, 0xa9, 0x19, 0xb5, 0x4a, 0x0d,
0x2d, 0xe5, 0x7a, 0x9f, 0x93, 0xc9, 0x9c, 0xef,
0xa0, 0xe0, 0x3b, 0x4d, 0xae, 0x2a, 0xf5, 0xb0,
0xc8, 0xeb, 0xbb, 0x3c, 0x83, 0x53, 0x99, 0x61,
0x17, 0x2b, 0x04, 0x7e, 0xba, 0x77, 0xd6, 0x26,
0xe1, 0x69, 0x14, 0x63, 0x55, 0x21, 0x0c, 0x7d]
def __init__(self, verbose = True, dump_vars = True):
self.verbose = verbose
self.dump_vars = dump_vars
def check_block(self, expected, result):
"""
Checks and reports if a result block matches expected block.
Parameters
----------
expected : tuple(int, int, int, int)
Expected block
result : tuple(int, int, int, int)
Block to compare with expected result
Returns
-------
int
0 if block matches the expected block
1 otherwise
"""
if (expected[0] == result[0]) and (expected[1] == result[1]) and\
(expected[2] == result[2]) and (expected[3] == result[3]):
if self.verbose:
print("OK. Result matches expected.")
print("")
return 0
else:
print("ERROR. Result does not match expected.")
print("Expected:")
self._print_block(expected)
print("Got:")
self._print_block(result)
print("")
return 1
def aes_encipher_block(self, key, block):
"""
Perform AES encipher operation for the given block using the
given key length.
Parameters
----------
key : tuple
128 or 256 bits key split in 32 bits words
block : tuple(int, int, int, int)
128 bits block split in 32 bits words
Returns
-------
tuple(int, int, int, int)
128 bits block, result from the encipher operation
"""
# Get round keys based on the given key.
if len(key) == 4:
round_keys = self._key_gen128(key)
num_rounds = AES_128_ROUNDS
else:
round_keys = self._key_gen256(key)
num_rounds = AES_256_ROUNDS
# Init round
if self.verbose:
print(" Initial AddRoundKeys round.")
tmp_block4 = self._addroundkey(round_keys[0], block)
# Main rounds
for i in range(1 , (num_rounds)):
if self.verbose:
print("")
print(" Round %02d" % i)
print(" ---------")
tmp_block1 = self._subbytes(tmp_block4)
tmp_block2 = self._shiftrows(tmp_block1)
tmp_block3 = self._mixcolumns(tmp_block2)
tmp_block4 = self._addroundkey(round_keys[i], tmp_block3)
# Final round
if self.verbose:
print(" Final round.")
tmp_block1 = self._subbytes(tmp_block4)
tmp_block2 = self._shiftrows(tmp_block1)
tmp_block3 = self._addroundkey(round_keys[num_rounds], tmp_block2)
return tmp_block3
def aes_decipher_block(self, key, block):
"""
Perform AES decipher operation for the given block
using the given key length.
Parameters
----------
key : tuple
128 or 256 bits key split in 32 bits words
block : tuple(int, int, int, int)
128 bits block split in 32 bits words
Returns
-------
tuple(int, int, int, int)
128 bits block, result from the decipher operation
"""
tmp_block = block[:]
# Get round keys based on the given key.
if len(key) == 4:
round_keys = self._key_gen128(key)
num_rounds = AES_128_ROUNDS
else:
round_keys = self._key_gen256(key)
num_rounds = AES_256_ROUNDS
# Initial round
if self.verbose:
print(" Initial, partial round.")
tmp_block1 = self._addroundkey(round_keys[len(round_keys) - 1], tmp_block)
tmp_block2 = self._inv__shiftrows(tmp_block1)
tmp_block4 = self._inv__subbytes(tmp_block2)
# Main rounds
for i in range(1 , (num_rounds)):
if self.verbose:
print("")
print(" Round %02d" % i)
print(" ---------")
tmp_block1 = self._addroundkey(round_keys[(len(round_keys) - i - 1)], tmp_block4)
tmp_block2 = self._inv__mixcolumns(tmp_block1)
tmp_block3 = self._inv__shiftrows(tmp_block2)
tmp_block4 = self._inv__subbytes(tmp_block3)
# Final round
if self.verbose:
print(" Final AddRoundKeys round.")
res_block = self._addroundkey(round_keys[0], tmp_block4)
return res_block
def _print_block(self, block):
"""
Print the given block as four 32 bit words.
"""
(w0, w1, w2, w3) = block
print("0x%08x, 0x%08x, 0x%08x, 0x%08x" % (w0, w1, w2, w3))
def _print_key(self, key):
"""
Print the given key as on or two sets of four 32 bit words.
"""
if len(key) == 8:
(k0, k1, k2, k3, k4, k5, k6, k7) = key
self._print_block((k0, k1, k2, k3))
self._print_block((k4, k5, k6, k7))
else:
self._print_block(key)
def _b2w(self, b0, b1, b2, b3):
"""
Creates a word from the given bytes.
"""
return (b0 << 24) + (b1 << 16) + (b2 << 8) + b3
def _w2b(self, w):
"""
Extracts the bytes in a word.
"""
b0 = w >> 24
b1 = w >> 16 & 0xff
b2 = w >> 8 & 0xff
b3 = w & 0xff
return (b0, b1, b2, b3)
def _gm2(self, b):
"""
The specific Galois Multiplication by two for a given byte.
"""
return ((b << 1) ^ (0x1b & ((b >> 7) * 0xff))) & 0xff
def _gm3(self, b):
"""
The specific Galois Multiplication by three for a given byte.
"""
return self._gm2(b) ^ b
def _gm4(self, b):
"""
The specific Galois Multiplication by four for a given byte.
"""
return self._gm2(self._gm2(b))
def _gm8(self, b):
"""
The specific Galois Multiplication by eight for a given byte.
"""
return self._gm2(self._gm4(b))
def _gm09(self, b):
"""
The specific Galois Multiplication by nine for a given byte.
"""
return self._gm8(b) ^ b
def _gm11(self, b):
"""
The specific Galois Multiplication by 11 for a given byte.
"""
return self._gm8(b) ^ self._gm2(b) ^ b
def _gm13(self, b):
"""
The specific Galois Multiplication by 13 for a given byte.
"""
return self._gm8(b) ^ self._gm4(b) ^ b
def gm14(self, b):
"""
The specific Galois Multiplication by 14 for a given byte.
"""
return self._gm8(b) ^ self._gm4(b) ^ self._gm2(b)
def _substw(self, w):
"""
Returns a 32-bit word in which each of the bytes in the
given 32-bit word has been used as lookup into the AES S-box.
"""
(b0, b1, b2, b3) = self._w2b(w)
s0 = self.sbox[b0]
s1 = self.sbox[b1]
s2 = self.sbox[b2]
s3 = self.sbox[b3]
res = self._b2w(s0, s1, s2, s3)
if (self.verbose):
print("Inside _substw:")
print("b0 = 0x%02x, b1 = 0x%02x, b2 = 0x%02x, b3 = 0x%02x" %
(b0, b1, b2, b3))
print("s0 = 0x%02x, s1 = 0x%02x, s2 = 0x%02x, s3 = 0x%02x" %
(s0, s1, s2, s3))
print("res = 0x%08x" % (res))
return res
def _inv__substw(self, w):
"""
Returns a 32-bit word in which each of the bytes in the
given 32-bit word has been used as lookup into
the inverse AES S-box.
"""
(b0, b1, b2, b3) = self._w2b(w)
s0 = self.inv_sbox[b0]
s1 = self.inv_sbox[b1]
s2 = self.inv_sbox[b2]
s3 = self.inv_sbox[b3]
res = self._b2w(s0, s1, s2, s3)
if (self.verbose):
print("Inside _inv__substw:")
print("b0 = 0x%02x, b1 = 0x%02x, b2 = 0x%02x, b3 = 0x%02x" %
(b0, b1, b2, b3))
print("s0 = 0x%02x, s1 = 0x%02x, s2 = 0x%02x, s3 = 0x%02x" %
(s0, s1, s2, s3))
print("res = 0x%08x" % (res))
return res
def _rolx(self, w, x):
"""
Rotate the given 32 bit word x bits left.
"""
return ((w << x) | (w >> (32 - x))) & 0xffffffff
def _next_128bit_key(self, prev_key, rcon):
"""
Generate the next four key words for aes-128 based on given
rcon and previous key words.
"""
(w0, w1, w2, w3) = prev_key
rol = self._rolx(w3, 8)
subst = self._substw(rol)
t = subst ^ (rcon << 24)
k0 = w0 ^ t
k1 = w1 ^ w0 ^ t
k2 = w2 ^ w1 ^ w0 ^ t
k3 = w3 ^ w2 ^ w1 ^ w0 ^ t
if (self.verbose):
print("Inside next 128bit key:")
print("w0 = 0x%08x, w1 = 0x%08x, w2 = 0x%08x, w3 = 0x%08x" %
(w0, w1, w2, w3))
print("rol = 0x%08x, subst = 0x%08x, rcon = 0x%02x, t = 0x%08x" %
(rol, subst, rcon, t))
print("k0 = 0x%08x, k1 = 0x%08x, k2 = 0x%08x, k3 = 0x%08x" %
(k0, k1, k2, k3))
return (k0, k1, k2, k3)
def _key_gen128(self, key):
"""
Generating the keys for 128 bit keys.
"""
if self.verbose:
print("Doing the 128 bit key expansion")
round_keys = []
round_keys.append(key)
for i in range(10):
round_keys.append(self._next_128bit_key(round_keys[i], self._get_rcon(i + 1)))
if (self.verbose):
print("Input key:")
self._print_block(key)
print("")
print("Generated keys:")
for k in round_keys:
self._print_block(k)
print("")
return round_keys
def _next_256bit_key_a(self, key0, key1, rcon):
"""
Generate the next four key words for aes-256 using algorithm A
based on given rcon and the previous two keys.
"""
(w0, w1, w2, w3) = key0
(w4, w5, w6, w7) = key1
sw = self._substw(self._rolx(w7, 8))
rw = (rcon << 24)
t = sw ^ rw
k0 = w0 ^ t
k1 = w1 ^ w0 ^ t
k2 = w2 ^ w1 ^ w0 ^ t
k3 = w3 ^ w2 ^ w1 ^ w0 ^ t
if (self.dump_vars):
print("next_256bit_key_a:")
print("w0 = 0x%08x, w0 = 0x%08x, w0 = 0x%08x, w0 = 0x%08x" % (w0, w1, w2, w3))
print("w4 = 0x%08x, w5 = 0x%08x, w6 = 0x%08x, w7 = 0x%08x" % (w4, w5, w6, w7))
print("t = 0x%08x, sw = 0x%08x, rw = 0x%08x" % (t, sw, rw))
print("k0 = 0x%08x, k0 = 0x%08x, k0 = 0x%08x, k0 = 0x%08x" % (k0, k1, k2, k3))
print("")
return (k0, k1, k2, k3)
def _next_256bit_key_b(self, key0, key1):
"""
Generate the next four key words for aes-256 using algorithm B
based on given previous eight keywords.
"""
(w0, w1, w2, w3) = key0
(w4, w5, w6, w7) = key1
t = self._substw(w7)
k0 = w0 ^ t
k1 = w1 ^ w0 ^ t
k2 = w2 ^ w1 ^ w0 ^ t
k3 = w3 ^ w2 ^ w1 ^ w0 ^ t
if (self.dump_vars):
print("next_256bit_key_b:")
print("w0 = 0x%08x, w0 = 0x%08x, w0 = 0x%08x, w0 = 0x%08x" % (w0, w1, w2, w3))
print("w4 = 0x%08x, w5 = 0x%08x, w6 = 0x%08x, w7 = 0x%08x" % (w4, w5, w6, w7))
print("t = 0x%08x" % (t))
print("k0 = 0x%08x, k0 = 0x%08x, k0 = 0x%08x, k0 = 0x%08x" % (k0, k1, k2, k3))
print("")
return (k0, k1, k2, k3)
def _key_gen256(self, key):
"""
Generating the keys for 256 bit keys.
"""
round_keys = []
(k0, k1, k2, k3, k4, k5, k6, k7) = key
round_keys.append((k0, k1, k2, k3))
round_keys.append((k4, k5, k6, k7))
j = 1
for i in range(0, (AES_256_ROUNDS - 2), 2):
k = self._next_256bit_key_a(round_keys[i], round_keys[i + 1], self._get_rcon(j))
round_keys.append(k)
k = self._next_256bit_key_b(round_keys[i + 1], round_keys[i + 2])
round_keys.append(k)
j += 1
# One final key needs to be generated.
k = self._next_256bit_key_a(round_keys[12], round_keys[13], self._get_rcon(7))
round_keys.append(k)
if (self.verbose):
print("Input key:")
self._print_block((k0, k1, k2, k3))
self._print_block((k4, k5, k6, k7))
print("")
print("Generated keys:")
for k in round_keys:
self._print_block(k)
print("")
return round_keys
def _get_rcon(self, round):
"""
Function implementation of rcon. Calculates rcon for a
given round. This could be implemented as an iterator.
"""
rcon = 0x8d
for i in range(0, round):
rcon = ((rcon << 1) ^ (0x11b & - (rcon >> 7))) & 0xff
return rcon
def _addroundkey(self, key, block):
"""
AES AddRoundKey block operation.
Perform XOR combination of the given block and the given key.
"""
(w0, w1, w2, w3) = block
(k0, k1, k2, k3) = key
res_block = (w0 ^ k0, w1 ^ k1, w2 ^ k2, w3 ^ k3)
if (self.verbose):
print("AddRoundKey key, block in and block out:")
self._print_block(key)
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _mixw(self, w):
"""
Perform bit mixing of the given words.
"""
(b0, b1, b2, b3) = self._w2b(w)
mb0 = self._gm2(b0) ^ self._gm3(b1) ^ b2 ^ b3
mb1 = b0 ^ self._gm2(b1) ^ self._gm3(b2) ^ b3
mb2 = b0 ^ b1 ^ self._gm2(b2) ^ self._gm3(b3)
mb3 = self._gm3(b0) ^ b1 ^ b2 ^ self._gm2(b3)
return self._b2w(mb0, mb1, mb2, mb3)
def _mixcolumns(self, block):
"""
AES MixColumns on the given block.
"""
(c0, c1, c2, c3) = block
mc0 = self._mixw(c0)
mc1 = self._mixw(c1)
mc2 = self._mixw(c2)
mc3 = self._mixw(c3)
res_block = (mc0, mc1, mc2, mc3)
if (self.verbose):
print("MixColumns block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _subbytes(self, block):
"""
AES SubBytes operation on the given block.
"""
(w0, w1, w2, w3) = block
res_block = (self._substw(w0), self._substw(w1),
self._substw(w2), self._substw(w3))
if (self.verbose):
print("SubBytes block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _shiftrows(self, block):
"""
AES ShiftRows block operation.
"""
(w0, w1, w2, w3) = block
c0 = self._w2b(w0)
c1 = self._w2b(w1)
c2 = self._w2b(w2)
c3 = self._w2b(w3)
ws0 = self._b2w(c0[0], c1[1], c2[2], c3[3])
ws1 = self._b2w(c1[0], c2[1], c3[2], c0[3])
ws2 = self._b2w(c2[0], c3[1], c0[2], c1[3])
ws3 = self._b2w(c3[0], c0[1], c1[2], c2[3])
res_block = (ws0, ws1, ws2, ws3)
if (self.verbose):
print("ShiftRows block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _inv__mixw(self, w):
"""
Perform inverse bit mixing of the given words.
"""
(b0, b1, b2, b3) = self._w2b(w)
mb0 = self.gm14(b0) ^ self._gm11(b1) ^ self._gm13(b2) ^ self._gm09(b3)
mb1 = self._gm09(b0) ^ self.gm14(b1) ^ self._gm11(b2) ^ self._gm13(b3)
mb2 = self._gm13(b0) ^ self._gm09(b1) ^ self.gm14(b2) ^ self._gm11(b3)
mb3 = self._gm11(b0) ^ self._gm13(b1) ^ self._gm09(b2) ^ self.gm14(b3)
return self._b2w(mb0, mb1, mb2, mb3)
def _inv__mixcolumns(self, block):
"""
AES Inverse MixColumns on the given block.
"""
(c0, c1, c2, c3) = block
mc0 = self._inv__mixw(c0)
mc1 = self._inv__mixw(c1)
mc2 = self._inv__mixw(c2)
mc3 = self._inv__mixw(c3)
res_block = (mc0, mc1, mc2, mc3)
if (self.verbose):
print("Inverse MixColumns block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _inv__shiftrows(self, block):
"""
AES inverse ShiftRows block operation.
"""
(w0, w1, w2, w3) = block
c0 = self._w2b(w0)
c1 = self._w2b(w1)
c2 = self._w2b(w2)
c3 = self._w2b(w3)
ws0 = self._b2w(c0[0], c3[1], c2[2], c1[3])
ws1 = self._b2w(c1[0], c0[1], c3[2], c2[3])
ws2 = self._b2w(c2[0], c1[1], c0[2], c3[3])
ws3 = self._b2w(c3[0], c2[1], c1[2], c0[3])
res_block = (ws0, ws1, ws2, ws3)
if (self.verbose):
print("Inverse ShiftRows block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block
def _inv__subbytes(self, block):
"""
AES inverse SubBytes operation on the given block.
"""
(w0, w1, w2, w3) = block
res_block = (self._inv__substw(w0), self._inv__substw(w1),
self._inv__substw(w2), self._inv__substw(w3))
if (self.verbose):
print("Inverse SubBytes block in and block out:")
self._print_block(block)
self._print_block(res_block)
print("")
return res_block

Wyświetl plik

@ -0,0 +1,265 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#=======================================================================
# Author: Joachim Strömbergson
# Copyright (c) 2016, Secworks Sweden AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#=======================================================================
from uaes import AesCypher
R128 = (0, 0, 0, 0x00000087)
MAX128 = ((2**128) - 1)
AES_BLOCK_LENGTH = 128
AES_BLOCK_BYTES_LENGTH = int(AES_BLOCK_LENGTH / 4)
SUB_BLOCK_BYTES_LENGTH = 8
class CmacHasher():
def __init__(self, key=(0x00000000, 0x00000000, 0x00000000, 0x00000000), verbose = False):
self.verbose = verbose
self.cypher = AesCypher(verbose=verbose, dump_vars=verbose)
self.current_block = b''
self.message = []
self.key = key
def pad_block(self, block_in_bytes):
"""
Pads a given block with the "1000...." padding.
Parameters
----------
block_in_bytes : bytes array
bytes array to complete with the appropriate padding
Returns
-------
tuple(int, int, int, int)
Block with padding split in the cmac block format
"""
if len(block_in_bytes) < AES_BLOCK_BYTES_LENGTH:
block_in_bytes += b'8'
block_in_bytes += b'0' * (AES_BLOCK_BYTES_LENGTH - len(block_in_bytes))
return self._bytes_to_block(block_in_bytes)
def update(self, bytes_value):
"""
Adds the given bytes value to the current block.
If the current block reaches the CMAC block size, the bytes array
is converted into a block of 4 integers and added to the message.
Parameters
----------
bytes_value : bytes array
Hexadecimal value to concatenate in the message
"""
free_space = AES_BLOCK_BYTES_LENGTH - len(self.current_block)
n_extra_bytes = len(bytes_value) - free_space
if n_extra_bytes > 0:
while(n_extra_bytes > 0):
self.current_block += bytes_value[:free_space]
self._update_message()
self.current_block = b''
bytes_value = bytes_value[free_space:]
free_space = AES_BLOCK_BYTES_LENGTH - len(self.current_block)
n_extra_bytes = len(bytes_value) - free_space
self.current_block = bytes_value
else:
self.current_block += bytes_value
if n_extra_bytes == 0:
self._update_message()
self.current_block = b''
def xor_words(self, a, b):
"""
Apply an XOR operator element to element between a and b.
Parameters
----------
a : tuple(int, int, int, int)
Tuple of four integers to be 'xor-ed' with b
b : tuple(int, int, int, int)
Tuple of four integers to be 'xor-ed' with a
Returns
-------
tuple(int, int, int, int)
Result of the element to element xor operation.
"""
c = (a[0] ^ b[0], a[1] ^ b[1], a[2] ^ b[2], a[3] ^ b[3])
if (self.verbose):
print("XORing words in the following two 128 bit block gives the result:")
self.cypher.print_block(a)
self.cypher.print_block(b)
self.cypher.print_block(c)
return c
def cmac_gen_subkeys(self, key):
"""
Generate subkeys K1 and K2.
K1 is used to generate complete message, i.e. where all blocks contain
128 bits of information.
K2 is used to generate incomplete messages, i.e with a final block length
lower than 128 bits.
Parameters
----------
key : tuple
128 or 256 bits key to compute the aes cmac key
Returns
-------
tuple
K1 and K2
"""
L = self.cypher.aes_encipher_block(key, (0, 0, 0, 0))
Pre_K1 = self._shift_words(L)
MSBL = (L[0] >> 31) & 0x01
if MSBL:
K1 = self.xor_words(Pre_K1, R128)
else:
K1 = Pre_K1
Pre_K2 = self._shift_words(K1)
MSBK1 = (K1[0] >> 31) & 0x01
if MSBK1:
K2 = self.xor_words(Pre_K2, R128)
else:
K2 = Pre_K2
if (self.verbose):
print("Internal data during sub key generation")
print("---------------------------------------")
print("L:")
self.cypher.print_block(L)
print("MSBL = 0x%01x" % MSBL)
print("Pre_K1:")
self.cypher.print_block(Pre_K1)
print("K1:")
self.cypher.print_block(K1)
print("MSBK1 = 0x%01x" % MSBK1)
print("Pre_K2:")
self.cypher.print_block(Pre_K2)
print("K2:")
self.cypher.print_block(K2)
print()
return (K1, K2)
def digest(self):
"""
Hash the message given to the hasher.
The message is a list of 4-integers blocks, if the last block
does not contain 128 bits of information, it will be padded.
Returns
-------
bytes array
Hash value for the given message
"""
import struct
key = self.key
final_length = len(self.current_block) * 4 if len(self.current_block) != 0 else AES_BLOCK_LENGTH
message = self.message
# Start by generating the subkeys
(K1, K2) = self.cmac_gen_subkeys(key)
print("CMAC Subkeys generated.")
state = (0x00000000, 0x00000000, 0x00000000, 0x00000000)
blocks = len(message) if final_length == AES_BLOCK_LENGTH else len(message) + 1
if blocks == 0:
# Empty message.
paddded_block = self.pad_block(self.current_block)
tweak = self.xor_words(paddded_block, K2)
if (self.verbose):
print("tweak empty block")
self.cypher.print_block(tweak)
cmac_hash = self.cypher.aes_encipher_block(key, tweak)
else:
for i in range(blocks - 1):
state = self.xor_words(state, message[i])
if (self.verbose):
print("state before aes block %d:" % (i + 1))
self.cypher.print_block(state)
state = self.cypher.aes_encipher_block(key, state)
if (self.verbose):
print("state after aes block %d:" % (i + 1))
self.cypher.print_block(state)
if (final_length == AES_BLOCK_LENGTH):
tweak = self.xor_words(K1, message[-1])
if (self.verbose):
print("tweak complete final block")
self.cypher.print_block(tweak)
else:
padded_block = self.pad_block(self.current_block)
tweak = self.xor_words(K2, padded_block)
if (self.verbose):
print("tweak incomplete final block")
self.cypher.print_block(tweak)
state = self.xor_words(state, tweak)
if (self.verbose):
print("state before aes final block:")
self.cypher.print_block(state)
cmac_hash = self.cypher.aes_encipher_block(key, state)
if (self.verbose):
print("state after aes final block:")
self.cypher.print_block(cmac_hash)
print("CMAC hash generated: ", cmac_hash)
cmac_hash = struct.pack('<IIII', *cmac_hash)
print("CMAC hash (bytes): ", cmac_hash)
return cmac_hash
def _bytes_to_block(self, block_in_bytes):
sub_blocks = []
for sb_start in range(0, AES_BLOCK_BYTES_LENGTH, SUB_BLOCK_BYTES_LENGTH):
sub_blocks.append(int("0x" + block_in_bytes[sb_start:sb_start + SUB_BLOCK_BYTES_LENGTH].decode()) )
return tuple(sub_blocks)
def _update_message(self):
block = self._bytes_to_block(self.current_block)
self.message.append(block)
def _shift_words(self, wl):
w = ((wl[0] << 96) + (wl[1] << 64) + (wl[2] << 32) + wl[3]) & MAX128
ws = w << 1 & MAX128
return ((ws >> 96) & 0xffffffff, (ws >> 64) & 0xffffffff,
(ws >> 32) & 0xffffffff, ws & 0xffffffff)

Wyświetl plik

@ -5,3 +5,4 @@ absolute_import = True
with_statement = True
print_function = True
unicode_literals = True
annotations = True

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.0.3")
metadata(version="0.1.0")
module("__future__.py")

Wyświetl plik

@ -52,6 +52,25 @@ def _bytes_from_decode_data(s):
raise TypeError("argument should be bytes or ASCII string, not %s" % s.__class__.__name__)
def _maketrans(f, t):
"""Re-implement bytes.maketrans() as there is no such function in micropython"""
if len(f) != len(t):
raise ValueError("maketrans arguments must have same length")
translation_table = dict(zip(f, t))
return translation_table
def _translate(input_bytes, trans_table):
"""Re-implement bytes.translate() as there is no such function in micropython"""
result = bytearray()
for byte in input_bytes:
translated_byte = trans_table.get(byte, byte)
result.append(translated_byte)
return bytes(result)
# Base64 encoding/decoding uses binascii
@ -73,7 +92,7 @@ def b64encode(s, altchars=None):
if not isinstance(altchars, bytes_types):
raise TypeError("expected bytes, not %s" % altchars.__class__.__name__)
assert len(altchars) == 2, repr(altchars)
return encoded.translate(bytes.maketrans(b"+/", altchars))
encoded = _translate(encoded, _maketrans(b"+/", altchars))
return encoded
@ -95,7 +114,7 @@ def b64decode(s, altchars=None, validate=False):
if altchars is not None:
altchars = _bytes_from_decode_data(altchars)
assert len(altchars) == 2, repr(altchars)
s = s.translate(bytes.maketrans(altchars, b"+/"))
s = _translate(s, _maketrans(altchars, b"+/"))
if validate and not re.match(b"^[A-Za-z0-9+/]*=*$", s):
raise binascii.Error("Non-base64 digit found")
return binascii.a2b_base64(s)
@ -120,8 +139,8 @@ def standard_b64decode(s):
return b64decode(s)
# _urlsafe_encode_translation = bytes.maketrans(b'+/', b'-_')
# _urlsafe_decode_translation = bytes.maketrans(b'-_', b'+/')
# _urlsafe_encode_translation = _maketrans(b'+/', b'-_')
# _urlsafe_decode_translation = _maketrans(b'-_', b'+/')
def urlsafe_b64encode(s):
@ -132,7 +151,7 @@ def urlsafe_b64encode(s):
'/'.
"""
# return b64encode(s).translate(_urlsafe_encode_translation)
raise NotImplementedError()
return b64encode(s, b"-_").rstrip(b"\n")
def urlsafe_b64decode(s):
@ -266,7 +285,7 @@ def b32decode(s, casefold=False, map01=None):
if map01 is not None:
map01 = _bytes_from_decode_data(map01)
assert len(map01) == 1, repr(map01)
s = s.translate(bytes.maketrans(b"01", b"O" + map01))
s = _translate(s, _maketrans(b"01", b"O" + map01))
if casefold:
s = s.upper()
# Strip off pad characters from the right. We need to count the pad

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="3.3.4")
metadata(version="3.3.5")
require("binascii")
require("struct")

Wyświetl plik

@ -3,15 +3,15 @@
_WBITS = const(15)
import io, deflate
import builtins, io, deflate
def GzipFile(fileobj):
return deflate.DeflateIO(fileobj, deflate.GZIP, _WBITS)
def open(filename, mode):
return deflate.DeflateIO(open(filename, mode), deflate.GZIP, _WBITS, True)
def open(filename, mode="rb"):
return deflate.DeflateIO(builtins.open(filename, mode), deflate.GZIP, _WBITS, True)
if hasattr(deflate.DeflateIO, "write"):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="1.0.0")
metadata(version="1.0.1")
module("gzip.py")

Wyświetl plik

@ -17,7 +17,7 @@ class HMAC:
make_hash = digestmod # A
elif isinstance(digestmod, str):
# A hash name suitable for hashlib.new().
make_hash = lambda d=b"": hashlib.new(digestmod, d) # B
make_hash = lambda d=b"": getattr(hashlib, digestmod)(d)
else:
# A module supporting PEP 247.
make_hash = digestmod.new # C

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="3.4.3")
metadata(version="3.4.4")
module("hmac.py")

Wyświetl plik

@ -8,7 +8,7 @@ import hashlib
msg = b"zlutoucky kun upel dabelske ody"
dig = hmac.new(b"1234567890", msg=msg, digestmod=hashlib.sha256).hexdigest()
dig = hmac.new(b"1234567890", msg=msg, digestmod="sha256").hexdigest()
print("c735e751e36b08fb01e25794bdb15e7289b82aecdb652c8f4f72f307b39dad39")
print(dig)

Wyświetl plik

@ -1,3 +0,0 @@
metadata(version="0.1.0")
package("json")

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.1.4")
metadata(version="0.2.0")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -66,6 +66,13 @@ def isdir(path):
return False
def isfile(path):
try:
return bool(os.stat(path)[0] & 0x8000)
except OSError:
return False
def expanduser(s):
if s == "~" or s.startswith("~/"):
h = os.getenv("HOME")

Wyświetl plik

@ -20,3 +20,7 @@ assert not exists(dir + "/test_path.py--")
assert isdir(dir + "/os")
assert not isdir(dir + "/os--")
assert not isdir(dir + "/test_path.py")
assert not isfile(dir + "/os")
assert isfile(dir + "/test_path.py")
assert not isfile(dir + "/test_path.py--")

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.2.1")
module("ssl.py")
module("ssl.py", opt=3)

Wyświetl plik

@ -1,36 +1,65 @@
from ussl import *
import ussl as _ussl
import tls
from tls import *
# Constants
for sym in "CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED":
if sym not in globals():
globals()[sym] = object()
class SSLContext:
def __init__(self, *args):
self._context = tls.SSLContext(*args)
self._context.verify_mode = CERT_NONE
@property
def verify_mode(self):
return self._context.verify_mode
@verify_mode.setter
def verify_mode(self, val):
self._context.verify_mode = val
def load_cert_chain(self, certfile, keyfile):
if isinstance(certfile, str):
with open(certfile, "rb") as f:
certfile = f.read()
if isinstance(keyfile, str):
with open(keyfile, "rb") as f:
keyfile = f.read()
self._context.load_cert_chain(certfile, keyfile)
def load_verify_locations(self, cafile=None, cadata=None):
if cafile:
with open(cafile, "rb") as f:
cadata = f.read()
self._context.load_verify_locations(cadata)
def wrap_socket(
self, sock, server_side=False, do_handshake_on_connect=True, server_hostname=None
):
return self._context.wrap_socket(
sock,
server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
server_hostname=server_hostname,
)
def wrap_socket(
sock,
keyfile=None,
certfile=None,
server_side=False,
key=None,
cert=None,
cert_reqs=CERT_NONE,
*,
ca_certs=None,
server_hostname=None
cadata=None,
server_hostname=None,
do_handshake=True,
):
# TODO: More arguments accepted by CPython could also be handled here.
# That would allow us to accept ca_certs as a positional argument, which
# we should.
kw = {}
if keyfile is not None:
kw["keyfile"] = keyfile
if certfile is not None:
kw["certfile"] = certfile
if server_side is not False:
kw["server_side"] = server_side
if cert_reqs is not CERT_NONE:
kw["cert_reqs"] = cert_reqs
if ca_certs is not None:
kw["ca_certs"] = ca_certs
if server_hostname is not None:
kw["server_hostname"] = server_hostname
return _ussl.wrap_socket(sock, **kw)
con = SSLContext(PROTOCOL_TLS_SERVER if server_side else PROTOCOL_TLS_CLIENT)
if cert or key:
con.load_cert_chain(cert, key)
if cadata:
con.load_verify_locations(cadata=cadata)
con.verify_mode = cert_reqs
return con.wrap_socket(
sock,
server_side=server_side,
do_handshake_on_connect=do_handshake,
server_hostname=server_hostname,
)

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="Adds write (create/append) support to tarfile.", version="0.1.1")
metadata(description="Adds write (create/append) support to tarfile.", version="0.1.2")
require("tarfile")
package("tarfile")

Wyświetl plik

@ -67,7 +67,7 @@ def addfile(self, tarinfo, fileobj=None):
name += "/"
hdr = uctypes.struct(uctypes.addressof(buf), _TAR_HEADER, uctypes.LITTLE_ENDIAN)
hdr.name[:] = name.encode("utf-8")[:100]
hdr.mode[:] = b"%07o\0" % (tarinfo.mode & 0o7777)
hdr.mode[:] = b"%07o\0" % ((0o755 if tarinfo.isdir() else 0o644) & 0o7777)
hdr.uid[:] = b"%07o\0" % tarinfo.uid
hdr.gid[:] = b"%07o\0" % tarinfo.gid
hdr.size[:] = b"%011o\0" % size
@ -96,9 +96,10 @@ def addfile(self, tarinfo, fileobj=None):
def add(self, name, recursive=True):
from . import TarInfo
tarinfo = TarInfo(name)
try:
stat = os.stat(name)
res_name = (name + '/') if (stat[0] & 0xf000) == 0x4000 else name
tarinfo = TarInfo(res_name)
tarinfo.mode = stat[0]
tarinfo.uid = stat[4]
tarinfo.gid = stat[5]

Wyświetl plik

@ -0,0 +1,45 @@
# time
This library _extends_ the built-in [MicroPython `time`
module](https://docs.micropython.org/en/latest/library/time.html#module-time) to
include
[`time.strftime()`](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior).
`strftime()` is omitted from the built-in `time` module to conserve space.
## Installation
Use `mip` via `mpremote`:
```bash
> mpremote mip install time
```
See [Package management](https://docs.micropython.org/en/latest/reference/packages.html) for more details on using `mip` and `mpremote`.
## Common uses
`strftime()` is used when using a loggging [Formatter
Object](https://docs.python.org/3/library/logging.html#formatter-objects) that
employs
[`asctime`](https://docs.python.org/3/library/logging.html#formatter-objects).
For example:
```python
logging.Formatter('%(asctime)s | %(name)s | %(levelname)s - %(message)s')
```
The expected output might look like:
```text
Tue Feb 17 09:42:58 2009 | MAIN | INFO - test
```
But if this `time` extension library isn't installed, `asctime` will always be
`None`:
```text
None | MAIN | INFO - test
```

Wyświetl plik

@ -1,18 +1,15 @@
#!/bin/bash
########################################################################################
# code formatting
# commit formatting
function ci_code_formatting_setup {
sudo apt-add-repository --yes --update ppa:pybricks/ppa
sudo apt-get install uncrustify
pip3 install black
uncrustify --version
black --version
}
function ci_code_formatting_run {
tools/codeformat.py -v
function ci_commit_formatting_run {
git remote add upstream https://github.com/micropython/micropython-lib.git
git fetch --depth=100 upstream master
# If the common ancestor commit hasn't been found, fetch more.
git merge-base upstream/master HEAD || git fetch --unshallow upstream master
# For a PR, upstream/master..HEAD ends with a merge commit into master, exclude that one.
tools/verifygitlog.py -v upstream/master..HEAD --no-merges
}
########################################################################################
@ -33,7 +30,11 @@ function ci_build_packages_check_manifest {
for file in $(find -name manifest.py); do
echo "##################################################"
echo "# Testing $file"
python3 /tmp/micropython/tools/manifestfile.py --lib . --compile $file
extra_args=
if [[ "$file" =~ "/unix-ffi/" ]]; then
extra_args="--unix-ffi"
fi
python3 /tmp/micropython/tools/manifestfile.py $extra_args --lib . --compile $file
done
}

Wyświetl plik

@ -25,87 +25,19 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# This is based on tools/codeformat.py from the main micropython/micropython
# repository but without support for .c/.h files.
# This is just a wrapper around running ruff format, so that code formatting can be
# invoked in the same way as in the main repo.
import argparse
import glob
import itertools
import os
import re
import subprocess
# Relative to top-level repo dir.
PATHS = [
"**/*.py",
]
EXCLUSIONS = []
# Path to repo top-level dir.
TOP = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
PY_EXTS = (".py",)
def list_files(paths, exclusions=None, prefix=""):
files = set()
for pattern in paths:
files.update(glob.glob(os.path.join(prefix, pattern), recursive=True))
for pattern in exclusions or []:
files.difference_update(glob.fnmatch.filter(files, os.path.join(prefix, pattern)))
return sorted(files)
def main():
cmd_parser = argparse.ArgumentParser(description="Auto-format Python files.")
cmd_parser.add_argument("-v", action="store_true", help="Enable verbose output")
cmd_parser.add_argument(
"-f",
action="store_true",
help="Filter files provided on the command line against the default list of files to check.",
)
cmd_parser.add_argument("files", nargs="*", help="Run on specific globs")
args = cmd_parser.parse_args()
# Expand the globs passed on the command line, or use the default globs above.
files = []
if args.files:
files = list_files(args.files)
if args.f:
# Filter against the default list of files. This is a little fiddly
# because we need to apply both the inclusion globs given in PATHS
# as well as the EXCLUSIONS, and use absolute paths
files = {os.path.abspath(f) for f in files}
all_files = set(list_files(PATHS, EXCLUSIONS, TOP))
if args.v: # In verbose mode, log any files we're skipping
for f in files - all_files:
print("Not checking: {}".format(f))
files = list(files & all_files)
else:
files = list_files(PATHS, EXCLUSIONS, TOP)
# Extract files matching a specific language.
def lang_files(exts):
for file in files:
if os.path.splitext(file)[1].lower() in exts:
yield file
# Run tool on N files at a time (to avoid making the command line too long).
def batch(cmd, files, N=200):
while True:
file_args = list(itertools.islice(files, N))
if not file_args:
break
subprocess.check_call(cmd + file_args)
# Format Python files with black.
command = ["black", "--fast", "--line-length=99"]
if args.v:
command.append("-v")
else:
command.append("-q")
batch(command, lang_files(PY_EXTS))
command = ["ruff", "format", "."]
subprocess.check_call(command, cwd=TOP)
if __name__ == "__main__":

Wyświetl plik

@ -185,9 +185,7 @@ urls = {{ Homepage = "https://github.com/micropython/micropython-lib" }}
"""
[tool.hatch.build]
packages = ["{}"]
""".format(
top_level_package
),
""".format(top_level_package),
file=toml_file,
)

Wyświetl plik

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# This is an exact duplicate of verifygitlog.py from the main repo.
import re
import subprocess
import sys
verbosity = 0 # Show what's going on, 0 1 or 2.
suggestions = 1 # Set to 0 to not include lengthy suggestions in error messages.
ignore_prefixes = []
def verbose(*args):
if verbosity:
print(*args)
def very_verbose(*args):
if verbosity > 1:
print(*args)
class ErrorCollection:
# Track errors and warnings as the program runs
def __init__(self):
self.has_errors = False
self.has_warnings = False
self.prefix = ""
def error(self, text):
print("error: {}{}".format(self.prefix, text))
self.has_errors = True
def warning(self, text):
print("warning: {}{}".format(self.prefix, text))
self.has_warnings = True
def git_log(pretty_format, *args):
# Delete pretty argument from user args so it doesn't interfere with what we do.
args = ["git", "log"] + [arg for arg in args if "--pretty" not in args]
args.append("--pretty=format:" + pretty_format)
very_verbose("git_log", *args)
# Generator yielding each output line.
for line in subprocess.Popen(args, stdout=subprocess.PIPE).stdout:
yield line.decode().rstrip("\r\n")
def diagnose_subject_line(subject_line, subject_line_format, err):
err.error("Subject line: " + subject_line)
if not subject_line.endswith("."):
err.error('* must end with "."')
if not re.match(r"^[^!]+: ", subject_line):
err.error('* must start with "path: "')
if re.match(r"^[^!]+: *$", subject_line):
err.error("* must contain a subject after the path.")
m = re.match(r"^[^!]+: ([a-z][^ ]*)", subject_line)
if m:
err.error('* first word of subject ("{}") must be capitalised.'.format(m.group(1)))
if re.match(r"^[^!]+: [^ ]+$", subject_line):
err.error("* subject must contain more than one word.")
err.error("* must match: " + repr(subject_line_format))
err.error('* Example: "py/runtime: Add support for foo to bar."')
def verify(sha, err):
verbose("verify", sha)
err.prefix = "commit " + sha + ": "
# Author and committer email.
for line in git_log("%ae%n%ce", sha, "-n1"):
very_verbose("email", line)
if "noreply" in line:
err.error("Unwanted email address: " + line)
# Message body.
raw_body = list(git_log("%B", sha, "-n1"))
verify_message_body(raw_body, err)
def verify_message_body(raw_body, err):
if not raw_body:
err.error("Message is empty")
return
# Subject line.
subject_line = raw_body[0]
for prefix in ignore_prefixes:
if subject_line.startswith(prefix):
verbose("Skipping ignored commit message")
return
very_verbose("subject_line", subject_line)
subject_line_format = r"^[^!]+: [A-Z]+.+ .+\.$"
if not re.match(subject_line_format, subject_line):
diagnose_subject_line(subject_line, subject_line_format, err)
if len(subject_line) >= 73:
err.error("Subject line must be 72 or fewer characters: " + subject_line)
# Second one divides subject and body.
if len(raw_body) > 1 and raw_body[1]:
err.error("Second message line must be empty: " + raw_body[1])
# Message body lines.
for line in raw_body[2:]:
# Long lines with URLs are exempt from the line length rule.
if len(line) >= 76 and "://" not in line:
err.error("Message lines should be 75 or less characters: " + line)
if not raw_body[-1].startswith("Signed-off-by: ") or "@" not in raw_body[-1]:
err.error('Message must be signed-off. Use "git commit -s".')
def run(args):
verbose("run", *args)
err = ErrorCollection()
if "--check-file" in args:
filename = args[-1]
verbose("checking commit message from", filename)
with open(args[-1]) as f:
# Remove comment lines as well as any empty lines at the end.
lines = [line.rstrip("\r\n") for line in f if not line.startswith("#")]
while not lines[-1]:
lines.pop()
verify_message_body(lines, err)
else: # Normal operation, pass arguments to git log
for sha in git_log("%h", *args):
verify(sha, err)
if err.has_errors or err.has_warnings:
if suggestions:
print("See https://github.com/micropython/micropython/blob/master/CODECONVENTIONS.md")
else:
print("ok")
if err.has_errors:
sys.exit(1)
def show_help():
print("usage: verifygitlog.py [-v -n -h --check-file] ...")
print("-v : increase verbosity, can be specified multiple times")
print("-n : do not print multi-line suggestions")
print("-h : print this help message and exit")
print(
"--check-file : Pass a single argument which is a file containing a candidate commit message"
)
print(
"--ignore-rebase : Skip checking commits with git rebase autosquash prefixes or WIP as a prefix"
)
print("... : arguments passed to git log to retrieve commits to verify")
print(" see https://www.git-scm.com/docs/git-log")
print(" passing no arguments at all will verify all commits")
print("examples:")
print("verifygitlog.py -n10 # Check last 10 commits")
print("verifygitlog.py -v master..HEAD # Check commits since master")
if __name__ == "__main__":
args = sys.argv[1:]
verbosity = args.count("-v")
suggestions = args.count("-n") == 0
if "--ignore-rebase" in args:
args.remove("--ignore-rebase")
ignore_prefixes = ["squash!", "fixup!", "amend!", "WIP"]
if "-h" in args:
show_help()
else:
args = [arg for arg in args if arg not in ["-v", "-n", "-h"]]
run(args)

Wyświetl plik

@ -19,9 +19,13 @@ replacement for CPython.
### Usage
To use a unix-specific library, pass `unix_ffi=True` to `require()` in your
manifest file.
To use a unix-specific library, a manifest file must add the `unix-ffi`
library to the library search path using `add_library()`:
```py
require("os", unix_ffi=True) # Use the unix-ffi version instead of python-stdlib.
add_library("unix-ffi", "$(MPY_LIB_DIR)/unix-ffi", prepend=True)
```
Prepending the `unix-ffi` library to the path will make it so that the
`unix-ffi` version of a package will be preferred if that package appears in
both `unix-ffi` and another library (eg `python-stdlib`).

Wyświetl plik

@ -1,5 +1,5 @@
metadata(version="3.3.4")
require("re", unix_ffi=True)
require("re")
module("_markupbase.py")

Wyświetl plik

@ -1,7 +1,7 @@
metadata(version="0.5.1")
require("functools")
require("email.encoders", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.encoders")
require("email.errors")
package("email")

Wyświetl plik

@ -3,7 +3,7 @@ metadata(version="0.5.1")
require("base64")
require("binascii")
require("quopri")
require("re", unix_ffi=True)
require("re")
require("string")
package("email")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.1")
require("re", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.message", unix_ffi=True)
require("email.internal", unix_ffi=True)
require("re")
require("email.errors")
require("email.message")
require("email.internal")
package("email")

Wyświetl plik

@ -1,9 +1,9 @@
metadata(version="0.5.2")
require("re", unix_ffi=True)
require("re")
require("binascii")
require("email.encoders", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.encoders")
require("email.errors")
require("email.charset")
package("email")

Wyświetl plik

@ -1,15 +1,15 @@
metadata(version="0.5.1")
require("re", unix_ffi=True)
require("re")
require("base64")
require("binascii")
require("functools")
require("string")
# require("calendar") TODO
require("abc")
require("email.errors", unix_ffi=True)
require("email.header", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.utils", unix_ffi=True)
require("email.errors")
require("email.header")
require("email.charset")
require("email.utils")
package("email")

Wyświetl plik

@ -1,11 +1,11 @@
metadata(version="0.5.3")
require("re", unix_ffi=True)
require("re")
require("uu")
require("base64")
require("binascii")
require("email.utils", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.utils")
require("email.errors")
require("email.charset")
package("email")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.1")
require("warnings")
require("email.feedparser", unix_ffi=True)
require("email.message", unix_ffi=True)
require("email.internal", unix_ffi=True)
require("email.feedparser")
require("email.message")
require("email.internal")
package("email")

Wyświetl plik

@ -1,13 +1,13 @@
metadata(version="3.3.4")
require("os", unix_ffi=True)
require("re", unix_ffi=True)
require("os")
require("re")
require("base64")
require("random")
require("datetime")
require("urllib.parse", unix_ffi=True)
require("urllib.parse")
require("warnings")
require("quopri")
require("email.charset", unix_ffi=True)
require("email.charset")
package("email")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.0.4")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
module("fcntl.py")

Wyświetl plik

@ -1,5 +1,5 @@
metadata(version="3.3.4")
require("os", unix_ffi=True)
require("os")
module("getopt.py")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.1.0")
# Originally written by Riccardo Magliocchetti.
require("ffilib", unix_ffi=True)
require("ffilib")
module("gettext.py")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.2")
require("os", unix_ffi=True)
require("os")
require("os-path")
require("re", unix_ffi=True)
require("re")
require("fnmatch")
module("glob.py")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="3.3.4")
require("_markupbase", unix_ffi=True)
require("_markupbase")
require("warnings")
require("html.entities", unix_ffi=True)
require("re", unix_ffi=True)
require("html.entities")
require("re")
package("html")

Wyświetl plik

@ -1,10 +1,10 @@
metadata(version="0.5.1")
require("email.parser", unix_ffi=True)
require("email.message", unix_ffi=True)
require("socket", unix_ffi=True)
require("email.parser")
require("email.message")
require("socket")
require("collections")
require("urllib.parse", unix_ffi=True)
require("urllib.parse")
require("warnings")
package("http")

Some files were not shown because too many files have changed in this diff Show More