"""XLS (BIFF8) reader — pure Python, zero dependencies. Parses OLE2 compound document + BIFF8 record stream using only the ``struct`` module. """ import struct from typing import Optional from models import FileFormatError # ── OLE2 constants ───────────────────────────────────────────────── OLE2_SIGNATURE = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1' MSAT_SECT = 0xFFFFFFFE FREE_SECT = 0xFFFFFFFF ENDOFCHAIN = 0xFFFFFFFE # Directory entry types STGTY_INVALID = 0 STGTY_STORAGE = 1 STGTY_STREAM = 2 STGTY_ROOT = 5 # ── BIFF8 record opcodes ────────────────────────────────────────── BOF = 0x0009 EOF = 0x000A SST = 0x0034 BOUNDSHEET = 0x0085 DIMENSIONS = 0x0027 NUMBER = 0x0203 LABELSST = 0x00FD FORMULA = 0x0006 RK = 0x000C MULRK = 0x00BD LABEL = 0x0204 RSTRING = 0x00FD # same as LABELSST in some docs; we handle via SST INDEX = 0x00CD WINDOW2 = 0x003D class XLSReader: """Read an .xls (BIFF8) file and return a cell map.""" def __init__(self, filepath: str): self._filepath = filepath self._data: bytes = b'' self._sector_size: int = 512 self._mini_sector_size: int = 64 self._fat: list[int] = [] self._mini_fat: list[int] = [] self._directory: list[dict] = [] self._sst: list[str] = [] self._cells: dict[tuple[int, int], str] = {} # ── public API ────────────────────────────────────────────────── def read_all_cells(self) -> dict[tuple[int, int], str]: """Return {(row, col): str} for every non-empty cell.""" self._load_file() self._parse_ole2() self._find_workbook_stream() self._parse_biff8() return dict(self._cells) @staticmethod def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]: """Convenience function matching the xlsx_reader interface.""" return XLSReader(filepath).read_all_cells() # ── OLE2 layer ────────────────────────────────────────────────── def _load_file(self): with open(self._filepath, 'rb') as f: self._data = f.read() if len(self._data) < 512: raise FileFormatError("File too small to be a valid OLE2 document") if self._data[:8] != OLE2_SIGNATURE: raise FileFormatError("Not a valid OLE2 compound document") def _parse_ole2(self): """Parse the OLE2 header, FAT, directory, and MiniFAT.""" hdr = self._data[:512] # Sector size (usually 512 → shift=9, 4096 → shift=12) ss_shift = struct.unpack_from(' 0 and sect_mini_fat_start not in (ENDOFCHAIN, FREE_SECT): self._mini_fat = [] for ms in self._chain(sect_mini_fat_start): block = self._read_sector(ms) count = self._sector_size // 4 self._mini_fat.extend(struct.unpack_from(f'<{count}I', block)) def _chain(self, start: int) -> list[int]: """Follow a sector chain starting at *start*.""" chain = [] s = start while s not in (ENDOFCHAIN, FREE_SECT): chain.append(s) if s >= len(self._fat): break s = self._fat[s] return chain def _read_sector(self, sect: int) -> bytes: """Return the raw bytes of sector *sect*.""" offset = 512 + sect * self._sector_size return self._data[offset:offset + self._sector_size] def _read_stream(self, start: int, size: int, use_mini: bool = False) -> bytes: """Read a stream given its starting sector and total size.""" if use_mini: return self._read_mini_stream(start, size) chain = self._chain(start) parts = [] remaining = size for s in chain: chunk = self._read_sector(s) take = min(len(chunk), remaining) parts.append(chunk[:take]) remaining -= take if remaining <= 0: break return b''.join(parts) def _read_mini_stream(self, start: int, size: int) -> bytes: """Read a mini-stream (stored in the mini FAT area).""" # Find the "Root Entry" stream which holds mini-stream data root_entry = None for e in self._directory: if e['type'] == STGTY_ROOT: root_entry = e break if root_entry is None: raise FileFormatError("Cannot find Root Entry in OLE2 directory") root_data = self._read_stream(root_entry['start'], root_entry['size']) chain = self._mini_chain(start) parts = [] remaining = size for s in chain: offset = s * self._mini_sector_size if offset + self._mini_sector_size > len(root_data): break chunk = root_data[offset:offset + self._mini_sector_size] take = min(len(chunk), remaining) parts.append(chunk[:take]) remaining -= take if remaining <= 0: break return b''.join(parts) def _mini_chain(self, start: int) -> list[int]: """Follow a mini-FAT chain.""" chain = [] s = start while s not in (ENDOFCHAIN, FREE_SECT): chain.append(s) if s >= len(self._mini_fat): break s = self._mini_fat[s] return chain # ── BIFF8 layer ───────────────────────────────────────────────── def _find_workbook_stream(self) -> tuple[int, int]: """Locate the Workbook/Book stream in the directory. Returns (start_sector, size) or raises FileFormatError. """ for name in ('Workbook', 'Book'): for e in self._directory: if e['name'] == name and e['type'] == STGTY_STREAM: return e['start'], e['size'] raise FileFormatError("No Workbook stream found in OLE2 document") def _parse_biff8(self): """Parse the BIFF8 record stream and populate self._cells.""" start, size = self._find_workbook_stream() # Determine if the stream is small enough to be a mini-stream use_mini = size < 4096 raw = self._read_stream(start, size, use_mini=use_mini) pos = 0 while pos + 4 <= len(raw): opcode = struct.unpack_from(' len(raw): break record_data = raw[pos:pos + length] pos += length if opcode == SST: self._parse_sst(record_data) elif opcode == LABELSST: self._parse_labelsst(record_data) elif opcode == NUMBER: self._parse_number(record_data) elif opcode == FORMULA: self._parse_formula(record_data) elif opcode == RK: self._parse_rk(record_data) elif opcode == MULRK: self._parse_mulrk(record_data) elif opcode == LABEL: self._parse_label(record_data) elif opcode == EOF: break # ── SST parser ────────────────────────────────────────────────── def _parse_sst(self, data: bytes): """Parse the Shared Strings Table.""" if len(data) < 8: return cst_total = struct.unpack_from(' len(data): break cch = struct.unpack_from('= len(data): break flags = data[offset] offset += 1 is_16bit = bool(flags & 0x08) has_rich = bool(flags & 0x04) has_ext = bool(flags & 0x10) # Skip extended formatting (run count) if has_rich and offset + 2 <= len(data): iset = struct.unpack_from(' len(data): break if is_16bit: text = data[offset:offset + byte_count].decode('utf-16le', errors='replace') else: text = data[offset:offset + byte_count].decode('cp1252', errors='replace') self._sst.append(text) offset += byte_count # ── Cell record parsers ───────────────────────────────────────── def _parse_labelsst(self, data: bytes): """LABELSST (0x00FD): row(2) + col(2) + xf(2) + sst_index(4).""" if len(data) < 10: return row = struct.unpack_from(' len(data): break # xf = struct.unpack_from(' float: """Decode an RK value to a float.""" if rk & 0x02: # Integer val = (rk >> 2) if rk & 0x01 else rk >> 2 if rk & 0x80000000: val = -((~rk >> 2) & 0x3FFFFFFF) # Actually, the integer encoding: bit 0 = int flag # If bit 0 set, it's a signed 30-bit int int_val = (rk >> 2) & 0x3FFFFFFF if rk & 0x40000000: int_val -= 0x40000000 multiplier = 0.01 if rk & 0x01 else 1.0 return int_val * multiplier else: # Float: reconstruct IEEE 754 double from the 30-bit mantissa # Take the 32-bit rk, set bit 0 and 1 to 0 mantissa = (rk >> 2) & 0x3FFFFFFF if rk & 0x01: mantissa = int(mantissa / 0.01) # Build a double from the upper bits # The RK stores the top 30 bits of the mantissa double_bytes = struct.pack('> 31) & 1 exp = (rk >> 22) & 0x3FF mant = rk & 0x003FFFFF # Reconstruct double # RK uses 30-bit mantissa (bits 2-31 of rk), with implicit leading 1 # and biased exponent if exp == 0 and mant == 0: return 0.0 # Build IEEE 754 double d_sign = sign d_exp = exp + 896 # bias adjustment d_mant = mant << 20 # expand 30-bit to 52-bit # Pack as double packed = (d_sign << 63) | (d_exp << 52) | d_mant packed_bytes = struct.pack(' str: """Format a numeric value as a string.""" if value == int(value) and abs(value) < 1e15: return str(int(value)) return str(value) # ── Module-level convenience function ────────────────────────────── def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]: """Read an .xls file and return {(row, col): str}. Rows and columns are 0-based. A1 → (0, 0). """ return XLSReader(filepath).read_all_cells()