Porównaj commity

...

20 Commity

Autor SHA1 Wiadomość Data
Matt Trentini e2cc351a8d
Merge dd2079dbb2 into 45ead11f96 2024-04-07 14:08:57 +02:00
Damien George 45ead11f96 ssl: Use "from tls import *" to be compatible with axtls.
axtls doesn't define all the CERT_xxx constants, nor the MBEDTLS_VERSION
constant.

This change means that `tls.SSLContext` is imported into the module, but
that's subsequently overridden by the class definition in this module.

Signed-off-by: Damien George <damien@micropython.org>
2024-03-28 17:44:37 +11:00
iabdalkader 661efa48f0 senml: Use the updated cbor2 API.
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
2024-03-19 17:29:22 +11:00
iabdalkader 8ee876dcd6 cbor2: Deprecate decoder and encoder modules.
Deprecate decoder and encoder modules to maintain compatibility with the
CPython cbor2 module.

Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
2024-03-19 17:28:35 +11:00
Jim Mussared 5c7e3fc0bc json: Move to unix-ffi.
It requires the unix pcre-based re module.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2024-03-19 17:23:07 +11:00
Damien George 23df50d0ea unix-ffi: Remove "unix_ffi" argument from require().
And describe how to use `add_library()` instead.

Signed-off-by: Damien George <damien@micropython.org>
2024-03-17 13:22:36 +11:00
Damien George ffb07dbce5 gzip: Fix recursion error in open() function.
And give the `mode` parameter a default, matching CPython.

Signed-off-by: Damien George <damien@micropython.org>
2024-02-29 14:54:24 +11:00
Angus Gratton 224246531e lora-sx126x: Clean up some struct formatting.
Changes are cosmetic - and maybe very minor code size - but not functional.
_reg_read() was calling struct.packinto() with an incorrect number of
arguments but it seems like MicroPython didn't mind, as result is correct
for both versions.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:38 +11:00
Angus Gratton 35bb7952ba lora-sx126x: Fix syncword setting.
Fixes issue #796.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:08 +11:00
Angus Gratton 546284817a lora-sx127x: Implement missing syncword support.
This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:58:06 +11:00
Angus Gratton ad6ab5a78c lora-sync: Fix race with fast or failed send().
If send completes before the first call to poll_send(), the driver could
get stuck in _sync_wait(). This had much less impact before rp2 port went
tickless, as _sync_wait(will_irq=True) calls machine.idle() which may not
wake very frequently on a tickless port.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:57:50 +11:00
Angus Gratton b712103519 lora-sx126x: Fix invalid default configuration after reset.
According to the docs, only freq_khz was needed for working output.
However:

- Without output_power setting, no output from SX1262 antenna (theory:
  output routed to the SX1261 antenna).

- SF,BW,etc. settings were different from the SX127x power on defaults, so
  modems with an identical configuration were unable to communicate.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-20 16:44:57 +11:00
Angus Gratton 4cc67065dd tools/ci.sh: Add unix-ffi library when testing unix-ffi subdirectory.
Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-02-19 23:21:04 +11:00
ubi de feo 8058b2935b tarfile-write: Fix permissions when adding to archive.
Signed-off-by: ubi de feo <me@ubidefeo.com>
2024-02-09 10:43:05 +11:00
Carlosgg 56f514f569 aiohttp: Fix binary data treatment.
- Fix binary data `Content-type` header and data `Content-Length`
  calculation.

- Fix query length when data is included.

- Fix `json` and `text` methods of `ClientResponse` to read
  `Content-Length` size

Signed-off-by: Carlos Gil <carlosgilglez@gmail.com>
2024-02-08 19:02:26 +11:00
Adam Knowles ddb1a27957 hmac: Fix passing in a string for digestmod argument.
The built-in `hashlib` module does not have a `.new` method (although the
Python version in this repository does).
2024-02-07 12:45:03 +11:00
Felix Dörre 35d41dbb0e ssl: Restructure micropython SSL interface to a new tls module.
MicroPython now supplies SSL/TLS functionality in a new built-in `tls`
module.  The `ssl` module is now implemented purely in Python, in this
repository.  Other libraries are updated to work with this scheme.

Signed-off-by: Felix Dörre <felix@dogcraft.de>
2024-02-07 12:12:13 +11:00
Felix Dörre 803452a1ac umqtt.simple: Simplify check for user being unused.
There don't seem to be any MQTT implementations that expect an empty
username (instead of the field missing), so the check for unused `user` can
be simplified.

Signed-off-by: Felix Dörre <felix@dogcraft.de>
2024-02-07 12:12:09 +11:00
Matt Trentini dd2079dbb2 Basic UUID functionality - for each uuidX method - appears to be working 2022-07-06 17:23:56 +10:00
Matt Trentini 412aab200e Copy UUID implementation and tests from CPython 2022-07-03 16:22:22 +10:00
73 zmienionych plików z 1531 dodań i 166 usunięć

Wyświetl plik

@ -1,10 +1,11 @@
metadata(
version="0.1.0",
version="0.2.0",
description="Common networking packages for all network-capable deployments of MicroPython.",
)
require("mip")
require("ntptime")
require("ssl")
require("requests")
require("webrepl")

Wyświetl plik

@ -163,6 +163,9 @@ class _SX126x(BaseModem):
# 0x02 is 40us, default value appears undocumented but this is the SX1276 default
self._ramp_val = 0x02
# Configure the SX126x at least once after reset
self._configured = False
if reset:
# If the caller supplies a reset pin argument, reset the radio
reset.init(Pin.OUT, value=0)
@ -383,24 +386,24 @@ class _SX126x(BaseModem):
# see
# https://www.thethingsnetwork.org/forum/t/should-private-lorawan-networks-use-a-different-sync-word/34496/15
syncword = 0x0404 + ((syncword & 0x0F) << 4) + ((syncword & 0xF0) << 8)
self._cmd(">BBH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
self._cmd(">BHH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
if "output_power" in lora_cfg:
if not self._configured or any(
key in lora_cfg for key in ("output_power", "pa_ramp_us", "tx_ant")
):
pa_config_args, self._output_power = self._get_pa_tx_params(
lora_cfg["output_power"], lora_cfg.get("tx_ant", None)
lora_cfg.get("output_power", self._output_power), lora_cfg.get("tx_ant", None)
)
self._cmd("BBBBB", _CMD_SET_PA_CONFIG, *pa_config_args)
if "pa_ramp_us" in lora_cfg:
self._ramp_val = self._get_pa_ramp_val(
lora_cfg, [10, 20, 40, 80, 200, 800, 1700, 3400]
)
if "pa_ramp_us" in lora_cfg:
self._ramp_val = self._get_pa_ramp_val(
lora_cfg, [10, 20, 40, 80, 200, 800, 1700, 3400]
)
if "output_power" in lora_cfg or "pa_ramp_us" in lora_cfg:
# Only send the SetTxParams command if power level or PA ramp time have changed
self._cmd("BBB", _CMD_SET_TX_PARAMS, self._output_power, self._ramp_val)
if any(key in lora_cfg for key in ("sf", "bw", "coding_rate")):
if not self._configured or any(key in lora_cfg for key in ("sf", "bw", "coding_rate")):
if "sf" in lora_cfg:
self._sf = lora_cfg["sf"]
if self._sf < _CFG_SF_MIN or self._sf > _CFG_SF_MAX:
@ -441,6 +444,7 @@ class _SX126x(BaseModem):
self._reg_write(_REG_RX_GAIN, 0x96 if lora_cfg["rx_boost"] else 0x94)
self._check_error()
self._configured = True
def _invert_workaround(self, enable):
# Apply workaround for DS 15.4 Optimizing the Inverted IQ Operation
@ -465,7 +469,7 @@ class _SX126x(BaseModem):
# See DS 13.1.12 Calibrate Function
# calibParam 0xFE means to calibrate all blocks.
self._cmd("<BB", _CMD_CALIBRATE, 0xFE)
self._cmd("BB", _CMD_CALIBRATE, 0xFE)
time.sleep_us(_CALIBRATE_TYPICAL_TIME_US)
@ -541,7 +545,7 @@ class _SX126x(BaseModem):
else:
timeout = 0 # Single receive mode, no timeout
self._cmd(">BBH", _CMD_SET_RX, timeout >> 16, timeout)
self._cmd(">BBH", _CMD_SET_RX, timeout >> 16, timeout) # 24 bits
return self._dio1
@ -725,10 +729,10 @@ class _SX126x(BaseModem):
return res
def _reg_read(self, addr):
return self._cmd("BBBB", _CMD_READ_REGISTER, addr >> 8, addr & 0xFF, n_read=1)[0]
return self._cmd(">BHB", _CMD_READ_REGISTER, addr, 0, n_read=1)[0]
def _reg_write(self, addr, val):
return self._cmd("BBBB", _CMD_WRITE_REGISTER, addr >> 8, addr & 0xFF, val & 0xFF)
return self._cmd(">BHB", _CMD_WRITE_REGISTER, addr, val & 0xFF)
class _SX1262(_SX126x):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.1")
metadata(version="0.1.2")
require("lora")
package("lora")

Wyświetl plik

@ -519,6 +519,9 @@ class _SX127x(BaseModem):
self._reg_update(_REG_MODEM_CONFIG3, update_mask, modem_config3)
if "syncword" in lora_cfg:
self._reg_write(_REG_SYNC_WORD, lora_cfg["syncword"])
def _reg_write(self, reg, value):
self._cs(0)
if isinstance(value, int):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.1.1")
require("lora")
package("lora")

Wyświetl plik

@ -42,8 +42,8 @@ class SyncModem:
tx = True
while tx is True:
tx = self.poll_send()
self._sync_wait(will_irq)
tx = self.poll_send()
return tx
def recv(self, timeout_ms=None, rx_length=0xFF, rx_packet=None):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.1.1")
require("lora")
package("lora")

Wyświetl plik

@ -37,10 +37,11 @@ class BaseModem:
self._ant_sw = ant_sw
self._irq_callback = None
# Common configuration settings that need to be tracked by all modem drivers
# (Note that subclasses may set these to other values in their constructors, to match
# the power-on-reset configuration of a particular modem.)
# Common configuration settings that need to be tracked by all modem drivers.
#
# Where modem hardware sets different values after reset, the driver should
# set them back to these defaults (if not provided by the user), so that
# behaviour remains consistent between different modems using the same driver.
self._rf_freq_hz = 0 # Needs to be set via configure()
self._sf = 7 # Spreading factor
self._bw_hz = 125000 # Reset value

Wyświetl plik

@ -26,7 +26,7 @@ THE SOFTWARE.
from senml import *
import time
from cbor2 import decoder
import cbor2
pack = SenmlPack("device_name")
@ -38,5 +38,5 @@ while True:
cbor_val = pack.to_cbor()
print(cbor_val)
print(cbor_val.hex())
print(decoder.loads(cbor_val)) # convert to string again so we can print it.
print(cbor2.loads(cbor_val)) # convert to string again so we can print it.
time.sleep(1)

Wyświetl plik

@ -1,6 +1,6 @@
metadata(
description="SenML serialisation for MicroPython.",
version="0.1.0",
version="0.1.1",
pypi_publish="micropython-senml",
)

Wyświetl plik

@ -27,8 +27,7 @@ THE SOFTWARE.
from senml.senml_record import SenmlRecord
from senml.senml_base import SenmlBase
import json
from cbor2 import encoder
from cbor2 import decoder
import cbor2
class SenmlPackIterator:
@ -278,7 +277,7 @@ class SenmlPack(SenmlBase):
:param data: a byte array.
:return: None
"""
records = decoder.loads(data) # load the raw senml data
records = cbor2.loads(data) # load the raw senml data
naming_map = {
"bn": -2,
"bt": -3,
@ -320,7 +319,7 @@ class SenmlPack(SenmlBase):
}
converted = []
self._build_rec_dict(naming_map, converted)
return encoder.dumps(converted)
return cbor2.dumps(converted)
def add(self, item):
"""

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="Lightweight MQTT client for MicroPython.", version="1.3.4")
metadata(description="Lightweight MQTT client for MicroPython.", version="1.4.0")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -16,8 +16,7 @@ class MQTTClient:
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
ssl=None,
):
if port == 0:
port = 8883 if ssl else 1883
@ -26,7 +25,6 @@ class MQTTClient:
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
@ -67,15 +65,13 @@ class MQTTClient:
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
self.sock = self.ssl.wrap_socket(self.sock, server_hostname=self.server)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
if self.user:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
@ -101,7 +97,7 @@ class MQTTClient:
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
if self.user:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.6.0")
metadata(version="0.7.0")
# Originally written by Paul Sokolovsky.

Wyświetl plik

@ -12,7 +12,7 @@ def urlopen(url, data=None, method="GET"):
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
import tls
port = 443
else:
@ -29,7 +29,9 @@ def urlopen(url, data=None, method="GET"):
try:
s.connect(ai[-1])
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(method)
s.write(b" /")

Wyświetl plik

@ -38,10 +38,10 @@ class ClientResponse:
return self._decode(await self.content.read(sz))
async def text(self, encoding="utf-8"):
return (await self.read(sz=-1)).decode(encoding)
return (await self.read(int(self.headers.get("Content-Length", -1)))).decode(encoding)
async def json(self):
return _json.loads(await self.read())
return _json.loads(await self.read(int(self.headers.get("Content-Length", -1))))
def __repr__(self):
return "<ClientResponse %d %s>" % (self.status, self.headers)
@ -121,7 +121,7 @@ class ClientSession:
if b"chunked" in line:
chunked = True
elif line.startswith(b"Location:"):
url = line.rstrip().split(None, 1)[1].decode("latin-1")
url = line.rstrip().split(None, 1)[1].decode()
if 301 <= status <= 303:
redir_cnt += 1
@ -195,17 +195,22 @@ class ClientSession:
if "Host" not in headers:
headers.update(Host=host)
if not data:
query = "%s /%s %s\r\n%s\r\n" % (
query = b"%s /%s %s\r\n%s\r\n" % (
method,
path,
version,
"\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n" if headers else "",
)
else:
headers.update(**{"Content-Length": len(str(data))})
if json:
headers.update(**{"Content-Type": "application/json"})
query = """%s /%s %s\r\n%s\r\n%s\r\n\r\n""" % (
if isinstance(data, bytes):
headers.update(**{"Content-Type": "application/octet-stream"})
else:
data = data.encode()
headers.update(**{"Content-Length": len(data)})
query = b"""%s /%s %s\r\n%s\r\n%s""" % (
method,
path,
version,
@ -213,10 +218,10 @@ class ClientSession:
data,
)
if not is_handshake:
await writer.awrite(query.encode("latin-1"))
await writer.awrite(query)
return reader
else:
await writer.awrite(query.encode())
await writer.awrite(query)
return reader, writer
def request(self, method, url, data=None, json=None, ssl=None, params=None, headers={}):

Wyświetl plik

@ -1,6 +1,6 @@
metadata(
description="HTTP client module for MicroPython asyncio module",
version="0.0.1",
version="0.0.2",
pypi="aiohttp",
)

Wyświetl plik

@ -24,5 +24,10 @@ THE SOFTWARE.
"""
from . import decoder
from . import encoder
from ._decoder import CBORDecoder
from ._decoder import load
from ._decoder import loads
from ._encoder import CBOREncoder
from ._encoder import dump
from ._encoder import dumps

Wyświetl plik

@ -24,16 +24,15 @@ THE SOFTWARE.
"""
from cbor2 import encoder
from cbor2 import decoder
import cbor2
input = [
{"bn": "urn:dev:ow:10e2073a01080063", "u": "Cel", "t": 1.276020076e09, "v": 23.5},
{"u": "Cel", "t": 1.276020091e09, "v": 23.6},
]
data = encoder.dumps(input)
data = cbor2.dumps(input)
print(data)
print(data.hex())
text = decoder.loads(data)
text = cbor2.loads(data)
print(text)

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0", pypi="cbor2")
metadata(version="1.0.0", pypi="cbor2")
package("cbor2")

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.8.1", pypi="requests")
metadata(version="0.9.0", pypi="requests")
package("requests")

Wyświetl plik

@ -63,7 +63,7 @@ def request(
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
import tls
port = 443
else:
@ -90,7 +90,9 @@ def request(
try:
s.connect(ai[-1])
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if "Host" not in headers:
s.write(b"Host: %s\r\n" % host)

Wyświetl plik

@ -3,15 +3,15 @@
_WBITS = const(15)
import io, deflate
import builtins, io, deflate
def GzipFile(fileobj):
return deflate.DeflateIO(fileobj, deflate.GZIP, _WBITS)
def open(filename, mode):
return deflate.DeflateIO(open(filename, mode), deflate.GZIP, _WBITS, True)
def open(filename, mode="rb"):
return deflate.DeflateIO(builtins.open(filename, mode), deflate.GZIP, _WBITS, True)
if hasattr(deflate.DeflateIO, "write"):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="1.0.0")
metadata(version="1.0.1")
module("gzip.py")

Wyświetl plik

@ -17,7 +17,7 @@ class HMAC:
make_hash = digestmod # A
elif isinstance(digestmod, str):
# A hash name suitable for hashlib.new().
make_hash = lambda d=b"": hashlib.new(digestmod, d) # B
make_hash = lambda d=b"": getattr(hashlib, digestmod)(d)
else:
# A module supporting PEP 247.
make_hash = digestmod.new # C

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="3.4.3")
metadata(version="3.4.4")
module("hmac.py")

Wyświetl plik

@ -8,7 +8,7 @@ import hashlib
msg = b"zlutoucky kun upel dabelske ody"
dig = hmac.new(b"1234567890", msg=msg, digestmod=hashlib.sha256).hexdigest()
dig = hmac.new(b"1234567890", msg=msg, digestmod="sha256").hexdigest()
print("c735e751e36b08fb01e25794bdb15e7289b82aecdb652c8f4f72f307b39dad39")
print(dig)

Wyświetl plik

@ -1,3 +0,0 @@
metadata(version="0.1.0")
package("json")

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0")
metadata(version="0.2.1")
module("ssl.py")
module("ssl.py", opt=3)

Wyświetl plik

@ -1,36 +1,65 @@
from ussl import *
import ussl as _ussl
import tls
from tls import *
# Constants
for sym in "CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED":
if sym not in globals():
globals()[sym] = object()
class SSLContext:
def __init__(self, *args):
self._context = tls.SSLContext(*args)
self._context.verify_mode = CERT_NONE
@property
def verify_mode(self):
return self._context.verify_mode
@verify_mode.setter
def verify_mode(self, val):
self._context.verify_mode = val
def load_cert_chain(self, certfile, keyfile):
if isinstance(certfile, str):
with open(certfile, "rb") as f:
certfile = f.read()
if isinstance(keyfile, str):
with open(keyfile, "rb") as f:
keyfile = f.read()
self._context.load_cert_chain(certfile, keyfile)
def load_verify_locations(self, cafile=None, cadata=None):
if cafile:
with open(cafile, "rb") as f:
cadata = f.read()
self._context.load_verify_locations(cadata)
def wrap_socket(
self, sock, server_side=False, do_handshake_on_connect=True, server_hostname=None
):
return self._context.wrap_socket(
sock,
server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
server_hostname=server_hostname,
)
def wrap_socket(
sock,
keyfile=None,
certfile=None,
server_side=False,
key=None,
cert=None,
cert_reqs=CERT_NONE,
*,
ca_certs=None,
server_hostname=None
cadata=None,
server_hostname=None,
do_handshake=True,
):
# TODO: More arguments accepted by CPython could also be handled here.
# That would allow us to accept ca_certs as a positional argument, which
# we should.
kw = {}
if keyfile is not None:
kw["keyfile"] = keyfile
if certfile is not None:
kw["certfile"] = certfile
if server_side is not False:
kw["server_side"] = server_side
if cert_reqs is not CERT_NONE:
kw["cert_reqs"] = cert_reqs
if ca_certs is not None:
kw["ca_certs"] = ca_certs
if server_hostname is not None:
kw["server_hostname"] = server_hostname
return _ussl.wrap_socket(sock, **kw)
con = SSLContext(PROTOCOL_TLS_SERVER if server_side else PROTOCOL_TLS_CLIENT)
if cert or key:
con.load_cert_chain(cert, key)
if cadata:
con.load_verify_locations(cadata=cadata)
con.verify_mode = cert_reqs
return con.wrap_socket(
sock,
server_side=server_side,
do_handshake_on_connect=do_handshake,
server_hostname=server_hostname,
)

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="Adds write (create/append) support to tarfile.", version="0.1.1")
metadata(description="Adds write (create/append) support to tarfile.", version="0.1.2")
require("tarfile")
package("tarfile")

Wyświetl plik

@ -67,7 +67,7 @@ def addfile(self, tarinfo, fileobj=None):
name += "/"
hdr = uctypes.struct(uctypes.addressof(buf), _TAR_HEADER, uctypes.LITTLE_ENDIAN)
hdr.name[:] = name.encode("utf-8")[:100]
hdr.mode[:] = b"%07o\0" % (tarinfo.mode & 0o7777)
hdr.mode[:] = b"%07o\0" % ((0o755 if tarinfo.isdir() else 0o644) & 0o7777)
hdr.uid[:] = b"%07o\0" % tarinfo.uid
hdr.gid[:] = b"%07o\0" % tarinfo.gid
hdr.size[:] = b"%011o\0" % size
@ -96,9 +96,10 @@ def addfile(self, tarinfo, fileobj=None):
def add(self, name, recursive=True):
from . import TarInfo
tarinfo = TarInfo(name)
try:
stat = os.stat(name)
res_name = (name + '/') if (stat[0] & 0xf000) == 0x4000 else name
tarinfo = TarInfo(res_name)
tarinfo.mode = stat[0]
tarinfo.uid = stat[4]
tarinfo.gid = stat[5]

Wyświetl plik

@ -0,0 +1,8 @@
# UUID
## Copied from CPython
UUID implementation and tests copied from CPython at the following versions:
[uuid.py](https://github.com/python/cpython/blob/9bf7c2d638a582af2444bc864feba13ab8957b68/Lib/uuid.py): 9bf7c2d638a582af2444bc864feba13ab8957b68
[test_uuid.py](https://github.com/python/cpython/blob/91e33ac3d08a1c6004c469da2c0e2a97b5bdc53c/Lib/test/test_uuid.py): 91e33ac3d08a1c6004c469da2c0e2a97b5bdc53c

Wyświetl plik

@ -0,0 +1,899 @@
import unittest
from test import support
from test.support import import_helper
import builtins
import contextlib
import copy
import enum
import io
import os
import pickle
import sys
import weakref
from unittest import mock
py_uuid = import_helper.import_fresh_module('uuid', blocked=['_uuid'])
c_uuid = import_helper.import_fresh_module('uuid', fresh=['_uuid'])
def importable(name):
try:
__import__(name)
return True
except:
return False
def mock_get_command_stdout(data):
def get_command_stdout(command, args):
return io.BytesIO(data.encode())
return get_command_stdout
class BaseTestUUID:
uuid = None
def test_safe_uuid_enum(self):
class CheckedSafeUUID(enum.Enum):
safe = 0
unsafe = -1
unknown = None
enum._test_simple_enum(CheckedSafeUUID, py_uuid.SafeUUID)
def test_UUID(self):
equal = self.assertEqual
ascending = []
for (string, curly, hex, bytes, bytes_le, fields, integer, urn,
time, clock_seq, variant, version) in [
('00000000-0000-0000-0000-000000000000',
'{00000000-0000-0000-0000-000000000000}',
'00000000000000000000000000000000',
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0',
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0',
(0, 0, 0, 0, 0, 0),
0,
'urn:uuid:00000000-0000-0000-0000-000000000000',
0, 0, self.uuid.RESERVED_NCS, None),
('00010203-0405-0607-0809-0a0b0c0d0e0f',
'{00010203-0405-0607-0809-0a0b0c0d0e0f}',
'000102030405060708090a0b0c0d0e0f',
b'\0\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\x0d\x0e\x0f',
b'\x03\x02\x01\0\x05\x04\x07\x06\x08\t\n\x0b\x0c\x0d\x0e\x0f',
(0x00010203, 0x0405, 0x0607, 8, 9, 0x0a0b0c0d0e0f),
0x000102030405060708090a0b0c0d0e0f,
'urn:uuid:00010203-0405-0607-0809-0a0b0c0d0e0f',
0x607040500010203, 0x809, self.uuid.RESERVED_NCS, None),
('02d9e6d5-9467-382e-8f9b-9300a64ac3cd',
'{02d9e6d5-9467-382e-8f9b-9300a64ac3cd}',
'02d9e6d59467382e8f9b9300a64ac3cd',
b'\x02\xd9\xe6\xd5\x94\x67\x38\x2e\x8f\x9b\x93\x00\xa6\x4a\xc3\xcd',
b'\xd5\xe6\xd9\x02\x67\x94\x2e\x38\x8f\x9b\x93\x00\xa6\x4a\xc3\xcd',
(0x02d9e6d5, 0x9467, 0x382e, 0x8f, 0x9b, 0x9300a64ac3cd),
0x02d9e6d59467382e8f9b9300a64ac3cd,
'urn:uuid:02d9e6d5-9467-382e-8f9b-9300a64ac3cd',
0x82e946702d9e6d5, 0xf9b, self.uuid.RFC_4122, 3),
('12345678-1234-5678-1234-567812345678',
'{12345678-1234-5678-1234-567812345678}',
'12345678123456781234567812345678',
b'\x12\x34\x56\x78'*4,
b'\x78\x56\x34\x12\x34\x12\x78\x56\x12\x34\x56\x78\x12\x34\x56\x78',
(0x12345678, 0x1234, 0x5678, 0x12, 0x34, 0x567812345678),
0x12345678123456781234567812345678,
'urn:uuid:12345678-1234-5678-1234-567812345678',
0x678123412345678, 0x1234, self.uuid.RESERVED_NCS, None),
('6ba7b810-9dad-11d1-80b4-00c04fd430c8',
'{6ba7b810-9dad-11d1-80b4-00c04fd430c8}',
'6ba7b8109dad11d180b400c04fd430c8',
b'\x6b\xa7\xb8\x10\x9d\xad\x11\xd1\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
b'\x10\xb8\xa7\x6b\xad\x9d\xd1\x11\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
(0x6ba7b810, 0x9dad, 0x11d1, 0x80, 0xb4, 0x00c04fd430c8),
0x6ba7b8109dad11d180b400c04fd430c8,
'urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8',
0x1d19dad6ba7b810, 0xb4, self.uuid.RFC_4122, 1),
('6ba7b811-9dad-11d1-80b4-00c04fd430c8',
'{6ba7b811-9dad-11d1-80b4-00c04fd430c8}',
'6ba7b8119dad11d180b400c04fd430c8',
b'\x6b\xa7\xb8\x11\x9d\xad\x11\xd1\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
b'\x11\xb8\xa7\x6b\xad\x9d\xd1\x11\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
(0x6ba7b811, 0x9dad, 0x11d1, 0x80, 0xb4, 0x00c04fd430c8),
0x6ba7b8119dad11d180b400c04fd430c8,
'urn:uuid:6ba7b811-9dad-11d1-80b4-00c04fd430c8',
0x1d19dad6ba7b811, 0xb4, self.uuid.RFC_4122, 1),
('6ba7b812-9dad-11d1-80b4-00c04fd430c8',
'{6ba7b812-9dad-11d1-80b4-00c04fd430c8}',
'6ba7b8129dad11d180b400c04fd430c8',
b'\x6b\xa7\xb8\x12\x9d\xad\x11\xd1\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
b'\x12\xb8\xa7\x6b\xad\x9d\xd1\x11\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
(0x6ba7b812, 0x9dad, 0x11d1, 0x80, 0xb4, 0x00c04fd430c8),
0x6ba7b8129dad11d180b400c04fd430c8,
'urn:uuid:6ba7b812-9dad-11d1-80b4-00c04fd430c8',
0x1d19dad6ba7b812, 0xb4, self.uuid.RFC_4122, 1),
('6ba7b814-9dad-11d1-80b4-00c04fd430c8',
'{6ba7b814-9dad-11d1-80b4-00c04fd430c8}',
'6ba7b8149dad11d180b400c04fd430c8',
b'\x6b\xa7\xb8\x14\x9d\xad\x11\xd1\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
b'\x14\xb8\xa7\x6b\xad\x9d\xd1\x11\x80\xb4\x00\xc0\x4f\xd4\x30\xc8',
(0x6ba7b814, 0x9dad, 0x11d1, 0x80, 0xb4, 0x00c04fd430c8),
0x6ba7b8149dad11d180b400c04fd430c8,
'urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8',
0x1d19dad6ba7b814, 0xb4, self.uuid.RFC_4122, 1),
('7d444840-9dc0-11d1-b245-5ffdce74fad2',
'{7d444840-9dc0-11d1-b245-5ffdce74fad2}',
'7d4448409dc011d1b2455ffdce74fad2',
b'\x7d\x44\x48\x40\x9d\xc0\x11\xd1\xb2\x45\x5f\xfd\xce\x74\xfa\xd2',
b'\x40\x48\x44\x7d\xc0\x9d\xd1\x11\xb2\x45\x5f\xfd\xce\x74\xfa\xd2',
(0x7d444840, 0x9dc0, 0x11d1, 0xb2, 0x45, 0x5ffdce74fad2),
0x7d4448409dc011d1b2455ffdce74fad2,
'urn:uuid:7d444840-9dc0-11d1-b245-5ffdce74fad2',
0x1d19dc07d444840, 0x3245, self.uuid.RFC_4122, 1),
('e902893a-9d22-3c7e-a7b8-d6e313b71d9f',
'{e902893a-9d22-3c7e-a7b8-d6e313b71d9f}',
'e902893a9d223c7ea7b8d6e313b71d9f',
b'\xe9\x02\x89\x3a\x9d\x22\x3c\x7e\xa7\xb8\xd6\xe3\x13\xb7\x1d\x9f',
b'\x3a\x89\x02\xe9\x22\x9d\x7e\x3c\xa7\xb8\xd6\xe3\x13\xb7\x1d\x9f',
(0xe902893a, 0x9d22, 0x3c7e, 0xa7, 0xb8, 0xd6e313b71d9f),
0xe902893a9d223c7ea7b8d6e313b71d9f,
'urn:uuid:e902893a-9d22-3c7e-a7b8-d6e313b71d9f',
0xc7e9d22e902893a, 0x27b8, self.uuid.RFC_4122, 3),
('eb424026-6f54-4ef8-a4d0-bb658a1fc6cf',
'{eb424026-6f54-4ef8-a4d0-bb658a1fc6cf}',
'eb4240266f544ef8a4d0bb658a1fc6cf',
b'\xeb\x42\x40\x26\x6f\x54\x4e\xf8\xa4\xd0\xbb\x65\x8a\x1f\xc6\xcf',
b'\x26\x40\x42\xeb\x54\x6f\xf8\x4e\xa4\xd0\xbb\x65\x8a\x1f\xc6\xcf',
(0xeb424026, 0x6f54, 0x4ef8, 0xa4, 0xd0, 0xbb658a1fc6cf),
0xeb4240266f544ef8a4d0bb658a1fc6cf,
'urn:uuid:eb424026-6f54-4ef8-a4d0-bb658a1fc6cf',
0xef86f54eb424026, 0x24d0, self.uuid.RFC_4122, 4),
('f81d4fae-7dec-11d0-a765-00a0c91e6bf6',
'{f81d4fae-7dec-11d0-a765-00a0c91e6bf6}',
'f81d4fae7dec11d0a76500a0c91e6bf6',
b'\xf8\x1d\x4f\xae\x7d\xec\x11\xd0\xa7\x65\x00\xa0\xc9\x1e\x6b\xf6',
b'\xae\x4f\x1d\xf8\xec\x7d\xd0\x11\xa7\x65\x00\xa0\xc9\x1e\x6b\xf6',
(0xf81d4fae, 0x7dec, 0x11d0, 0xa7, 0x65, 0x00a0c91e6bf6),
0xf81d4fae7dec11d0a76500a0c91e6bf6,
'urn:uuid:f81d4fae-7dec-11d0-a765-00a0c91e6bf6',
0x1d07decf81d4fae, 0x2765, self.uuid.RFC_4122, 1),
('fffefdfc-fffe-fffe-fffe-fffefdfcfbfa',
'{fffefdfc-fffe-fffe-fffe-fffefdfcfbfa}',
'fffefdfcfffefffefffefffefdfcfbfa',
b'\xff\xfe\xfd\xfc\xff\xfe\xff\xfe\xff\xfe\xff\xfe\xfd\xfc\xfb\xfa',
b'\xfc\xfd\xfe\xff\xfe\xff\xfe\xff\xff\xfe\xff\xfe\xfd\xfc\xfb\xfa',
(0xfffefdfc, 0xfffe, 0xfffe, 0xff, 0xfe, 0xfffefdfcfbfa),
0xfffefdfcfffefffefffefffefdfcfbfa,
'urn:uuid:fffefdfc-fffe-fffe-fffe-fffefdfcfbfa',
0xffefffefffefdfc, 0x3ffe, self.uuid.RESERVED_FUTURE, None),
('ffffffff-ffff-ffff-ffff-ffffffffffff',
'{ffffffff-ffff-ffff-ffff-ffffffffffff}',
'ffffffffffffffffffffffffffffffff',
b'\xff'*16,
b'\xff'*16,
(0xffffffff, 0xffff, 0xffff, 0xff, 0xff, 0xffffffffffff),
0xffffffffffffffffffffffffffffffff,
'urn:uuid:ffffffff-ffff-ffff-ffff-ffffffffffff',
0xfffffffffffffff, 0x3fff, self.uuid.RESERVED_FUTURE, None),
]:
equivalents = []
# Construct each UUID in several different ways.
for u in [self.uuid.UUID(string), self.uuid.UUID(curly), self.uuid.UUID(hex),
self.uuid.UUID(bytes=bytes), self.uuid.UUID(bytes_le=bytes_le),
self.uuid.UUID(fields=fields), self.uuid.UUID(int=integer),
self.uuid.UUID(urn)]:
# Test all conversions and properties of the UUID object.
equal(str(u), string)
equal(int(u), integer)
equal(u.bytes, bytes)
equal(u.bytes_le, bytes_le)
equal(u.fields, fields)
equal(u.time_low, fields[0])
equal(u.time_mid, fields[1])
equal(u.time_hi_version, fields[2])
equal(u.clock_seq_hi_variant, fields[3])
equal(u.clock_seq_low, fields[4])
equal(u.node, fields[5])
equal(u.hex, hex)
equal(u.int, integer)
equal(u.urn, urn)
equal(u.time, time)
equal(u.clock_seq, clock_seq)
equal(u.variant, variant)
equal(u.version, version)
equivalents.append(u)
# Different construction methods should give the same UUID.
for u in equivalents:
for v in equivalents:
equal(u, v)
# Bug 7380: "bytes" and "bytes_le" should give the same type.
equal(type(u.bytes), builtins.bytes)
equal(type(u.bytes_le), builtins.bytes)
ascending.append(u)
# Test comparison of UUIDs.
for i in range(len(ascending)):
for j in range(len(ascending)):
equal(i < j, ascending[i] < ascending[j])
equal(i <= j, ascending[i] <= ascending[j])
equal(i == j, ascending[i] == ascending[j])
equal(i > j, ascending[i] > ascending[j])
equal(i >= j, ascending[i] >= ascending[j])
equal(i != j, ascending[i] != ascending[j])
# Test sorting of UUIDs (above list is in ascending order).
resorted = ascending[:]
resorted.reverse()
resorted.sort()
equal(ascending, resorted)
def test_exceptions(self):
badvalue = lambda f: self.assertRaises(ValueError, f)
badtype = lambda f: self.assertRaises(TypeError, f)
# Badly formed hex strings.
badvalue(lambda: self.uuid.UUID(''))
badvalue(lambda: self.uuid.UUID('abc'))
badvalue(lambda: self.uuid.UUID('1234567812345678123456781234567'))
badvalue(lambda: self.uuid.UUID('123456781234567812345678123456789'))
badvalue(lambda: self.uuid.UUID('123456781234567812345678z2345678'))
# Badly formed bytes.
badvalue(lambda: self.uuid.UUID(bytes='abc'))
badvalue(lambda: self.uuid.UUID(bytes='\0'*15))
badvalue(lambda: self.uuid.UUID(bytes='\0'*17))
# Badly formed bytes_le.
badvalue(lambda: self.uuid.UUID(bytes_le='abc'))
badvalue(lambda: self.uuid.UUID(bytes_le='\0'*15))
badvalue(lambda: self.uuid.UUID(bytes_le='\0'*17))
# Badly formed fields.
badvalue(lambda: self.uuid.UUID(fields=(1,)))
badvalue(lambda: self.uuid.UUID(fields=(1, 2, 3, 4, 5)))
badvalue(lambda: self.uuid.UUID(fields=(1, 2, 3, 4, 5, 6, 7)))
# Field values out of range.
badvalue(lambda: self.uuid.UUID(fields=(-1, 0, 0, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0x100000000, 0, 0, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, -1, 0, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0x10000, 0, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, -1, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0x10000, 0, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, -1, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, 0x100, 0, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, 0, -1, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, 0, 0x100, 0)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, 0, 0, -1)))
badvalue(lambda: self.uuid.UUID(fields=(0, 0, 0, 0, 0, 0x1000000000000)))
# Version number out of range.
badvalue(lambda: self.uuid.UUID('00'*16, version=0))
badvalue(lambda: self.uuid.UUID('00'*16, version=6))
# Integer value out of range.
badvalue(lambda: self.uuid.UUID(int=-1))
badvalue(lambda: self.uuid.UUID(int=1<<128))
# Must supply exactly one of hex, bytes, fields, int.
h, b, f, i = '00'*16, b'\0'*16, (0, 0, 0, 0, 0, 0), 0
self.uuid.UUID(h)
self.uuid.UUID(hex=h)
self.uuid.UUID(bytes=b)
self.uuid.UUID(bytes_le=b)
self.uuid.UUID(fields=f)
self.uuid.UUID(int=i)
# Wrong number of arguments (positional).
badtype(lambda: self.uuid.UUID())
badtype(lambda: self.uuid.UUID(h, b))
badtype(lambda: self.uuid.UUID(h, b, b))
badtype(lambda: self.uuid.UUID(h, b, b, f))
badtype(lambda: self.uuid.UUID(h, b, b, f, i))
# Duplicate arguments.
for hh in [[], [('hex', h)]]:
for bb in [[], [('bytes', b)]]:
for bble in [[], [('bytes_le', b)]]:
for ii in [[], [('int', i)]]:
for ff in [[], [('fields', f)]]:
args = dict(hh + bb + bble + ii + ff)
if len(args) != 0:
badtype(lambda: self.uuid.UUID(h, **args))
if len(args) != 1:
badtype(lambda: self.uuid.UUID(**args))
# Immutability.
u = self.uuid.UUID(h)
badtype(lambda: setattr(u, 'hex', h))
badtype(lambda: setattr(u, 'bytes', b))
badtype(lambda: setattr(u, 'bytes_le', b))
badtype(lambda: setattr(u, 'fields', f))
badtype(lambda: setattr(u, 'int', i))
badtype(lambda: setattr(u, 'time_low', 0))
badtype(lambda: setattr(u, 'time_mid', 0))
badtype(lambda: setattr(u, 'time_hi_version', 0))
badtype(lambda: setattr(u, 'time_hi_version', 0))
badtype(lambda: setattr(u, 'clock_seq_hi_variant', 0))
badtype(lambda: setattr(u, 'clock_seq_low', 0))
badtype(lambda: setattr(u, 'node', 0))
# Comparison with a non-UUID object
badtype(lambda: u < object())
badtype(lambda: u > object())
def test_getnode(self):
node1 = self.uuid.getnode()
self.assertTrue(0 < node1 < (1 << 48), '%012x' % node1)
# Test it again to ensure consistency.
node2 = self.uuid.getnode()
self.assertEqual(node1, node2, '%012x != %012x' % (node1, node2))
def test_pickle_roundtrip(self):
def check(actual, expected):
self.assertEqual(actual, expected)
self.assertEqual(actual.is_safe, expected.is_safe)
with support.swap_item(sys.modules, 'uuid', self.uuid):
for is_safe in self.uuid.SafeUUID:
u = self.uuid.UUID('d82579ce6642a0de7ddf490a7aec7aa5',
is_safe=is_safe)
check(copy.copy(u), u)
check(copy.deepcopy(u), u)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
with self.subTest(protocol=proto):
check(pickle.loads(pickle.dumps(u, proto)), u)
def test_unpickle_previous_python_versions(self):
def check(actual, expected):
self.assertEqual(actual, expected)
self.assertEqual(actual.is_safe, expected.is_safe)
pickled_uuids = [
# Python 2.7, protocol 0
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR(dS\'int\'\nL287307832597519156748809049798316161701L\nsb.',
# Python 2.7, protocol 1
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR}U\x03intL287307832597519156748809049798316161701L\nsb.',
# Python 2.7, protocol 2
b'\x80\x02cuuid\nUUID\n)\x81}U\x03int\x8a\x11\xa5z\xecz\nI\xdf}'
b'\xde\xa0Bf\xcey%\xd8\x00sb.',
# Python 3.6, protocol 0
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR(dVint\nL287307832597519156748809049798316161701L\nsb.',
# Python 3.6, protocol 1
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR}X\x03\x00\x00\x00intL287307832597519156748809049798316161701L'
b'\nsb.',
# Python 3.6, protocol 2
b'\x80\x02cuuid\nUUID\n)\x81}X\x03\x00\x00\x00int\x8a\x11\xa5z\xec'
b'z\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00sb.',
# Python 3.6, protocol 3
b'\x80\x03cuuid\nUUID\n)\x81}X\x03\x00\x00\x00int\x8a\x11\xa5z\xec'
b'z\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00sb.',
# Python 3.6, protocol 4
b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x04uuid\x8c\x04UUI'
b'D\x93)\x81}\x8c\x03int\x8a\x11\xa5z\xecz\nI\xdf}\xde\xa0Bf\xcey%'
b'\xd8\x00sb.',
# Python 3.7, protocol 0
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR(dVint\nL287307832597519156748809049798316161701L\nsVis_safe\n'
b'cuuid\nSafeUUID\n(NtRsb.',
# Python 3.7, protocol 1
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR}(X\x03\x00\x00\x00intL287307832597519156748809049798316161701'
b'L\nX\x07\x00\x00\x00is_safecuuid\nSafeUUID\n(NtRub.',
# Python 3.7, protocol 2
b'\x80\x02cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nN\x85Rub.',
# Python 3.7, protocol 3
b'\x80\x03cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nN\x85Rub.',
# Python 3.7, protocol 4
b'\x80\x04\x95F\x00\x00\x00\x00\x00\x00\x00\x8c\x04uuid\x94\x8c'
b'\x04UUID\x93)\x81}(\x8c\x03int\x8a\x11\xa5z\xecz\nI\xdf}\xde\xa0'
b'Bf\xcey%\xd8\x00\x8c\x07is_safeh\x00\x8c\x08SafeUUID\x93N\x85Rub'
b'.',
]
pickled_uuids_safe = [
# Python 3.7, protocol 0
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR(dVint\nL287307832597519156748809049798316161701L\nsVis_safe\n'
b'cuuid\nSafeUUID\n(I0\ntRsb.',
# Python 3.7, protocol 1
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR}(X\x03\x00\x00\x00intL287307832597519156748809049798316161701'
b'L\nX\x07\x00\x00\x00is_safecuuid\nSafeUUID\n(K\x00tRub.',
# Python 3.7, protocol 2
b'\x80\x02cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nK\x00\x85Rub.',
# Python 3.7, protocol 3
b'\x80\x03cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nK\x00\x85Rub.',
# Python 3.7, protocol 4
b'\x80\x04\x95G\x00\x00\x00\x00\x00\x00\x00\x8c\x04uuid\x94\x8c'
b'\x04UUID\x93)\x81}(\x8c\x03int\x8a\x11\xa5z\xecz\nI\xdf}\xde\xa0'
b'Bf\xcey%\xd8\x00\x8c\x07is_safeh\x00\x8c\x08SafeUUID\x93K\x00'
b'\x85Rub.',
]
pickled_uuids_unsafe = [
# Python 3.7, protocol 0
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR(dVint\nL287307832597519156748809049798316161701L\nsVis_safe\n'
b'cuuid\nSafeUUID\n(I-1\ntRsb.',
# Python 3.7, protocol 1
b'ccopy_reg\n_reconstructor\n(cuuid\nUUID\nc__builtin__\nobject\nN'
b'tR}(X\x03\x00\x00\x00intL287307832597519156748809049798316161701'
b'L\nX\x07\x00\x00\x00is_safecuuid\nSafeUUID\n(J\xff\xff\xff\xfftR'
b'ub.',
# Python 3.7, protocol 2
b'\x80\x02cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nJ\xff\xff\xff\xff\x85Rub.',
# Python 3.7, protocol 3
b'\x80\x03cuuid\nUUID\n)\x81}(X\x03\x00\x00\x00int\x8a\x11\xa5z'
b'\xecz\nI\xdf}\xde\xa0Bf\xcey%\xd8\x00X\x07\x00\x00\x00is_safecuu'
b'id\nSafeUUID\nJ\xff\xff\xff\xff\x85Rub.',
# Python 3.7, protocol 4
b'\x80\x04\x95J\x00\x00\x00\x00\x00\x00\x00\x8c\x04uuid\x94\x8c'
b'\x04UUID\x93)\x81}(\x8c\x03int\x8a\x11\xa5z\xecz\nI\xdf}\xde\xa0'
b'Bf\xcey%\xd8\x00\x8c\x07is_safeh\x00\x8c\x08SafeUUID\x93J\xff'
b'\xff\xff\xff\x85Rub.',
]
u = self.uuid.UUID('d82579ce6642a0de7ddf490a7aec7aa5')
u_safe = self.uuid.UUID('d82579ce6642a0de7ddf490a7aec7aa5',
is_safe=self.uuid.SafeUUID.safe)
u_unsafe = self.uuid.UUID('d82579ce6642a0de7ddf490a7aec7aa5',
is_safe=self.uuid.SafeUUID.unsafe)
with support.swap_item(sys.modules, 'uuid', self.uuid):
for pickled in pickled_uuids:
# is_safe was added in 3.7. When unpickling values from older
# versions, is_safe will be missing, so it should be set to
# SafeUUID.unknown.
check(pickle.loads(pickled), u)
for pickled in pickled_uuids_safe:
check(pickle.loads(pickled), u_safe)
for pickled in pickled_uuids_unsafe:
check(pickle.loads(pickled), u_unsafe)
# bpo-32502: UUID1 requires a 48-bit identifier, but hardware identifiers
# need not necessarily be 48 bits (e.g., EUI-64).
def test_uuid1_eui64(self):
# Confirm that uuid.getnode ignores hardware addresses larger than 48
# bits. Mock out each platform's *_getnode helper functions to return
# something just larger than 48 bits to test. This will cause
# uuid.getnode to fall back on uuid._random_getnode, which will
# generate a valid value.
too_large_getter = lambda: 1 << 48
with mock.patch.multiple(
self.uuid,
_node=None, # Ignore any cached node value.
_GETTERS=[too_large_getter],
):
node = self.uuid.getnode()
self.assertTrue(0 < node < (1 << 48), '%012x' % node)
# Confirm that uuid1 can use the generated node, i.e., the that
# uuid.getnode fell back on uuid._random_getnode() rather than using
# the value from too_large_getter above.
try:
self.uuid.uuid1(node=node)
except ValueError:
self.fail('uuid1 was given an invalid node ID')
def test_uuid1(self):
equal = self.assertEqual
# Make sure uuid1() generates UUIDs that are actually version 1.
for u in [self.uuid.uuid1() for i in range(10)]:
equal(u.variant, self.uuid.RFC_4122)
equal(u.version, 1)
self.assertIn(u.is_safe, {self.uuid.SafeUUID.safe,
self.uuid.SafeUUID.unsafe,
self.uuid.SafeUUID.unknown})
# Make sure the generated UUIDs are actually unique.
uuids = {}
for u in [self.uuid.uuid1() for i in range(1000)]:
uuids[u] = 1
equal(len(uuids.keys()), 1000)
# Make sure the supplied node ID appears in the UUID.
u = self.uuid.uuid1(0)
equal(u.node, 0)
u = self.uuid.uuid1(0x123456789abc)
equal(u.node, 0x123456789abc)
u = self.uuid.uuid1(0xffffffffffff)
equal(u.node, 0xffffffffffff)
# Make sure the supplied clock sequence appears in the UUID.
u = self.uuid.uuid1(0x123456789abc, 0)
equal(u.node, 0x123456789abc)
equal(((u.clock_seq_hi_variant & 0x3f) << 8) | u.clock_seq_low, 0)
u = self.uuid.uuid1(0x123456789abc, 0x1234)
equal(u.node, 0x123456789abc)
equal(((u.clock_seq_hi_variant & 0x3f) << 8) |
u.clock_seq_low, 0x1234)
u = self.uuid.uuid1(0x123456789abc, 0x3fff)
equal(u.node, 0x123456789abc)
equal(((u.clock_seq_hi_variant & 0x3f) << 8) |
u.clock_seq_low, 0x3fff)
# bpo-29925: On Mac OS X Tiger, self.uuid.uuid1().is_safe returns
# self.uuid.SafeUUID.unknown
@support.requires_mac_ver(10, 5)
@unittest.skipUnless(os.name == 'posix', 'POSIX-only test')
def test_uuid1_safe(self):
if not self.uuid._has_uuid_generate_time_safe:
self.skipTest('requires uuid_generate_time_safe(3)')
u = self.uuid.uuid1()
# uuid_generate_time_safe() may return 0 or -1 but what it returns is
# dependent on the underlying platform support. At least it cannot be
# unknown (unless I suppose the platform is buggy).
self.assertNotEqual(u.is_safe, self.uuid.SafeUUID.unknown)
@contextlib.contextmanager
def mock_generate_time_safe(self, safe_value):
"""
Mock uuid._generate_time_safe() to return a given *safe_value*.
"""
if os.name != 'posix':
self.skipTest('POSIX-only test')
self.uuid._load_system_functions()
f = self.uuid._generate_time_safe
if f is None:
self.skipTest('need uuid._generate_time_safe')
with mock.patch.object(self.uuid, '_generate_time_safe',
lambda: (f()[0], safe_value)):
yield
@unittest.skipUnless(os.name == 'posix', 'POSIX-only test')
def test_uuid1_unknown(self):
# Even if the platform has uuid_generate_time_safe(), let's mock it to
# be uuid_generate_time() and ensure the safety is unknown.
with self.mock_generate_time_safe(None):
u = self.uuid.uuid1()
self.assertEqual(u.is_safe, self.uuid.SafeUUID.unknown)
@unittest.skipUnless(os.name == 'posix', 'POSIX-only test')
def test_uuid1_is_safe(self):
with self.mock_generate_time_safe(0):
u = self.uuid.uuid1()
self.assertEqual(u.is_safe, self.uuid.SafeUUID.safe)
@unittest.skipUnless(os.name == 'posix', 'POSIX-only test')
def test_uuid1_is_unsafe(self):
with self.mock_generate_time_safe(-1):
u = self.uuid.uuid1()
self.assertEqual(u.is_safe, self.uuid.SafeUUID.unsafe)
@unittest.skipUnless(os.name == 'posix', 'POSIX-only test')
def test_uuid1_bogus_return_value(self):
with self.mock_generate_time_safe(3):
u = self.uuid.uuid1()
self.assertEqual(u.is_safe, self.uuid.SafeUUID.unknown)
def test_uuid1_time(self):
with mock.patch.object(self.uuid, '_has_uuid_generate_time_safe', False), \
mock.patch.object(self.uuid, '_generate_time_safe', None), \
mock.patch.object(self.uuid, '_last_timestamp', None), \
mock.patch.object(self.uuid, 'getnode', return_value=93328246233727), \
mock.patch('time.time_ns', return_value=1545052026752910643), \
mock.patch('random.getrandbits', return_value=5317): # guaranteed to be random
u = self.uuid.uuid1()
self.assertEqual(u, self.uuid.UUID('a7a55b92-01fc-11e9-94c5-54e1acf6da7f'))
with mock.patch.object(self.uuid, '_has_uuid_generate_time_safe', False), \
mock.patch.object(self.uuid, '_generate_time_safe', None), \
mock.patch.object(self.uuid, '_last_timestamp', None), \
mock.patch('time.time_ns', return_value=1545052026752910643):
u = self.uuid.uuid1(node=93328246233727, clock_seq=5317)
self.assertEqual(u, self.uuid.UUID('a7a55b92-01fc-11e9-94c5-54e1acf6da7f'))
def test_uuid3(self):
equal = self.assertEqual
# Test some known version-3 UUIDs.
for u, v in [(self.uuid.uuid3(self.uuid.NAMESPACE_DNS, 'python.org'),
'6fa459ea-ee8a-3ca4-894e-db77e160355e'),
(self.uuid.uuid3(self.uuid.NAMESPACE_URL, 'http://python.org/'),
'9fe8e8c4-aaa8-32a9-a55c-4535a88b748d'),
(self.uuid.uuid3(self.uuid.NAMESPACE_OID, '1.3.6.1'),
'dd1a1cef-13d5-368a-ad82-eca71acd4cd1'),
(self.uuid.uuid3(self.uuid.NAMESPACE_X500, 'c=ca'),
'658d3002-db6b-3040-a1d1-8ddd7d189a4d'),
]:
equal(u.variant, self.uuid.RFC_4122)
equal(u.version, 3)
equal(u, self.uuid.UUID(v))
equal(str(u), v)
def test_uuid4(self):
equal = self.assertEqual
# Make sure uuid4() generates UUIDs that are actually version 4.
for u in [self.uuid.uuid4() for i in range(10)]:
equal(u.variant, self.uuid.RFC_4122)
equal(u.version, 4)
# Make sure the generated UUIDs are actually unique.
uuids = {}
for u in [self.uuid.uuid4() for i in range(1000)]:
uuids[u] = 1
equal(len(uuids.keys()), 1000)
def test_uuid5(self):
equal = self.assertEqual
# Test some known version-5 UUIDs.
for u, v in [(self.uuid.uuid5(self.uuid.NAMESPACE_DNS, 'python.org'),
'886313e1-3b8a-5372-9b90-0c9aee199e5d'),
(self.uuid.uuid5(self.uuid.NAMESPACE_URL, 'http://python.org/'),
'4c565f0d-3f5a-5890-b41b-20cf47701c5e'),
(self.uuid.uuid5(self.uuid.NAMESPACE_OID, '1.3.6.1'),
'1447fa61-5277-5fef-a9b3-fbc6e44f4af3'),
(self.uuid.uuid5(self.uuid.NAMESPACE_X500, 'c=ca'),
'cc957dd1-a972-5349-98cd-874190002798'),
]:
equal(u.variant, self.uuid.RFC_4122)
equal(u.version, 5)
equal(u, self.uuid.UUID(v))
equal(str(u), v)
@support.requires_fork()
def testIssue8621(self):
# On at least some versions of OSX self.uuid.uuid4 generates
# the same sequence of UUIDs in the parent and any
# children started using fork.
fds = os.pipe()
pid = os.fork()
if pid == 0:
os.close(fds[0])
value = self.uuid.uuid4()
os.write(fds[1], value.hex.encode('latin-1'))
os._exit(0)
else:
os.close(fds[1])
self.addCleanup(os.close, fds[0])
parent_value = self.uuid.uuid4().hex
support.wait_process(pid, exitcode=0)
child_value = os.read(fds[0], 100).decode('latin-1')
self.assertNotEqual(parent_value, child_value)
def test_uuid_weakref(self):
# bpo-35701: check that weak referencing to a UUID object can be created
strong = self.uuid.uuid4()
weak = weakref.ref(strong)
self.assertIs(strong, weak())
class TestUUIDWithoutExtModule(BaseTestUUID, unittest.TestCase):
uuid = py_uuid
@unittest.skipUnless(c_uuid, 'requires the C _uuid module')
class TestUUIDWithExtModule(BaseTestUUID, unittest.TestCase):
uuid = c_uuid
class BaseTestInternals:
_uuid = py_uuid
def check_parse_mac(self, aix):
if not aix:
patch = mock.patch.multiple(self.uuid,
_MAC_DELIM=b':',
_MAC_OMITS_LEADING_ZEROES=False)
else:
patch = mock.patch.multiple(self.uuid,
_MAC_DELIM=b'.',
_MAC_OMITS_LEADING_ZEROES=True)
with patch:
# Valid MAC addresses
if not aix:
tests = (
(b'52:54:00:9d:0e:67', 0x5254009d0e67),
(b'12:34:56:78:90:ab', 0x1234567890ab),
)
else:
# AIX format
tests = (
(b'fe.ad.c.1.23.4', 0xfead0c012304),
)
for mac, expected in tests:
self.assertEqual(self.uuid._parse_mac(mac), expected)
# Invalid MAC addresses
for mac in (
b'',
# IPv6 addresses with same length than valid MAC address
# (17 characters)
b'fe80::5054:ff:fe9',
b'123:2:3:4:5:6:7:8',
# empty 5rd field
b'52:54:00:9d::67',
# only 5 fields instead of 6
b'52:54:00:9d:0e'
# invalid character 'x'
b'52:54:00:9d:0e:6x'
# dash separator
b'52-54-00-9d-0e-67',
):
if aix:
mac = mac.replace(b':', b'.')
with self.subTest(mac=mac):
self.assertIsNone(self.uuid._parse_mac(mac))
def test_parse_mac(self):
self.check_parse_mac(False)
def test_parse_mac_aix(self):
self.check_parse_mac(True)
def test_find_under_heading(self):
data = '''\
Name Mtu Network Address Ipkts Ierrs Opkts Oerrs Coll
en0 1500 link#2 fe.ad.c.1.23.4 1714807956 0 711348489 0 0
01:00:5e:00:00:01
en0 1500 192.168.129 x071 1714807956 0 711348489 0 0
224.0.0.1
en0 1500 192.168.90 x071 1714807956 0 711348489 0 0
224.0.0.1
'''
# The above data is from AIX - with '.' as _MAC_DELIM and strings
# shorter than 17 bytes (no leading 0). (_MAC_OMITS_LEADING_ZEROES=True)
with mock.patch.multiple(self.uuid,
_MAC_DELIM=b'.',
_MAC_OMITS_LEADING_ZEROES=True,
_get_command_stdout=mock_get_command_stdout(data)):
mac = self.uuid._find_mac_under_heading(
command='netstat',
args='-ian',
heading=b'Address',
)
self.assertEqual(mac, 0xfead0c012304)
def test_find_under_heading_ipv6(self):
# bpo-39991: IPv6 address "fe80::5054:ff:fe9" looks like a MAC address
# (same string length) but must be skipped
data = '''\
Name Mtu Network Address Ipkts Ierrs Idrop Opkts Oerrs Coll
vtnet 1500 <Link#1> 52:54:00:9d:0e:67 10017 0 0 8174 0 0
vtnet - fe80::%vtnet0 fe80::5054:ff:fe9 0 - - 4 - -
vtnet - 192.168.122.0 192.168.122.45 8844 - - 8171 - -
lo0 16384 <Link#2> lo0 260148 0 0 260148 0 0
lo0 - ::1/128 ::1 193 - - 193 - -
ff01::1%lo0
ff02::2:2eb7:74fa
ff02::2:ff2e:b774
ff02::1%lo0
ff02::1:ff00:1%lo
lo0 - fe80::%lo0/64 fe80::1%lo0 0 - - 0 - -
ff01::1%lo0
ff02::2:2eb7:74fa
ff02::2:ff2e:b774
ff02::1%lo0
ff02::1:ff00:1%lo
lo0 - 127.0.0.0/8 127.0.0.1 259955 - - 259955 - -
224.0.0.1
'''
with mock.patch.multiple(self.uuid,
_MAC_DELIM=b':',
_MAC_OMITS_LEADING_ZEROES=False,
_get_command_stdout=mock_get_command_stdout(data)):
mac = self.uuid._find_mac_under_heading(
command='netstat',
args='-ian',
heading=b'Address',
)
self.assertEqual(mac, 0x5254009d0e67)
def test_find_mac_near_keyword(self):
# key and value are on the same line
data = '''
fake Link encap:UNSPEC hwaddr 00-00
cscotun0 Link encap:UNSPEC HWaddr 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00
eth0 Link encap:Ethernet HWaddr 12:34:56:78:90:ab
'''
# The above data will only be parsed properly on non-AIX unixes.
with mock.patch.multiple(self.uuid,
_MAC_DELIM=b':',
_MAC_OMITS_LEADING_ZEROES=False,
_get_command_stdout=mock_get_command_stdout(data)):
mac = self.uuid._find_mac_near_keyword(
command='ifconfig',
args='',
keywords=[b'hwaddr'],
get_word_index=lambda x: x + 1,
)
self.assertEqual(mac, 0x1234567890ab)
def check_node(self, node, requires=None):
if requires and node is None:
self.skipTest('requires ' + requires)
hex = '%012x' % node
if support.verbose >= 2:
print(hex, end=' ')
self.assertTrue(0 < node < (1 << 48),
"%s is not an RFC 4122 node ID" % hex)
@unittest.skipUnless(_uuid._ifconfig_getnode in _uuid._GETTERS,
"ifconfig is not used for introspection on this platform")
def test_ifconfig_getnode(self):
node = self.uuid._ifconfig_getnode()
self.check_node(node, 'ifconfig')
@unittest.skipUnless(_uuid._ip_getnode in _uuid._GETTERS,
"ip is not used for introspection on this platform")
def test_ip_getnode(self):
node = self.uuid._ip_getnode()
self.check_node(node, 'ip')
@unittest.skipUnless(_uuid._arp_getnode in _uuid._GETTERS,
"arp is not used for introspection on this platform")
def test_arp_getnode(self):
node = self.uuid._arp_getnode()
self.check_node(node, 'arp')
@unittest.skipUnless(_uuid._lanscan_getnode in _uuid._GETTERS,
"lanscan is not used for introspection on this platform")
def test_lanscan_getnode(self):
node = self.uuid._lanscan_getnode()
self.check_node(node, 'lanscan')
@unittest.skipUnless(_uuid._netstat_getnode in _uuid._GETTERS,
"netstat is not used for introspection on this platform")
def test_netstat_getnode(self):
node = self.uuid._netstat_getnode()
self.check_node(node, 'netstat')
def test_random_getnode(self):
node = self.uuid._random_getnode()
# The multicast bit, i.e. the least significant bit of first octet,
# must be set for randomly generated MAC addresses. See RFC 4122,
# $4.1.6.
self.assertTrue(node & (1 << 40), '%012x' % node)
self.check_node(node)
node2 = self.uuid._random_getnode()
self.assertNotEqual(node2, node, '%012x' % node)
class TestInternalsWithoutExtModule(BaseTestInternals, unittest.TestCase):
uuid = py_uuid
@unittest.skipUnless(c_uuid, 'requires the C _uuid module')
class TestInternalsWithExtModule(BaseTestInternals, unittest.TestCase):
uuid = c_uuid
@unittest.skipUnless(os.name == 'posix', 'requires Posix')
def test_unix_getnode(self):
if not importable('_uuid') and not importable('ctypes'):
self.skipTest("neither _uuid extension nor ctypes available")
try: # Issues 1481, 3581: _uuid_generate_time() might be None.
node = self.uuid._unix_getnode()
except TypeError:
self.skipTest('requires uuid_generate_time')
self.check_node(node, 'unix')
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_windll_getnode(self):
node = self.uuid._windll_getnode()
self.check_node(node)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,402 @@
r"""UUID objects (universally unique identifiers) according to RFC 4122.
This module provides immutable UUID objects (class UUID) and the functions
uuid1(), uuid3(), uuid4(), uuid5() for generating version 1, 3, 4, and 5
UUIDs as specified in RFC 4122.
If all you want is a unique ID, you should probably call uuid1() or uuid4().
Note that uuid1() may compromise privacy since it creates a UUID containing
the computer's network address. uuid4() creates a random UUID.
Typical usage:
>>> import uuid
# make a UUID based on the host ID and current time
>>> uuid.uuid1() # doctest: +SKIP
UUID('a8098c1a-f86e-11da-bd1a-00112444be1e')
# make a UUID using an MD5 hash of a namespace UUID and a name
>>> uuid.uuid3(uuid.NAMESPACE_DNS, 'python.org')
UUID('6fa459ea-ee8a-3ca4-894e-db77e160355e')
# make a random UUID
>>> uuid.uuid4() # doctest: +SKIP
UUID('16fd2706-8baf-433b-82eb-8c7fada847da')
# make a UUID using a SHA-1 hash of a namespace UUID and a name
>>> uuid.uuid5(uuid.NAMESPACE_DNS, 'python.org')
UUID('886313e1-3b8a-5372-9b90-0c9aee199e5d')
# make a UUID from a string of hex digits (braces and hyphens ignored)
>>> x = uuid.UUID('{00010203-0405-0607-0809-0a0b0c0d0e0f}')
# convert a UUID to a string of hex digits in standard form
>>> str(x)
'00010203-0405-0607-0809-0a0b0c0d0e0f'
# get the raw 16 bytes of the UUID
>>> x.bytes
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
# make a UUID from a 16-byte string
>>> uuid.UUID(bytes=x.bytes)
UUID('00010203-0405-0607-0809-0a0b0c0d0e0f')
"""
import os
import sys
RESERVED_NCS, RFC_4122, RESERVED_MICROSOFT, RESERVED_FUTURE = [
'reserved for NCS compatibility', 'specified in RFC 4122',
'reserved for Microsoft compatibility', 'reserved for future definition']
int_ = int # The built-in int type
bytes_ = bytes # The built-in bytes type
class UUID:
"""Instances of the UUID class represent UUIDs as specified in RFC 4122.
UUID objects are immutable, hashable, and usable as dictionary keys.
Converting a UUID to a string with str() yields something in the form
'12345678-1234-1234-1234-123456789abc'. The UUID constructor accepts
five possible forms: a similar string of hexadecimal digits, or a tuple
of six integer fields (with 32-bit, 16-bit, 16-bit, 8-bit, 8-bit, and
48-bit values respectively) as an argument named 'fields', or a string
of 16 bytes (with all the integer fields in big-endian order) as an
argument named 'bytes', or a string of 16 bytes (with the first three
fields in little-endian order) as an argument named 'bytes_le', or a
single 128-bit integer as an argument named 'int'.
UUIDs have these read-only attributes:
bytes the UUID as a 16-byte string (containing the six
integer fields in big-endian byte order)
bytes_le the UUID as a 16-byte string (with time_low, time_mid,
and time_hi_version in little-endian byte order)
fields a tuple of the six integer fields of the UUID,
which are also available as six individual attributes
and two derived attributes:
time_low the first 32 bits of the UUID
time_mid the next 16 bits of the UUID
time_hi_version the next 16 bits of the UUID
clock_seq_hi_variant the next 8 bits of the UUID
clock_seq_low the next 8 bits of the UUID
node the last 48 bits of the UUID
time the 60-bit timestamp
clock_seq the 14-bit sequence number
hex the UUID as a 32-character hexadecimal string
int the UUID as a 128-bit integer
urn the UUID as a URN as specified in RFC 4122
variant the UUID variant (one of the constants RESERVED_NCS,
RFC_4122, RESERVED_MICROSOFT, or RESERVED_FUTURE)
version the UUID version number (1 through 5, meaningful only
when the variant is RFC_4122)
is_safe An enum indicating whether the UUID has been generated in
a way that is safe for multiprocessing applications, via
uuid_generate_time_safe(3).
"""
def __init__(self, hex=None, bytes=None, bytes_le=None, fields=None,
int=None, version=None,
*):
r"""Create a UUID from either a string of 32 hexadecimal digits,
a string of 16 bytes as the 'bytes' argument, a string of 16 bytes
in little-endian order as the 'bytes_le' argument, a tuple of six
integers (32-bit time_low, 16-bit time_mid, 16-bit time_hi_version,
8-bit clock_seq_hi_variant, 8-bit clock_seq_low, 48-bit node) as
the 'fields' argument, or a single 128-bit integer as the 'int'
argument. When a string of hex digits is given, curly braces,
hyphens, and a URN prefix are all optional. For example, these
expressions all yield the same UUID:
UUID('{12345678-1234-5678-1234-567812345678}')
UUID('12345678123456781234567812345678')
UUID('urn:uuid:12345678-1234-5678-1234-567812345678')
UUID(bytes='\x12\x34\x56\x78'*4)
UUID(bytes_le='\x78\x56\x34\x12\x34\x12\x78\x56' +
'\x12\x34\x56\x78\x12\x34\x56\x78')
UUID(fields=(0x12345678, 0x1234, 0x5678, 0x12, 0x34, 0x567812345678))
UUID(int=0x12345678123456781234567812345678)
Exactly one of 'hex', 'bytes', 'bytes_le', 'fields', or 'int' must
be given. The 'version' argument is optional; if given, the resulting
UUID will have its variant and version set according to RFC 4122,
overriding the given 'hex', 'bytes', 'bytes_le', 'fields', or 'int'.
is_safe is an enum exposed as an attribute on the instance. It
indicates whether the UUID has been generated in a way that is safe
for multiprocessing applications, via uuid_generate_time_safe(3).
"""
if [hex, bytes, bytes_le, fields, int].count(None) != 4:
raise TypeError('one of the hex, bytes, bytes_le, fields, '
'or int arguments must be given')
if hex is not None:
hex = hex.replace('urn:', '').replace('uuid:', '')
hex = hex.strip('{}').replace('-', '')
if len(hex) != 32:
raise ValueError('badly formed hexadecimal UUID string')
int = int_(hex, 16)
if bytes_le is not None:
if len(bytes_le) != 16:
raise ValueError('bytes_le is not a 16-char string')
bytes = (bytes_le[4-1::-1] + bytes_le[6-1:4-1:-1] +
bytes_le[8-1:6-1:-1] + bytes_le[8:])
if bytes is not None:
if len(bytes) != 16:
raise ValueError('bytes is not a 16-char string')
assert isinstance(bytes, bytes_), repr(bytes)
int = int_.from_bytes(bytes, 'big')
if fields is not None:
if len(fields) != 6:
raise ValueError('fields is not a 6-tuple')
(time_low, time_mid, time_hi_version,
clock_seq_hi_variant, clock_seq_low, node) = fields
if not 0 <= time_low < 1<<32:
raise ValueError('field 1 out of range (need a 32-bit value)')
if not 0 <= time_mid < 1<<16:
raise ValueError('field 2 out of range (need a 16-bit value)')
if not 0 <= time_hi_version < 1<<16:
raise ValueError('field 3 out of range (need a 16-bit value)')
if not 0 <= clock_seq_hi_variant < 1<<8:
raise ValueError('field 4 out of range (need an 8-bit value)')
if not 0 <= clock_seq_low < 1<<8:
raise ValueError('field 5 out of range (need an 8-bit value)')
if not 0 <= node < 1<<48:
raise ValueError('field 6 out of range (need a 48-bit value)')
clock_seq = (clock_seq_hi_variant << 8) | clock_seq_low
int = ((time_low << 96) | (time_mid << 80) |
(time_hi_version << 64) | (clock_seq << 48) | node)
if int is not None:
if not 0 <= int < 1<<128:
raise ValueError('int is out of range (need a 128-bit value)')
if version is not None:
if not 1 <= version <= 5:
raise ValueError('illegal version number')
# Set the variant to RFC 4122.
int &= ~(0xc000 << 48)
int |= 0x8000 << 48
# Set the version number.
int &= ~(0xf000 << 64)
int |= version << 76
object.__setattr__(self, 'int', int)
def __eq__(self, other):
if isinstance(other, UUID):
return self.int == other.int
return NotImplemented
# Q. What's the value of being able to sort UUIDs?
# A. Use them as keys in a B-Tree or similar mapping.
def __lt__(self, other):
if isinstance(other, UUID):
return self.int < other.int
return NotImplemented
def __gt__(self, other):
if isinstance(other, UUID):
return self.int > other.int
return NotImplemented
def __le__(self, other):
if isinstance(other, UUID):
return self.int <= other.int
return NotImplemented
def __ge__(self, other):
if isinstance(other, UUID):
return self.int >= other.int
return NotImplemented
def __hash__(self):
return hash(self.int)
def __int__(self):
return self.int
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, str(self))
def __setattr__(self, name, value):
raise TypeError('UUID objects are immutable')
def __str__(self):
hex = '%032x' % self.int
return '%s-%s-%s-%s-%s' % (
hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])
@property
def bytes(self):
return self.int.to_bytes(16, 'big')
@property
def bytes_le(self):
bytes = self.bytes
return (bytes[4-1::-1] + bytes[6-1:4-1:-1] + bytes[8-1:6-1:-1] +
bytes[8:])
@property
def fields(self):
return (self.time_low, self.time_mid, self.time_hi_version,
self.clock_seq_hi_variant, self.clock_seq_low, self.node)
@property
def time_low(self):
return self.int >> 96
@property
def time_mid(self):
return (self.int >> 80) & 0xffff
@property
def time_hi_version(self):
return (self.int >> 64) & 0xffff
@property
def clock_seq_hi_variant(self):
return (self.int >> 56) & 0xff
@property
def clock_seq_low(self):
return (self.int >> 48) & 0xff
@property
def time(self):
return (((self.time_hi_version & 0x0fff) << 48) |
(self.time_mid << 32) | self.time_low)
@property
def clock_seq(self):
return (((self.clock_seq_hi_variant & 0x3f) << 8) |
self.clock_seq_low)
@property
def node(self):
return self.int & 0xffffffffffff
@property
def hex(self):
return '%032x' % self.int
@property
def urn(self):
return 'urn:uuid:' + str(self)
@property
def variant(self):
if not self.int & (0x8000 << 48):
return RESERVED_NCS
elif not self.int & (0x4000 << 48):
return RFC_4122
elif not self.int & (0x2000 << 48):
return RESERVED_MICROSOFT
else:
return RESERVED_FUTURE
@property
def version(self):
# The version bits are only meaningful for RFC 4122 UUIDs.
if self.variant == RFC_4122:
return int((self.int >> 76) & 0xf)
def _micropython_get_mac():
'''Error handling needed! network won't exist on ports/boards with no network access (unix).
Need to fall back to using _random_getnode() if this fails.
'''
mac = b''
try:
import network
wlan = network.WLAN()
mac = wlan.config('mac')
except ImportError:
from os import urandom
mac = urandom(6)
return int.from_bytes(mac, "little")
def _random_getnode():
"""Get a random node ID."""
# RFC 4122, $4.1.6 says "For systems with no IEEE address, a randomly or
# pseudo-randomly generated value may be used; see Section 4.5. The
# multicast bit must be set in such addresses, in order that they will
# never conflict with addresses obtained from network cards."
#
# The "multicast bit" of a MAC address is defined to be "the least
# significant bit of the first octet". This works out to be the 41st bit
# counting from 1 being the least significant bit, or 1<<40.
#
# See https://en.wikipedia.org/wiki/MAC_address#Unicast_vs._multicast
import random
return random.getrandbits(48) | (1 << 40)
_last_timestamp = None
def uuid1(node=None, clock_seq=None):
"""Generate a UUID from a host ID, sequence number, and the current time.
If 'node' is not given, getnode() is used to obtain the hardware
address. If 'clock_seq' is given, it is used as the sequence number;
otherwise a random 14-bit sequence number is chosen."""
global _last_timestamp
import time
nanoseconds = time.time_ns()
# 0x01b21dd213814000 is the number of 100-ns intervals between the
# UUID epoch 1582-10-15 00:00:00 and the Unix epoch 1970-01-01 00:00:00.
# TODO mst This timestamp will need to be adjusted (MicroPython Epoch is different)
timestamp = nanoseconds // 100 + 0x01b21dd213814000
if _last_timestamp is not None and timestamp <= _last_timestamp:
timestamp = _last_timestamp + 1
_last_timestamp = timestamp
if clock_seq is None:
import random
clock_seq = random.getrandbits(14) # instead of stable storage
time_low = timestamp & 0xffffffff
time_mid = (timestamp >> 32) & 0xffff
time_hi_version = (timestamp >> 48) & 0x0fff
clock_seq_low = clock_seq & 0xff
clock_seq_hi_variant = (clock_seq >> 8) & 0x3f
if node is None:
node = _micropython_get_mac()
return UUID(fields=(time_low, time_mid, time_hi_version,
clock_seq_hi_variant, clock_seq_low, node), version=1)
def uuid3(namespace, name):
"""Generate a UUID from the MD5 hash of a namespace UUID and a name."""
from hashlib import md5
digest = md5(
namespace.bytes + bytes(name, "utf-8")
).digest()
return UUID(bytes=digest[:16], version=3)
def uuid4():
"""Generate a random UUID."""
return UUID(bytes=os.urandom(16), version=4)
def uuid5(namespace, name):
"""Generate a UUID from the SHA-1 hash of a namespace UUID and a name."""
from hashlib import sha1
hash = sha1(namespace.bytes + bytes(name, "utf-8")).digest()
return UUID(bytes=hash[:16], version=5)
# The following standard UUIDs are for use with uuid3() or uuid5().
NAMESPACE_DNS = UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
NAMESPACE_URL = UUID('6ba7b811-9dad-11d1-80b4-00c04fd430c8')
NAMESPACE_OID = UUID('6ba7b812-9dad-11d1-80b4-00c04fd430c8')
NAMESPACE_X500 = UUID('6ba7b814-9dad-11d1-80b4-00c04fd430c8')

Wyświetl plik

@ -30,7 +30,11 @@ function ci_build_packages_check_manifest {
for file in $(find -name manifest.py); do
echo "##################################################"
echo "# Testing $file"
python3 /tmp/micropython/tools/manifestfile.py --lib . --compile $file
extra_args=
if [[ "$file" =~ "/unix-ffi/" ]]; then
extra_args="--unix-ffi"
fi
python3 /tmp/micropython/tools/manifestfile.py $extra_args --lib . --compile $file
done
}

Wyświetl plik

@ -19,9 +19,13 @@ replacement for CPython.
### Usage
To use a unix-specific library, pass `unix_ffi=True` to `require()` in your
manifest file.
To use a unix-specific library, a manifest file must add the `unix-ffi`
library to the library search path using `add_library()`:
```py
require("os", unix_ffi=True) # Use the unix-ffi version instead of python-stdlib.
add_library("unix-ffi", "$(MPY_LIB_DIR)/unix-ffi", prepend=True)
```
Prepending the `unix-ffi` library to the path will make it so that the
`unix-ffi` version of a package will be preferred if that package appears in
both `unix-ffi` and another library (eg `python-stdlib`).

Wyświetl plik

@ -1,5 +1,5 @@
metadata(version="3.3.4")
require("re", unix_ffi=True)
require("re")
module("_markupbase.py")

Wyświetl plik

@ -1,7 +1,7 @@
metadata(version="0.5.1")
require("functools")
require("email.encoders", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.encoders")
require("email.errors")
package("email")

Wyświetl plik

@ -3,7 +3,7 @@ metadata(version="0.5.1")
require("base64")
require("binascii")
require("quopri")
require("re", unix_ffi=True)
require("re")
require("string")
package("email")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.1")
require("re", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.message", unix_ffi=True)
require("email.internal", unix_ffi=True)
require("re")
require("email.errors")
require("email.message")
require("email.internal")
package("email")

Wyświetl plik

@ -1,9 +1,9 @@
metadata(version="0.5.2")
require("re", unix_ffi=True)
require("re")
require("binascii")
require("email.encoders", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.encoders")
require("email.errors")
require("email.charset")
package("email")

Wyświetl plik

@ -1,15 +1,15 @@
metadata(version="0.5.1")
require("re", unix_ffi=True)
require("re")
require("base64")
require("binascii")
require("functools")
require("string")
# require("calendar") TODO
require("abc")
require("email.errors", unix_ffi=True)
require("email.header", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.utils", unix_ffi=True)
require("email.errors")
require("email.header")
require("email.charset")
require("email.utils")
package("email")

Wyświetl plik

@ -1,11 +1,11 @@
metadata(version="0.5.3")
require("re", unix_ffi=True)
require("re")
require("uu")
require("base64")
require("binascii")
require("email.utils", unix_ffi=True)
require("email.errors", unix_ffi=True)
require("email.charset", unix_ffi=True)
require("email.utils")
require("email.errors")
require("email.charset")
package("email")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.1")
require("warnings")
require("email.feedparser", unix_ffi=True)
require("email.message", unix_ffi=True)
require("email.internal", unix_ffi=True)
require("email.feedparser")
require("email.message")
require("email.internal")
package("email")

Wyświetl plik

@ -1,13 +1,13 @@
metadata(version="3.3.4")
require("os", unix_ffi=True)
require("re", unix_ffi=True)
require("os")
require("re")
require("base64")
require("random")
require("datetime")
require("urllib.parse", unix_ffi=True)
require("urllib.parse")
require("warnings")
require("quopri")
require("email.charset", unix_ffi=True)
require("email.charset")
package("email")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.0.4")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
module("fcntl.py")

Wyświetl plik

@ -1,5 +1,5 @@
metadata(version="3.3.4")
require("os", unix_ffi=True)
require("os")
module("getopt.py")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.1.0")
# Originally written by Riccardo Magliocchetti.
require("ffilib", unix_ffi=True)
require("ffilib")
module("gettext.py")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="0.5.2")
require("os", unix_ffi=True)
require("os")
require("os-path")
require("re", unix_ffi=True)
require("re")
require("fnmatch")
module("glob.py")

Wyświetl plik

@ -1,8 +1,8 @@
metadata(version="3.3.4")
require("_markupbase", unix_ffi=True)
require("_markupbase")
require("warnings")
require("html.entities", unix_ffi=True)
require("re", unix_ffi=True)
require("html.entities")
require("re")
package("html")

Wyświetl plik

@ -1,10 +1,10 @@
metadata(version="0.5.1")
require("email.parser", unix_ffi=True)
require("email.message", unix_ffi=True)
require("socket", unix_ffi=True)
require("email.parser")
require("email.message")
require("socket")
require("collections")
require("urllib.parse", unix_ffi=True)
require("urllib.parse")
require("warnings")
package("http")

Wyświetl plik

@ -0,0 +1,4 @@
metadata(version="0.2.0")
require("re")
package("json")

Wyświetl plik

@ -2,8 +2,8 @@ metadata(version="0.2.1")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("os", unix_ffi=True)
require("signal", unix_ffi=True)
require("ffilib")
require("os")
require("signal")
package("machine")

Wyświetl plik

@ -2,8 +2,8 @@ metadata(version="0.1.2")
# Originally written by Paul Sokolovsky.
require("os", unix_ffi=True)
require("select", unix_ffi=True)
require("os")
require("select")
require("pickle")
module("multiprocessing.py")

Wyświetl plik

@ -2,7 +2,7 @@ metadata(version="0.6.0")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
require("errno")
require("stat")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.1.0")
# Originally written by Riccardo Magliocchetti.
require("ffilib", unix_ffi=True)
require("ffilib")
module("pwd.py")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.2.5")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
module("re.py")

Wyświetl plik

@ -2,7 +2,7 @@ metadata(version="0.3.0")
# Originally written by Paul Sokolovsky.
require("os", unix_ffi=True)
require("ffilib", unix_ffi=True)
require("os")
require("ffilib")
module("select.py")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.3.2")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
module("signal.py")

Wyświetl plik

@ -2,6 +2,6 @@ metadata(version="0.2.4")
# Originally written by Paul Sokolovsky.
require("ffilib", unix_ffi=True)
require("ffilib")
module("sqlite3.py")

Wyświetl plik

@ -1,5 +1,5 @@
metadata(version="0.5.0")
require("ffilib", unix_ffi=True)
require("ffilib")
module("time.py")

Wyświetl plik

@ -1,9 +1,9 @@
metadata(version="3.3.4")
require("getopt", unix_ffi=True)
require("getopt")
require("itertools")
# require("linecache") TODO
require("time", unix_ffi=True)
require("time")
require("traceback")
module("timeit.py")

Wyświetl plik

@ -2,8 +2,8 @@ metadata(version="0.1.2")
# Originally written by Paul Sokolovsky.
require("os", unix_ffi=True)
require("tty", unix_ffi=True)
require("select", unix_ffi=True)
require("os")
require("tty")
require("select")
package("ucurses")

Wyświetl plik

@ -1,6 +1,6 @@
metadata(version="0.5.2")
require("re", unix_ffi=True)
require("re")
require("collections")
require("collections-defaultdict")