# bdevice.py Hardware-agnostic base classes. # BlockDevice Base class for general block devices e.g. EEPROM, FRAM. # FlashDevice Base class for generic Flash memory (subclass of BlockDevice). # Documentation in BASE_CLASSES.md # Released under the MIT License (MIT). See LICENSE. # Copyright (c) 2019-2024 Peter Hinch from micropython import const class BlockDevice: def __init__(self, nbits, nchips, chip_size): self._c_bytes = chip_size # Size of chip in bytes self._a_bytes = chip_size * nchips # Size of array self._nbits = nbits # Block size in bits self._block_size = 2 ** nbits self._rwbuf = bytearray(1) def __len__(self): return self._a_bytes def __setitem__(self, addr, value): if isinstance(addr, slice): return self._wslice(addr, value) self._rwbuf[0] = value self.readwrite(addr, self._rwbuf, False) def __getitem__(self, addr): if isinstance(addr, slice): return self._rslice(addr) return self.readwrite(addr, self._rwbuf, True)[0] # Handle special cases of a slice. Always return a pair of positive indices. def _do_slice(self, addr): if not (addr.step is None or addr.step == 1): raise NotImplementedError("only slices with step=1 (aka None) are supported") start = addr.start if addr.start is not None else 0 stop = addr.stop if addr.stop is not None else self._a_bytes start = start if start >= 0 else self._a_bytes + start stop = stop if stop >= 0 else self._a_bytes + stop return start, stop def _wslice(self, addr, value): start, stop = self._do_slice(addr) try: if len(value) == (stop - start): res = self.readwrite(start, value, False) else: raise RuntimeError("Slice must have same length as data") except TypeError: raise RuntimeError("Can only assign bytes/bytearray to a slice") return res def _rslice(self, addr): start, stop = self._do_slice(addr) buf = bytearray(stop - start) return self.readwrite(start, buf, True) # IOCTL protocol. def sync(self): # Nothing to do for unbuffered devices. Subclass overrides. return def readblocks(self, blocknum, buf, offset=0): self.readwrite(offset + (blocknum << self._nbits), buf, True) def writeblocks(self, blocknum, buf, offset=0): self.readwrite(offset + (blocknum << self._nbits), buf, False) # https://docs.micropython.org/en/latest/library/os.html#os.AbstractBlockDev.ioctl def ioctl(self, op, arg): # ioctl calls: see extmod/vfs.h if op == 3: # SYNCHRONISE self.sync() return if op == 4: # BP_IOCTL_SEC_COUNT return self._a_bytes >> self._nbits if op == 5: # BP_IOCTL_SEC_SIZE return self._block_size if op == 6: # Ignore ERASE because handled by driver. return 0 # Hardware agnostic base class for EEPROM arrays class EepromDevice(BlockDevice): def __init__(self, nbits, nchips, chip_size, page_size, verbose): super().__init__(nbits, nchips, chip_size) # Handle page size arg if page_size not in (None, 16, 32, 64, 128, 256): raise ValueError(f"Invalid page size: {page_size}") self._set_pagesize(page_size) # Set page size verbose and print("Page size:", self._page_size) def _psize(self, ps): # Set page size and page mask self._page_size = ps self._page_mask = ~(ps - 1) def get_page_size(self): # For test script return self._page_size # Measuring page size should not be done in production code. See docs. def _set_pagesize(self, page_size): if page_size is None: # Measure it. self._psize(16) # Conservative old = self[:129] # Save old contents (nonvolatile!) self._psize(256) # Ambitious r = (16, 32, 64, 128) # Legal page sizes + 256 for x in r: self[x] = 255 # Write single bytes, don't invoke page write self[0:129] = b"\0" * 129 # Zero 129 bytes attempting to use 256 byte pages try: ps = next(z for z in r if self[z]) except StopIteration: ps = 256 self._psize(ps) self[:129] = old else: # Validated page_size was supplied self._psize(page_size) # Hardware agnostic base class for flash memory. _RDBUFSIZE = const(32) # Size of read buffer for erasure test class FlashDevice(BlockDevice): def __init__(self, nbits, nchips, chip_size, sec_size): super().__init__(nbits, nchips, chip_size) self.sec_size = sec_size self._cache_mask = sec_size - 1 # For 4K sector size: 0xfff self._fmask = self._cache_mask ^ 0x3FFFFFFF # 4K -> 0x3ffff000 self._buf = bytearray(_RDBUFSIZE) self._mvbuf = memoryview(self._buf) self._cache = bytearray(sec_size) # Cache always contains one sector self._mvd = memoryview(self._cache) self._acache = 0 # Address in chip of byte 0 of current cached sector. # A newly cached sector, or one which has been flushed, will be clean, # so .sync() will do nothing. If cache is modified, dirty will be set. self._dirty = False def read(self, addr, mvb): nbytes = len(mvb) next_sec = self._acache + self.sec_size # Start of next sector if addr >= next_sec or addr + nbytes <= self._acache: self.rdchip(addr, mvb) # No data is cached: just read from device else: # Some of address range is cached boff = 0 # Offset into buf if addr < self._acache: # Read data prior to cache from chip nr = self._acache - addr self.rdchip(addr, mvb[:nr]) addr = self._acache # Start of cached data nbytes -= nr boff += nr # addr now >= self._acache: read from cache. sa = addr - self._acache # Offset into cache nr = min(nbytes, self._acache + self.sec_size - addr) # No of bytes to read from cache mvb[boff : boff + nr] = self._mvd[sa : sa + nr] if nbytes - nr: # Get any remaining data from chip self.rdchip(addr + nr, mvb[boff + nr :]) return mvb def sync(self): if self._dirty: self.flush(self._mvd, self._acache) # Write out old data self._dirty = False return 0 # Performance enhancement: if cache intersects address range, update it first. # Currently in this case it would be written twice. This may be rare. def write(self, addr, mvb): nbytes = len(mvb) acache = self._acache boff = 0 # Offset into buf. while nbytes: if (addr & self._fmask) != acache: self.sync() # Erase sector and write out old data self._fill_cache(addr) # Cache sector which includes addr offs = addr & self._cache_mask # Offset into cache npage = min(nbytes, self.sec_size - offs) # No. of bytes in current sector self._mvd[offs : offs + npage] = mvb[boff : boff + npage] self._dirty = True # Cache contents do not match those of chip nbytes -= npage boff += npage addr += npage return mvb # Cache the sector which contains a given byte addresss. Save sector # start address. def _fill_cache(self, addr): addr &= self._fmask self.rdchip(addr, self._mvd) self._acache = addr self._dirty = False def initialise(self): self._fill_cache(0) # Return True if a sector is erased. def is_empty(self, addr, ev=0xFF): mvb = self._mvbuf erased = True nbufs = self.sec_size // _RDBUFSIZE # Read buffers per sector for _ in range(nbufs): self.rdchip(addr, mvb) if any(True for x in mvb if x != ev): erased = False break addr += _RDBUFSIZE return erased