# micropython_eeprom/bdevice.py
# 210 lines, 8.1 KiB, Python

# bdevice.py Hardware-agnostic base classes.
# BlockDevice Base class for general block devices e.g. EEPROM, FRAM.
# FlashDevice Base class for generic Flash memory (subclass of BlockDevice).
# Documentation in BASE_CLASSES.md
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2019-2024 Peter Hinch
from micropython import const
class BlockDevice:
    """Hardware-agnostic base for byte-addressable block devices (EEPROM, FRAM).

    Provides ``len()``, integer and slice indexing, and the block-device
    protocol methods (``readblocks``, ``writeblocks``, ``ioctl``).

    Every transfer is delegated to ``self.readwrite(addr, buf, read)``, which
    is not defined here and must be supplied by a subclass. On reads its
    return value is indexed or handed back to the caller.
    """

    def __init__(self, nbits, nchips, chip_size):
        """Record the array geometry.

        nbits: log2 of the block size (block size is ``2 ** nbits`` bytes).
        nchips: number of chips in the array.
        chip_size: size of one chip in bytes.
        """
        self._c_bytes = chip_size  # Size of chip in bytes
        self._a_bytes = chip_size * nchips  # Size of array
        self._nbits = nbits  # Block size in bits
        self._block_size = 2 ** nbits
        self._rwbuf = bytearray(1)  # Scratch buffer for single-byte access

    def __len__(self):
        """Return the size of the whole array in bytes."""
        return self._a_bytes

    def __setitem__(self, addr, value):
        """Write one byte (integer ``addr``) or a run of bytes (slice)."""
        if not isinstance(addr, slice):
            self._rwbuf[0] = value
            self.readwrite(addr, self._rwbuf, False)
            return None
        return self._wslice(addr, value)

    def __getitem__(self, addr):
        """Read one byte (integer ``addr``) or a run of bytes (slice)."""
        if not isinstance(addr, slice):
            return self.readwrite(addr, self._rwbuf, True)[0]
        return self._rslice(addr)

    # Handle special cases of a slice. Always return a pair of positive indices.
    def _do_slice(self, addr):
        """Resolve a slice into a ``(start, stop)`` pair.

        Missing bounds default to the ends of the array and negative bounds
        count back from the end. Raises NotImplementedError for any step
        other than None or 1.
        """
        step = addr.step
        if step is not None and step != 1:
            raise NotImplementedError("only slices with step=1 (aka None) are supported")
        size = self._a_bytes
        lo = 0 if addr.start is None else addr.start
        hi = size if addr.stop is None else addr.stop
        if lo < 0:
            lo = size + lo
        if hi < 0:
            hi = size + hi
        return lo, hi

    def _wslice(self, addr, value):
        """Write ``value`` to the region described by slice ``addr``.

        Raises RuntimeError if the data length differs from the slice length,
        or if a TypeError occurs while measuring or writing ``value``.
        """
        start, stop = self._do_slice(addr)
        # The write stays inside the try on purpose: a TypeError raised by
        # len() or by readwrite() is reported as an unsupported data type.
        try:
            if len(value) != (stop - start):
                raise RuntimeError("Slice must have same length as data")
            return self.readwrite(start, value, False)
        except TypeError:
            raise RuntimeError("Can only assign bytes/bytearray to a slice")

    def _rslice(self, addr):
        """Read the region described by slice ``addr`` into a new bytearray."""
        start, stop = self._do_slice(addr)
        return self.readwrite(start, bytearray(stop - start), True)

    # IOCTL protocol.
    def sync(self):
        """Do nothing: unbuffered devices have no pending data. Subclass overrides."""
        return None

    def readblocks(self, blocknum, buf, offset=0):
        """Fill ``buf`` starting ``offset`` bytes into block ``blocknum``."""
        addr = offset + (blocknum << self._nbits)
        self.readwrite(addr, buf, True)

    def writeblocks(self, blocknum, buf, offset=0):
        """Write ``buf`` starting ``offset`` bytes into block ``blocknum``."""
        addr = offset + (blocknum << self._nbits)
        self.readwrite(addr, buf, False)

    # https://docs.micropython.org/en/latest/library/os.html#os.AbstractBlockDev.ioctl
    def ioctl(self, op, arg):  # ioctl calls: see extmod/vfs.h
        """Handle ops 3 to 6; return None for op 3 and for any other op."""
        if op == 3:  # SYNCHRONISE
            self.sync()
        elif op == 4:  # BP_IOCTL_SEC_COUNT
            return self._a_bytes >> self._nbits
        elif op == 5:  # BP_IOCTL_SEC_SIZE
            return self._block_size
        elif op == 6:  # Ignore ERASE because handled by driver.
            return 0
        return None
# Hardware agnostic base class for EEPROM arrays
class EepromDevice(BlockDevice):
    """Hardware-agnostic base for EEPROM arrays; adds page-size bookkeeping.

    Keeps ``_page_size`` and ``_page_mask`` up to date. The page size is
    either supplied by the caller or, when given as None, measured by writing
    to the device.
    """

    def __init__(self, nbits, nchips, chip_size, page_size, verbose):
        """Initialise the array geometry and establish the page size.

        page_size: one of 16, 32, 64, 128, 256, or None to measure it.
        verbose: if truthy, print the page size that was chosen.
        Raises ValueError for any other ``page_size``.
        """
        super().__init__(nbits, nchips, chip_size)
        # Handle page size arg
        if page_size not in (None, 16, 32, 64, 128, 256):
            raise ValueError(f"Invalid page size: {page_size}")
        self._set_pagesize(page_size)  # Set page size
        if verbose:
            print("Page size:", self._page_size)

    def _psize(self, ps):  # Set page size and page mask
        """Store page size ``ps`` and the matching mask ``~(ps - 1)``."""
        self._page_size = ps
        self._page_mask = ~(ps - 1)

    def get_page_size(self):  # For test script
        """Return the current page size."""
        return self._page_size

    # Measuring page size should not be done in production code. See docs.
    def _set_pagesize(self, page_size):
        """Apply a supplied page size, or measure it when ``page_size`` is None.

        Measuring overwrites the first 129 bytes of the device; they are saved
        beforehand and written back afterwards.
        """
        if page_size is not None:  # Validated page_size was supplied
            self._psize(page_size)
            return

        # Measure it.
        self._psize(16)  # Conservative
        saved = self[:129]  # Save old contents (nonvolatile!)
        self._psize(256)  # Ambitious
        candidates = (16, 32, 64, 128)  # Legal page sizes + 256
        for offset in candidates:
            self[offset] = 255  # Write single bytes, don't invoke page write
        self[0:129] = b"\0" * 129  # Zero 129 bytes attempting to use 256 byte pages
        # The first marker byte that still reads non-zero gives the page size.
        # If every marker was zeroed, the page size is taken to be 256.
        for size in candidates:
            if self[size]:
                break
        else:
            size = 256
        self._psize(size)
        self[:129] = saved
# Hardware agnostic base class for flash memory.
_RDBUFSIZE = const(32)  # Size of read buffer for erasure test


class FlashDevice(BlockDevice):
    """Base class for flash memory, with a one-sector write-back cache.

    One sector is always held in ``_cache``. Writes modify the cache and mark
    it dirty; the sector is written out by ``sync()`` or when a write moves
    to a different sector.

    This class calls ``self.rdchip(addr, buf)`` and ``self.flush(buf, addr)``,
    which are not defined here and must be supplied by a subclass.
    """

    def __init__(self, nbits, nchips, chip_size, sec_size):
        """Set up geometry, masks and buffers.

        nbits, nchips, chip_size: passed to ``BlockDevice.__init__``.
        sec_size: sector size in bytes. The masks below assume it is a power
            of two.
        """
        super().__init__(nbits, nchips, chip_size)
        self.sec_size = sec_size
        self._cache_mask = sec_size - 1  # For 4K sector size: 0xfff
        self._fmask = self._cache_mask ^ 0x3FFFFFFF  # 4K -> 0x3ffff000
        self._buf = bytearray(_RDBUFSIZE)
        self._mvbuf = memoryview(self._buf)
        self._cache = bytearray(sec_size)  # Cache always contains one sector
        self._mvd = memoryview(self._cache)
        self._acache = 0  # Address in chip of byte 0 of current cached sector.
        # A newly cached sector, or one which has been flushed, will be clean,
        # so .sync() will do nothing. If cache is modified, dirty will be set.
        self._dirty = False

    def read(self, addr, mvb):
        """Fill ``mvb`` from address ``addr``, using the cache where it overlaps.

        Bytes inside the cached sector come from the cache; the rest are read
        with ``rdchip``. Returns ``mvb``.
        """
        nbytes = len(mvb)
        next_sec = self._acache + self.sec_size  # Start of next sector
        if addr >= next_sec or addr + nbytes <= self._acache:
            self.rdchip(addr, mvb)  # No data is cached: just read from device
        else:
            # Some of address range is cached
            boff = 0  # Offset into buf
            if addr < self._acache:  # Read data prior to cache from chip
                nr = self._acache - addr
                self.rdchip(addr, mvb[:nr])
                addr = self._acache  # Start of cached data
                nbytes -= nr
                boff += nr
            # addr now >= self._acache: read from cache.
            sa = addr - self._acache  # Offset into cache
            nr = min(nbytes, next_sec - addr)  # No of bytes to read from cache
            mvb[boff : boff + nr] = self._mvd[sa : sa + nr]
            if nbytes - nr:  # Get any remaining data from chip
                self.rdchip(addr + nr, mvb[boff + nr :])
        return mvb

    def sync(self):
        """Write the cached sector out if it is dirty. Always returns 0."""
        if self._dirty:
            self.flush(self._mvd, self._acache)  # Write out old data
            self._dirty = False
        return 0

    # Performance enhancement: if cache intersects address range, update it first.
    # Currently in this case it would be written twice. This may be rare.
    def write(self, addr, mvb):
        """Write ``mvb`` at address ``addr`` through the sector cache.

        The data is split at sector boundaries. Before each piece is copied,
        the cache is switched to the sector containing it, syncing the
        previous sector first. The last sector touched is left dirty in the
        cache until ``sync()`` is called. Returns ``mvb``.
        """
        nbytes = len(mvb)
        boff = 0  # Offset into buf.
        while nbytes:
            # Compare against the live cache address, not a copy taken before
            # the loop: _fill_cache() moves self._acache. With a stale copy, a
            # write ending in the sector that was cached on entry would skip
            # the switch and land in whichever sector is cached now.
            if (addr & self._fmask) != self._acache:
                self.sync()  # Erase sector and write out old data
                self._fill_cache(addr)  # Cache sector which includes addr
            offs = addr & self._cache_mask  # Offset into cache
            npage = min(nbytes, self.sec_size - offs)  # No. of bytes in current sector
            self._mvd[offs : offs + npage] = mvb[boff : boff + npage]
            self._dirty = True  # Cache contents do not match those of chip
            nbytes -= npage
            boff += npage
            addr += npage
        return mvb

    # Cache the sector which contains a given byte address. Save sector
    # start address.
    def _fill_cache(self, addr):
        """Load the sector containing ``addr`` into the cache and mark it clean."""
        addr &= self._fmask
        self.rdchip(addr, self._mvd)
        self._acache = addr
        self._dirty = False

    def initialise(self):
        """Load the sector at address 0 into the cache."""
        self._fill_cache(0)

    # Return True if a sector is erased.
    def is_empty(self, addr, ev=0xFF):
        """Return True if every byte of the sector at ``addr`` equals ``ev``.

        The sector is read ``_RDBUFSIZE`` bytes at a time, stopping at the
        first chunk that contains a different byte.
        """
        mvb = self._mvbuf
        nbufs = self.sec_size // _RDBUFSIZE  # Read buffers per sector
        for _ in range(nbufs):
            self.rdchip(addr, mvb)
            if any(x != ev for x in mvb):
                return False
            addr += _RDBUFSIZE
        return True