Files
Agent 836ad20515 v1.1.0: 增加交互提示、路径输入、窗口属性配置
- main.py: 增加show_banner()启动说明、各阶段[INFO]日志、结果摘要、任意键退出
- file_selector.py: 重写为路径输入→验证→空输入弹窗回退→不存在循环重试
- run.bat: 新建启动脚本(chcp 65001, mode con cols=80 lines=20, color 0B, title固定署名, pause)
- Code/docs/modification-assessment.md: 修改需求评估文档
2026-05-25 17:29:19 +08:00

490 lines
19 KiB
Python

"""XLS (BIFF8) reader — pure Python, zero dependencies.
Parses OLE2 compound document + BIFF8 record stream using only
the ``struct`` module.
"""
import struct
from typing import Optional
from models import FileFormatError
# ── OLE2 constants ─────────────────────────────────────────────────
# Magic bytes at offset 0 of every OLE2 compound document.
OLE2_SIGNATURE = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1'
# Special ids stored in the FAT / MSAT in place of a real sector number.
MSAT_SECT = 0xFFFFFFFC   # sector is part of the MSAT (a.k.a. DIFAT)
FREE_SECT = 0xFFFFFFFF   # sector is unallocated
ENDOFCHAIN = 0xFFFFFFFE  # last sector of a chain
# Directory entry types
STGTY_INVALID = 0
STGTY_STORAGE = 1
STGTY_STREAM = 2
STGTY_ROOT = 5
# ── BIFF8 record opcodes ──────────────────────────────────────────
# Values follow the BIFF8 record numbering; the BIFF2-era ids (0x0009 BOF,
# 0x003E WINDOW2, ...) do not occur in BIFF8 workbooks.
BOF = 0x0809
EOF = 0x000A
SST = 0x00FC
BOUNDSHEET = 0x0085
DIMENSIONS = 0x0200
NUMBER = 0x0203
LABELSST = 0x00FD
FORMULA = 0x0006
RK = 0x027E
MULRK = 0x00BD
LABEL = 0x0204
RSTRING = 0x00D6  # rich-text cell string; distinct from LABELSST (0x00FD)
INDEX = 0x020B
WINDOW2 = 0x023E
class XLSReader:
    """Read an .xls (BIFF8) file and return a cell map.

    The file is an OLE2 compound document; its ``Workbook`` (or ``Book``)
    stream holds the BIFF8 records.  The workbook globals are scanned for
    the shared-string table, then the first worksheet substream is decoded.
    Every cell value is returned as text, keyed by 0-based ``(row, col)``.
    """

    # Largest id that names a real sector (MAXREGSECT).  Ids above it are
    # markers (MSAT/FAT sector, end of chain, free) and never address data.
    _MAX_REGULAR_SECT = 0xFFFFFFFA
    _UNUSED_SECT = 0xFFFFFFFF

    # Records the parser needs beyond the module-level opcode names.  They
    # are kept private so the parser does not depend on how those are set.
    _OP_BOF = 0x0809
    _OP_CONTINUE = 0x003C
    _OP_STRING = 0x0207
    # ``dt`` field of a BOF record that opens a worksheet substream.
    _DT_WORKSHEET = 0x0010

    # Display text for the error codes a formula result can carry.
    _ERROR_TEXT = {
        0x00: '#NULL!',
        0x07: '#DIV/0!',
        0x0F: '#VALUE!',
        0x17: '#REF!',
        0x1D: '#NAME?',
        0x24: '#NUM!',
        0x2A: '#N/A',
    }

    def __init__(self, filepath: str):
        """Remember *filepath*; nothing is read until ``read_all_cells``."""
        self._filepath = filepath
        self._data: bytes = b''
        self._sector_size: int = 512
        self._mini_sector_size: int = 64
        # Streams shorter than this live in the mini stream.
        self._mini_cutoff: int = 4096
        self._fat: list[int] = []
        self._mini_fat: list[int] = []
        self._directory: list[dict] = []
        self._sst: list[str] = []
        self._cells: dict[tuple[int, int], str] = {}
        # Cell of the last FORMULA whose text result arrives in a STRING record.
        self._pending_formula_cell: Optional[tuple[int, int]] = None

    # ── public API ──────────────────────────────────────────────────
    def read_all_cells(self) -> dict[tuple[int, int], str]:
        """Return {(row, col): str} for every non-empty cell of the first sheet.

        Raises:
            FileFormatError: the file is not an OLE2 document or contains
                no workbook stream.
            OSError: the file cannot be opened or read.
        """
        # Start from a clean slate so a second call does not duplicate the
        # shared strings or keep cells from the previous run.
        self._sst = []
        self._cells = {}
        self._load_file()
        self._parse_ole2()
        self._parse_biff8()
        return dict(self._cells)

    @staticmethod
    def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
        """Convenience function matching the xlsx_reader interface."""
        return XLSReader(filepath).read_all_cells()

    # ── OLE2 layer ──────────────────────────────────────────────────
    def _load_file(self):
        """Read the whole file into memory and check the OLE2 signature."""
        with open(self._filepath, 'rb') as f:
            self._data = f.read()
        if len(self._data) < 512:
            raise FileFormatError("File too small to be a valid OLE2 document")
        if self._data[:8] != OLE2_SIGNATURE:
            raise FileFormatError("Not a valid OLE2 compound document")

    def _parse_ole2(self):
        """Parse the OLE2 header, FAT, directory, and MiniFAT.

        Header fields used (byte offsets): 30 sector shift, 32 mini sector
        shift, 44 number of FAT sectors, 48 first directory sector, 56 mini
        stream cutoff, 60 first MiniFAT sector, 64 number of MiniFAT
        sectors, 68 first extra MSAT sector, 76 the first 109 MSAT entries.
        """
        hdr = self._data[:512]
        sector_shift, mini_shift = struct.unpack_from('<HH', hdr, 30)
        # Sectors are 512 (shift 9) or 4096 (shift 12) bytes in practice;
        # reject values that would make the sector arithmetic meaningless.
        if not 7 <= sector_shift <= 20 or mini_shift > sector_shift:
            raise FileFormatError("Unsupported sector size in OLE2 header")
        self._sector_size = 1 << sector_shift
        self._mini_sector_size = 1 << mini_shift

        fat_sector_count = struct.unpack_from('<I', hdr, 44)[0]
        dir_start = struct.unpack_from('<I', hdr, 48)[0]
        self._mini_cutoff = struct.unpack_from('<I', hdr, 56)[0]
        mini_fat_start = struct.unpack_from('<I', hdr, 60)[0]
        mini_fat_sector_count = struct.unpack_from('<I', hdr, 64)[0]
        msat_next = struct.unpack_from('<I', hdr, 68)[0]

        # MSAT: 109 entries in the header, the rest in chained sectors whose
        # last integer is the id of the next MSAT sector.
        msat = list(struct.unpack_from('<109I', hdr, 76))
        visited = set()
        while msat_next <= self._MAX_REGULAR_SECT and msat_next not in visited:
            visited.add(msat_next)  # guards against a looping chain
            entries = self._read_sector_ints(msat_next)
            msat.extend(entries[:-1])
            msat_next = entries[-1]

        # FAT: one sector's worth of entries per MSAT slot.  Unused slots
        # are padded so the remaining entries keep their sector positions.
        ints_per_sector = self._sector_size // 4
        self._fat = []
        for sect in msat[:fat_sector_count]:
            if sect <= self._MAX_REGULAR_SECT:
                self._fat.extend(self._read_sector_ints(sect))
            else:
                self._fat.extend([self._UNUSED_SECT] * ints_per_sector)

        # Directory: 128-byte entries, name in the first 64 bytes.
        self._directory = []
        for sect in self._chain(dir_start):
            block = self._read_sector(sect)
            for pos in range(0, len(block) - 127, 128):
                entry_data = block[pos:pos + 128]
                name_len = struct.unpack_from('<H', entry_data, 64)[0]
                if name_len == 0:
                    continue  # unused slot
                # name_len is a byte count that includes the terminating
                # NUL, so decode that many bytes and cut at the terminator.
                raw_name = entry_data[:min(name_len, 64)]
                name = raw_name.decode('utf-16le', errors='ignore')
                self._directory.append({
                    'name': name.partition('\x00')[0],
                    'type': struct.unpack_from('<B', entry_data, 66)[0],
                    'start': struct.unpack_from('<I', entry_data, 116)[0],
                    'size': struct.unpack_from('<I', entry_data, 120)[0],
                })

        # MiniFAT
        self._mini_fat = []
        if mini_fat_sector_count > 0:
            for sect in self._chain(mini_fat_start):
                self._mini_fat.extend(self._read_sector_ints(sect))

    def _follow(self, table: list[int], start: int) -> list[int]:
        """Return the ids linked from *start* in allocation *table*.

        The walk stops at any marker id, at an id the table does not cover,
        and at the first repeated id (a corrupt table may contain a loop).
        """
        chain = []
        seen = set()
        sect = start
        while sect <= self._MAX_REGULAR_SECT and sect not in seen:
            seen.add(sect)
            chain.append(sect)
            if sect >= len(table):
                break
            sect = table[sect]
        return chain

    def _chain(self, start: int) -> list[int]:
        """Follow a sector chain starting at *start*."""
        return self._follow(self._fat, start)

    def _mini_chain(self, start: int) -> list[int]:
        """Follow a mini-FAT chain."""
        return self._follow(self._mini_fat, start)

    def _read_sector(self, sect: int) -> bytes:
        """Return the raw bytes of sector *sect* (short or empty past EOF)."""
        # The header occupies one whole sector-sized slot, so sector 0
        # starts at ``sector_size`` (512 or 4096), not always at byte 512.
        offset = (sect + 1) * self._sector_size
        return self._data[offset:offset + self._sector_size]

    def _read_sector_ints(self, sect: int) -> tuple:
        """Return sector *sect* decoded as little-endian 32-bit integers.

        Raises:
            FileFormatError: the sector is missing or cut short.
        """
        block = self._read_sector(sect)
        if len(block) < self._sector_size:
            raise FileFormatError("OLE2 allocation table sector is truncated")
        return struct.unpack(f'<{self._sector_size // 4}I', block)

    def _read_stream(self, start: int, size: int, use_mini: bool = False) -> bytes:
        """Read a stream given its starting sector and total size."""
        if use_mini:
            return self._read_mini_stream(start, size)
        parts = []
        remaining = size
        for sect in self._chain(start):
            chunk = self._read_sector(sect)
            take = min(len(chunk), remaining)
            parts.append(chunk[:take])
            remaining -= take
            if remaining <= 0:
                break
        return b''.join(parts)

    def _read_mini_stream(self, start: int, size: int) -> bytes:
        """Read a stream stored in the mini stream (64-byte mini sectors).

        Raises:
            FileFormatError: the directory has no root entry.
        """
        # The root entry's own stream is the container of all mini sectors.
        root_entry = next(
            (e for e in self._directory if e['type'] == STGTY_ROOT), None
        )
        if root_entry is None:
            raise FileFormatError("Cannot find Root Entry in OLE2 directory")
        root_data = self._read_stream(root_entry['start'], root_entry['size'])
        parts = []
        remaining = size
        for sect in self._mini_chain(start):
            offset = sect * self._mini_sector_size
            if offset + self._mini_sector_size > len(root_data):
                break
            chunk = root_data[offset:offset + self._mini_sector_size]
            take = min(len(chunk), remaining)
            parts.append(chunk[:take])
            remaining -= take
            if remaining <= 0:
                break
        return b''.join(parts)

    # ── BIFF8 layer ─────────────────────────────────────────────────
    def _find_workbook_stream(self) -> tuple[int, int]:
        """Locate the Workbook/Book stream in the directory.

        Returns (start_sector, size) or raises FileFormatError.
        """
        for name in ('Workbook', 'Book'):
            for e in self._directory:
                if e['name'] == name and e['type'] == STGTY_STREAM:
                    return e['start'], e['size']
        raise FileFormatError("No Workbook stream found in OLE2 document")

    @staticmethod
    def _iter_records(raw: bytes):
        """Yield ``(opcode, payload)`` for each complete record in *raw*."""
        pos = 0
        end = len(raw)
        while pos + 4 <= end:
            opcode, length = struct.unpack_from('<HH', raw, pos)
            pos += 4
            if pos + length > end:
                return  # truncated trailing record
            yield opcode, raw[pos:pos + length]
            pos += length

    def _parse_biff8(self):
        """Parse the BIFF8 record stream and populate self._cells.

        The stream is a sequence of BOF...EOF substreams: workbook globals
        first (which carry the SST), then one per sheet.  Cell records are
        taken from the first worksheet substream only; substreams nested
        inside it (embedded charts) are skipped.

        Raises:
            FileFormatError: no workbook stream exists.
        """
        start, size = self._find_workbook_stream()
        raw = self._read_stream(start, size, use_mini=size < self._mini_cutoff)
        records = list(self._iter_records(raw))
        total = len(records)

        cell_handlers = {
            LABELSST: self._parse_labelsst,
            NUMBER: self._parse_number,
            FORMULA: self._parse_formula,
            RK: self._parse_rk,
            MULRK: self._parse_mulrk,
            LABEL: self._parse_label,
        }
        open_substreams = []  # BOF types of the substreams currently open
        self._pending_formula_cell = None

        i = 0
        while i < total:
            opcode, payload = records[i]
            i += 1
            if opcode == self._OP_BOF:
                kind = struct.unpack_from('<H', payload, 2)[0] if len(payload) >= 4 else 0
                open_substreams.append(kind)
            elif opcode == EOF:
                if open_substreams:
                    closed = open_substreams.pop()
                    if closed == self._DT_WORKSHEET and not open_substreams:
                        break  # first worksheet is complete
            elif opcode == SST:
                # A long table spills into the CONTINUE records behind it.
                j = i
                while j < total and records[j][0] == self._OP_CONTINUE:
                    j += 1
                self._parse_sst(payload, [chunk for _, chunk in records[i:j]])
                i = j
            elif open_substreams == [self._DT_WORKSHEET]:
                if opcode == self._OP_STRING:
                    self._parse_string(payload)
                else:
                    handler = cell_handlers.get(opcode)
                    if handler is not None:
                        handler(payload)

    # ── SST parser ──────────────────────────────────────────────────
    def _parse_sst(self, data: bytes, continuations: Optional[list[bytes]] = None):
        """Parse the Shared Strings Table into ``self._sst``.

        Args:
            data: payload of the SST record: total count(4) +
                unique count(4) + strings.
            continuations: payloads of the CONTINUE records that directly
                follow the SST record, in order.

        Each string is cch(2) + flags(1) + [run count(2) if flags & 0x08] +
        [ext size(4) if flags & 0x04] + characters + run data + ext data.
        Parsing stops quietly at the first string that is cut short.
        """
        if len(data) < 8:
            return
        unique_count = struct.unpack_from('<I', data, 4)[0]
        chunks = [data, *(continuations or ())]
        idx = 0
        chunk = data
        pos = 8

        for _ in range(unique_count):
            # A string header is never split: once a record is used up the
            # next string starts at the top of the following CONTINUE.
            if pos >= len(chunk):
                idx += 1
                if idx >= len(chunks):
                    break
                chunk = chunks[idx]
                pos = 0
            if pos + 3 > len(chunk):
                break
            cch, flags = struct.unpack_from('<HB', chunk, pos)
            pos += 3
            wide = bool(flags & 0x01)
            run_count = 0
            ext_size = 0
            if flags & 0x08:
                if pos + 2 > len(chunk):
                    break
                run_count = struct.unpack_from('<H', chunk, pos)[0]
                pos += 2
            if flags & 0x04:
                if pos + 4 > len(chunk):
                    break
                ext_size = struct.unpack_from('<I', chunk, pos)[0]
                pos += 4

            pieces = []
            remaining = cch
            complete = True
            while remaining > 0:
                if pos >= len(chunk):
                    idx += 1
                    if idx >= len(chunks) or not chunks[idx]:
                        complete = False
                        break
                    chunk = chunks[idx]
                    # Characters resumed in a CONTINUE are preceded by a
                    # fresh flag byte that may switch the character width.
                    wide = bool(chunk[0] & 0x01)
                    pos = 1
                width = 2 if wide else 1
                take = min(remaining, (len(chunk) - pos) // width)
                if take == 0:
                    complete = False  # dangling partial character
                    break
                pieces.append(self._decode_chars(chunk[pos:pos + take * width], wide))
                pos += take * width
                remaining -= take
            if not complete:
                break
            self._sst.append(''.join(pieces))

            # Formatting runs and phonetic data trail the characters and may
            # spill into later records (without a flag byte).
            skip = run_count * 4 + ext_size
            while skip > 0:
                available = len(chunk) - pos
                if skip <= available:
                    pos += skip
                    break
                skip -= available
                idx += 1
                if idx >= len(chunks):
                    pos = len(chunk)
                    break
                chunk = chunks[idx]
                pos = 0

    # ── Cell record parsers ─────────────────────────────────────────
    def _parse_labelsst(self, data: bytes):
        """LABELSST (0x00FD): row(2) + col(2) + xf(2) + sst_index(4)."""
        if len(data) < 10:
            return
        row, col = struct.unpack_from('<HH', data, 0)
        sst_idx = struct.unpack_from('<I', data, 6)[0]
        if sst_idx < len(self._sst):
            self._cells[(row, col)] = self._sst[sst_idx]

    def _parse_number(self, data: bytes):
        """NUMBER (0x0203): row(2) + col(2) + xf(2) + float(8)."""
        if len(data) < 14:
            return
        row, col = struct.unpack_from('<HH', data, 0)
        value = struct.unpack_from('<d', data, 6)[0]
        self._cells[(row, col)] = self._format_number(value)

    def _parse_formula(self, data: bytes):
        """FORMULA (0x0006): row(2) + col(2) + xf(2) + result(8) + ...

        The result is an IEEE double unless its last two bytes are 0xFFFF;
        then byte 0 gives the kind (0 string, 1 boolean, 2 error, 3 empty)
        and byte 2 the boolean or error code.  Booleans are stored as
        ``TRUE``/``FALSE``, errors as their display text; a string result
        is delivered by the STRING record that follows.
        """
        self._pending_formula_cell = None
        if len(data) < 14:
            return
        row, col = struct.unpack_from('<HH', data, 0)
        result = data[6:14]
        if result[6:8] != b'\xff\xff':
            value = struct.unpack('<d', result)[0]
            self._cells[(row, col)] = self._format_number(value)
            return
        kind, code = result[0], result[2]
        if kind == 0:
            self._pending_formula_cell = (row, col)
        elif kind == 1:
            self._cells[(row, col)] = 'TRUE' if code else 'FALSE'
        elif kind == 2:
            self._cells[(row, col)] = self._ERROR_TEXT.get(code, '#ERROR!')
        # kind 3 is an empty result: the cell stays absent from the map.

    def _parse_string(self, data: bytes):
        """STRING (0x0207): text result of the preceding FORMULA record."""
        cell = self._pending_formula_cell
        self._pending_formula_cell = None
        if cell is None:
            return
        text = self._decode_unicode_string(data, 0)
        if text is not None:
            self._cells[cell] = text

    def _parse_rk(self, data: bytes):
        """RK (0x027E): row(2) + col(2) + xf(2) + rk(4)."""
        if len(data) < 10:
            return
        row, col = struct.unpack_from('<HH', data, 0)
        rk_val = struct.unpack_from('<I', data, 6)[0]
        self._cells[(row, col)] = self._format_number(self._decode_rk(rk_val))

    def _parse_mulrk(self, data: bytes):
        """MULRK (0x00BD): row(2) + col_first(2) + (xf(2)+rk(4))*n + col_last(2)."""
        if len(data) < 6:
            return
        row, col_first = struct.unpack_from('<HH', data, 0)
        col_last = struct.unpack_from('<H', data, len(data) - 2)[0]
        # Never trust col_last beyond what the record can actually hold.
        entry_count = min(col_last - col_first + 1, (len(data) - 6) // 6)
        for i in range(entry_count):
            rk_val = struct.unpack_from('<I', data, 4 + 6 * i + 2)[0]
            value = self._decode_rk(rk_val)
            self._cells[(row, col_first + i)] = self._format_number(value)

    def _parse_label(self, data: bytes):
        """LABEL (0x0204): row(2) + col(2) + xf(2) + cch(2) + flags(1) + chars.

        Deprecated but sometimes present. Internal string, not SST.
        """
        if len(data) < 6:
            return
        row, col = struct.unpack_from('<HH', data, 0)
        text = self._decode_unicode_string(data, 6)
        if text is not None:
            self._cells[(row, col)] = text

    # ── Helpers ─────────────────────────────────────────────────────
    @staticmethod
    def _decode_chars(raw: bytes, wide: bool) -> str:
        """Decode BIFF8 character bytes.

        Wide text is UTF-16LE.  Compressed text stores only the low byte of
        each UTF-16 unit, which is exactly latin-1.
        """
        return raw.decode('utf-16le' if wide else 'latin-1', errors='replace')

    @staticmethod
    def _decode_unicode_string(data: bytes, offset: int) -> Optional[str]:
        """Decode cch(2) + flags(1) + chars at *offset*; None if cut short."""
        if len(data) < offset + 3:
            return None
        cch, flags = struct.unpack_from('<HB', data, offset)
        wide = bool(flags & 0x01)
        byte_count = cch * 2 if wide else cch
        raw = data[offset + 3:offset + 3 + byte_count]
        if len(raw) < byte_count:
            return None
        return XLSReader._decode_chars(raw, wide)

    @staticmethod
    def _decode_rk(rk: int) -> float:
        """Decode an RK value to a float.

        Bit 1 set: the upper 30 bits are a signed integer.  Bit 1 clear:
        they are the upper 30 bits of an IEEE double whose remaining bits
        are zero.  Bit 0 set: the decoded value is divided by 100.
        """
        rk &= 0xFFFFFFFF
        if rk & 0x02:
            int_val = rk >> 2
            if rk & 0x80000000:
                int_val -= 0x40000000  # sign-extend the 30-bit integer
            number = float(int_val)
        else:
            number = struct.unpack('<d', struct.pack('<II', 0, rk & 0xFFFFFFFC))[0]
        return number / 100 if rk & 0x01 else number

    @staticmethod
    def _format_number(value: float) -> str:
        """Format a numeric value as a string.

        Whole numbers below 1e15 in magnitude are written without a decimal
        point; everything else (including inf and nan) uses ``str``.
        """
        if float(value).is_integer() and abs(value) < 1e15:
            return str(int(value))
        return str(value)
# ── Module-level convenience function ──────────────────────────────
def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
    """Load the .xls workbook at *filepath* as a ``{(row, col): str}`` map.

    Row and column indices are 0-based, so cell A1 is keyed ``(0, 0)``.
    """
    reader = XLSReader(filepath)
    return reader.read_all_cells()