- main.py: 增加show_banner()启动说明、各阶段[INFO]日志、结果摘要、任意键退出 - file_selector.py: 重写为路径输入→验证→空输入弹窗回退→不存在循环重试 - run.bat: 新建启动脚本(chcp 65001, mode con cols=80 lines=20, color 0B, title固定署名, pause) - Code/docs/modification-assessment.md: 修改需求评估文档
490 lines
19 KiB
Python
490 lines
19 KiB
Python
"""XLS (BIFF8) reader — pure Python, zero dependencies.
|
|
|
|
Parses OLE2 compound document + BIFF8 record stream using only
|
|
the ``struct`` module.
|
|
"""
|
|
|
|
import struct
|
|
from typing import Optional
|
|
|
|
from models import FileFormatError
|
|
|
|
|
|
# ── OLE2 constants ─────────────────────────────────────────────────
|
|
|
|
OLE2_SIGNATURE = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1'
|
|
MSAT_SECT = 0xFFFFFFFE
|
|
FREE_SECT = 0xFFFFFFFF
|
|
ENDOFCHAIN = 0xFFFFFFFE
|
|
|
|
# Directory entry types
|
|
STGTY_INVALID = 0
|
|
STGTY_STORAGE = 1
|
|
STGTY_STREAM = 2
|
|
STGTY_ROOT = 5
|
|
|
|
|
|
# ── BIFF8 record opcodes ──────────────────────────────────────────
|
|
|
|
BOF = 0x0009
|
|
EOF = 0x000A
|
|
SST = 0x0034
|
|
BOUNDSHEET = 0x0085
|
|
DIMENSIONS = 0x0027
|
|
NUMBER = 0x0203
|
|
LABELSST = 0x00FD
|
|
FORMULA = 0x0006
|
|
RK = 0x000C
|
|
MULRK = 0x00BD
|
|
LABEL = 0x0204
|
|
RSTRING = 0x00FD # same as LABELSST in some docs; we handle via SST
|
|
INDEX = 0x00CD
|
|
WINDOW2 = 0x003D
|
|
|
|
|
|
class XLSReader:
|
|
"""Read an .xls (BIFF8) file and return a cell map."""
|
|
|
|
def __init__(self, filepath: str):
|
|
self._filepath = filepath
|
|
self._data: bytes = b''
|
|
self._sector_size: int = 512
|
|
self._mini_sector_size: int = 64
|
|
self._fat: list[int] = []
|
|
self._mini_fat: list[int] = []
|
|
self._directory: list[dict] = []
|
|
self._sst: list[str] = []
|
|
self._cells: dict[tuple[int, int], str] = {}
|
|
|
|
# ── public API ──────────────────────────────────────────────────
|
|
|
|
def read_all_cells(self) -> dict[tuple[int, int], str]:
|
|
"""Return {(row, col): str} for every non-empty cell."""
|
|
self._load_file()
|
|
self._parse_ole2()
|
|
self._find_workbook_stream()
|
|
self._parse_biff8()
|
|
return dict(self._cells)
|
|
|
|
@staticmethod
|
|
def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
|
|
"""Convenience function matching the xlsx_reader interface."""
|
|
return XLSReader(filepath).read_all_cells()
|
|
|
|
# ── OLE2 layer ──────────────────────────────────────────────────
|
|
|
|
def _load_file(self):
|
|
with open(self._filepath, 'rb') as f:
|
|
self._data = f.read()
|
|
if len(self._data) < 512:
|
|
raise FileFormatError("File too small to be a valid OLE2 document")
|
|
if self._data[:8] != OLE2_SIGNATURE:
|
|
raise FileFormatError("Not a valid OLE2 compound document")
|
|
|
|
def _parse_ole2(self):
|
|
"""Parse the OLE2 header, FAT, directory, and MiniFAT."""
|
|
hdr = self._data[:512]
|
|
|
|
# Sector size (usually 512 → shift=9, 4096 → shift=12)
|
|
ss_shift = struct.unpack_from('<H', hdr, 30)[0]
|
|
self._sector_size = 1 << ss_shift
|
|
|
|
# Mini sector size (always 64)
|
|
self._mini_sector_size = 1 << struct.unpack_from('<H', hdr, 32)[0]
|
|
|
|
# FAT
|
|
csect_fat = struct.unpack_from('<I', hdr, 44)[0]
|
|
csect_dir = struct.unpack_from('<I', hdr, 48)[0]
|
|
sect_dir_start = struct.unpack_from('<I', hdr, 56)[0]
|
|
sect_fat_start = struct.unpack_from('<I', hdr, 68)[0]
|
|
|
|
# MSAT (first 109 entries are in the header)
|
|
msat_header = struct.unpack_from('<109I', hdr, 76)
|
|
msat: list[int] = list(msat_header)
|
|
|
|
# Additional MSAT sectors (if any)
|
|
sect_msat_next = struct.unpack_from('<I', hdr, 68 + 4)[0] # offset 72
|
|
while sect_msat_next not in (ENDOFCHAIN, FREE_SECT):
|
|
block = self._read_sector(sect_msat_next)
|
|
entries = list(struct.unpack_from(f'<{127}I', block))
|
|
msat.extend(entries[:-1])
|
|
sect_msat_next = entries[127]
|
|
|
|
# Read FAT sectors
|
|
self._fat = [0] * max(csect_fat * (self._sector_size // 4), 1)
|
|
for i in range(csect_fat):
|
|
if i < len(msat) and msat[i] not in (ENDOFCHAIN, FREE_SECT):
|
|
block = self._read_sector(msat[i])
|
|
offset = i * (self._sector_size // 4)
|
|
count = self._sector_size // 4
|
|
chunk = struct.unpack_from(f'<{count}I', block)
|
|
self._fat[offset:offset + count] = list(chunk)
|
|
|
|
# Read directory entries
|
|
self._directory = []
|
|
sect = sect_dir_start
|
|
while sect not in (ENDOFCHAIN, FREE_SECT):
|
|
block = self._read_sector(sect)
|
|
for j in range(0, self._sector_size, 128):
|
|
entry_data = block[j:j + 128]
|
|
if len(entry_data) < 128:
|
|
break
|
|
name_len = struct.unpack_from('<H', entry_data, 64)[0]
|
|
if name_len == 0:
|
|
continue
|
|
name_utf16 = entry_data[:62].decode('utf-16le', errors='ignore')
|
|
name = name_utf16[:name_len]
|
|
entry = {
|
|
'name': name,
|
|
'type': struct.unpack_from('<B', entry_data, 66)[0],
|
|
'start': struct.unpack_from('<I', entry_data, 116)[0],
|
|
'size': struct.unpack_from('<I', entry_data, 120)[0],
|
|
}
|
|
self._directory.append(entry)
|
|
sect = self._fat[sect] if sect < len(self._fat) else ENDOFCHAIN
|
|
|
|
# MiniFAT
|
|
csect_mini_fat = struct.unpack_from('<I', hdr, 60)[0]
|
|
sect_mini_fat_start = struct.unpack_from('<I', hdr, 64)[0]
|
|
if csect_mini_fat > 0 and sect_mini_fat_start not in (ENDOFCHAIN, FREE_SECT):
|
|
self._mini_fat = []
|
|
for ms in self._chain(sect_mini_fat_start):
|
|
block = self._read_sector(ms)
|
|
count = self._sector_size // 4
|
|
self._mini_fat.extend(struct.unpack_from(f'<{count}I', block))
|
|
|
|
def _chain(self, start: int) -> list[int]:
|
|
"""Follow a sector chain starting at *start*."""
|
|
chain = []
|
|
s = start
|
|
while s not in (ENDOFCHAIN, FREE_SECT):
|
|
chain.append(s)
|
|
if s >= len(self._fat):
|
|
break
|
|
s = self._fat[s]
|
|
return chain
|
|
|
|
def _read_sector(self, sect: int) -> bytes:
|
|
"""Return the raw bytes of sector *sect*."""
|
|
offset = 512 + sect * self._sector_size
|
|
return self._data[offset:offset + self._sector_size]
|
|
|
|
def _read_stream(self, start: int, size: int, use_mini: bool = False) -> bytes:
|
|
"""Read a stream given its starting sector and total size."""
|
|
if use_mini:
|
|
return self._read_mini_stream(start, size)
|
|
chain = self._chain(start)
|
|
parts = []
|
|
remaining = size
|
|
for s in chain:
|
|
chunk = self._read_sector(s)
|
|
take = min(len(chunk), remaining)
|
|
parts.append(chunk[:take])
|
|
remaining -= take
|
|
if remaining <= 0:
|
|
break
|
|
return b''.join(parts)
|
|
|
|
def _read_mini_stream(self, start: int, size: int) -> bytes:
|
|
"""Read a mini-stream (stored in the mini FAT area)."""
|
|
# Find the "Root Entry" stream which holds mini-stream data
|
|
root_entry = None
|
|
for e in self._directory:
|
|
if e['type'] == STGTY_ROOT:
|
|
root_entry = e
|
|
break
|
|
if root_entry is None:
|
|
raise FileFormatError("Cannot find Root Entry in OLE2 directory")
|
|
|
|
root_data = self._read_stream(root_entry['start'], root_entry['size'])
|
|
chain = self._mini_chain(start)
|
|
parts = []
|
|
remaining = size
|
|
for s in chain:
|
|
offset = s * self._mini_sector_size
|
|
if offset + self._mini_sector_size > len(root_data):
|
|
break
|
|
chunk = root_data[offset:offset + self._mini_sector_size]
|
|
take = min(len(chunk), remaining)
|
|
parts.append(chunk[:take])
|
|
remaining -= take
|
|
if remaining <= 0:
|
|
break
|
|
return b''.join(parts)
|
|
|
|
def _mini_chain(self, start: int) -> list[int]:
|
|
"""Follow a mini-FAT chain."""
|
|
chain = []
|
|
s = start
|
|
while s not in (ENDOFCHAIN, FREE_SECT):
|
|
chain.append(s)
|
|
if s >= len(self._mini_fat):
|
|
break
|
|
s = self._mini_fat[s]
|
|
return chain
|
|
|
|
# ── BIFF8 layer ─────────────────────────────────────────────────
|
|
|
|
def _find_workbook_stream(self) -> tuple[int, int]:
|
|
"""Locate the Workbook/Book stream in the directory.
|
|
|
|
Returns (start_sector, size) or raises FileFormatError.
|
|
"""
|
|
for name in ('Workbook', 'Book'):
|
|
for e in self._directory:
|
|
if e['name'] == name and e['type'] == STGTY_STREAM:
|
|
return e['start'], e['size']
|
|
raise FileFormatError("No Workbook stream found in OLE2 document")
|
|
|
|
def _parse_biff8(self):
|
|
"""Parse the BIFF8 record stream and populate self._cells."""
|
|
start, size = self._find_workbook_stream()
|
|
# Determine if the stream is small enough to be a mini-stream
|
|
use_mini = size < 4096
|
|
raw = self._read_stream(start, size, use_mini=use_mini)
|
|
|
|
pos = 0
|
|
while pos + 4 <= len(raw):
|
|
opcode = struct.unpack_from('<H', raw, pos)[0]
|
|
length = struct.unpack_from('<H', raw, pos + 2)[0]
|
|
pos += 4
|
|
if pos + length > len(raw):
|
|
break
|
|
record_data = raw[pos:pos + length]
|
|
pos += length
|
|
|
|
if opcode == SST:
|
|
self._parse_sst(record_data)
|
|
elif opcode == LABELSST:
|
|
self._parse_labelsst(record_data)
|
|
elif opcode == NUMBER:
|
|
self._parse_number(record_data)
|
|
elif opcode == FORMULA:
|
|
self._parse_formula(record_data)
|
|
elif opcode == RK:
|
|
self._parse_rk(record_data)
|
|
elif opcode == MULRK:
|
|
self._parse_mulrk(record_data)
|
|
elif opcode == LABEL:
|
|
self._parse_label(record_data)
|
|
elif opcode == EOF:
|
|
break
|
|
|
|
# ── SST parser ──────────────────────────────────────────────────
|
|
|
|
def _parse_sst(self, data: bytes):
|
|
"""Parse the Shared Strings Table."""
|
|
if len(data) < 8:
|
|
return
|
|
cst_total = struct.unpack_from('<I', data, 0)[0]
|
|
# cst_unique = struct.unpack_from('<I', data, 4)[0] # not needed
|
|
|
|
offset = 8
|
|
for _ in range(cst_total):
|
|
if offset + 2 > len(data):
|
|
break
|
|
cch = struct.unpack_from('<H', data, offset)[0]
|
|
offset += 2
|
|
|
|
if offset >= len(data):
|
|
break
|
|
flags = data[offset]
|
|
offset += 1
|
|
|
|
is_16bit = bool(flags & 0x08)
|
|
has_rich = bool(flags & 0x04)
|
|
has_ext = bool(flags & 0x10)
|
|
|
|
# Skip extended formatting (run count)
|
|
if has_rich and offset + 2 <= len(data):
|
|
iset = struct.unpack_from('<H', data, offset)[0]
|
|
offset += 2 + iset * 4 # 4 bytes per format run
|
|
|
|
# Skip extended string (Asian phonetic)
|
|
if has_ext and offset + 4 <= len(data):
|
|
ext_size = struct.unpack_from('<I', data, offset)[0]
|
|
offset += 4 + ext_size
|
|
|
|
# Read the string characters
|
|
if is_16bit:
|
|
byte_count = cch * 2
|
|
else:
|
|
byte_count = cch
|
|
|
|
if offset + byte_count > len(data):
|
|
break
|
|
|
|
if is_16bit:
|
|
text = data[offset:offset + byte_count].decode('utf-16le', errors='replace')
|
|
else:
|
|
text = data[offset:offset + byte_count].decode('cp1252', errors='replace')
|
|
|
|
self._sst.append(text)
|
|
offset += byte_count
|
|
|
|
# ── Cell record parsers ─────────────────────────────────────────
|
|
|
|
def _parse_labelsst(self, data: bytes):
|
|
"""LABELSST (0x00FD): row(2) + col(2) + xf(2) + sst_index(4)."""
|
|
if len(data) < 10:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col = struct.unpack_from('<H', data, 2)[0]
|
|
sst_idx = struct.unpack_from('<I', data, 6)[0]
|
|
if sst_idx < len(self._sst):
|
|
self._cells[(row, col)] = self._sst[sst_idx]
|
|
|
|
def _parse_number(self, data: bytes):
|
|
"""NUMBER (0x0203): row(2) + col(2) + xf(2) + float(8)."""
|
|
if len(data) < 14:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col = struct.unpack_from('<H', data, 2)[0]
|
|
value = struct.unpack_from('<d', data, 6)[0]
|
|
self._cells[(row, col)] = self._format_number(value)
|
|
|
|
def _parse_formula(self, data: bytes):
|
|
"""FORMULA (0x0006): row(2) + col(2) + xf(2) + result(8) + ...
|
|
|
|
The result bytes can encode a string, number, boolean, or error.
|
|
We check the first two bytes of the result to determine type.
|
|
"""
|
|
if len(data) < 20:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col = struct.unpack_from('<H', data, 2)[0]
|
|
result_bytes = data[4:12]
|
|
|
|
# Check for string result (first two bytes are 0xFFFF)
|
|
if result_bytes[:2] == b'\xff\xff':
|
|
# The actual string comes in a following STRING record
|
|
return
|
|
|
|
# Try as double
|
|
value = struct.unpack_from('<d', result_bytes, 0)[0]
|
|
self._cells[(row, col)] = self._format_number(value)
|
|
|
|
def _parse_rk(self, data: bytes):
|
|
"""RK (0x000C): row(2) + col(2) + xf(2) + rk(4)."""
|
|
if len(data) < 10:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col = struct.unpack_from('<H', data, 2)[0]
|
|
rk_val = struct.unpack_from('<I', data, 6)[0]
|
|
value = self._decode_rk(rk_val)
|
|
self._cells[(row, col)] = self._format_number(value)
|
|
|
|
def _parse_mulrk(self, data: bytes):
|
|
"""MULRK (0x00BD): row(2) + col_first(2) + (xf(2)+rk(4))*n + col_last(2)."""
|
|
if len(data) < 6:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col_first = struct.unpack_from('<H', data, 2)[0]
|
|
col_last = struct.unpack_from('<H', data, -2)[0]
|
|
n = col_last - col_first + 1
|
|
pos = 4
|
|
for i in range(n):
|
|
if pos + 6 > len(data):
|
|
break
|
|
# xf = struct.unpack_from('<H', data, pos)[0] # not needed
|
|
rk_val = struct.unpack_from('<I', data, pos + 2)[0]
|
|
value = self._decode_rk(rk_val)
|
|
self._cells[(row, col_first + i)] = self._format_number(value)
|
|
pos += 6
|
|
|
|
def _parse_label(self, data: bytes):
|
|
"""LABEL (0x0204): row(2) + col(2) + xf(2) + cch(2) + ...
|
|
|
|
Deprecated but sometimes present. Internal string, not SST.
|
|
"""
|
|
if len(data) < 6:
|
|
return
|
|
row = struct.unpack_from('<H', data, 0)[0]
|
|
col = struct.unpack_from('<H', data, 2)[0]
|
|
cch = struct.unpack_from('<H', data, 4)[0]
|
|
if len(data) < 6 + cch:
|
|
return
|
|
flags = data[6] if 6 < len(data) else 0
|
|
offset = 7
|
|
if flags & 0x01:
|
|
# 16-bit
|
|
text = data[offset:offset + cch * 2].decode('utf-16le', errors='replace')
|
|
else:
|
|
text = data[offset:offset + cch].decode('cp1252', errors='replace')
|
|
self._cells[(row, col)] = text
|
|
|
|
# ── Helpers ─────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _decode_rk(rk: int) -> float:
|
|
"""Decode an RK value to a float."""
|
|
if rk & 0x02:
|
|
# Integer
|
|
val = (rk >> 2) if rk & 0x01 else rk >> 2
|
|
if rk & 0x80000000:
|
|
val = -((~rk >> 2) & 0x3FFFFFFF)
|
|
# Actually, the integer encoding: bit 0 = int flag
|
|
# If bit 0 set, it's a signed 30-bit int
|
|
int_val = (rk >> 2) & 0x3FFFFFFF
|
|
if rk & 0x40000000:
|
|
int_val -= 0x40000000
|
|
multiplier = 0.01 if rk & 0x01 else 1.0
|
|
return int_val * multiplier
|
|
else:
|
|
# Float: reconstruct IEEE 754 double from the 30-bit mantissa
|
|
# Take the 32-bit rk, set bit 0 and 1 to 0
|
|
mantissa = (rk >> 2) & 0x3FFFFFFF
|
|
if rk & 0x01:
|
|
mantissa = int(mantissa / 0.01)
|
|
# Build a double from the upper bits
|
|
# The RK stores the top 30 bits of the mantissa
|
|
double_bytes = struct.pack('<I', rk & 0xFFFFFFFC | 0x00000002)
|
|
# Actually, proper RK decoding:
|
|
# If bit 1 is 0 → it's a float stored in a compressed form
|
|
# Reconstruct: take 32-bit value, set bits 0-1 to 0, prepend 0x00000002
|
|
raw = (rk & 0xFFFFFFFC) | 0x00000000
|
|
# The RK float is stored as: sign(1) + exp(11) + mantissa(30)
|
|
# padded to 32 bits. We need to expand to 64-bit double.
|
|
# Simplified: treat as a special encoding
|
|
if rk & 0x01:
|
|
multiplier = 0.01
|
|
else:
|
|
multiplier = 1.0
|
|
# Proper decoding using bit manipulation
|
|
sign = (rk >> 31) & 1
|
|
exp = (rk >> 22) & 0x3FF
|
|
mant = rk & 0x003FFFFF
|
|
|
|
# Reconstruct double
|
|
# RK uses 30-bit mantissa (bits 2-31 of rk), with implicit leading 1
|
|
# and biased exponent
|
|
if exp == 0 and mant == 0:
|
|
return 0.0
|
|
# Build IEEE 754 double
|
|
d_sign = sign
|
|
d_exp = exp + 896 # bias adjustment
|
|
d_mant = mant << 20 # expand 30-bit to 52-bit
|
|
|
|
# Pack as double
|
|
packed = (d_sign << 63) | (d_exp << 52) | d_mant
|
|
packed_bytes = struct.pack('<Q', packed)
|
|
value = struct.unpack_from('<d', packed_bytes, 0)[0]
|
|
return value * multiplier
|
|
|
|
@staticmethod
|
|
def _format_number(value: float) -> str:
|
|
"""Format a numeric value as a string."""
|
|
if value == int(value) and abs(value) < 1e15:
|
|
return str(int(value))
|
|
return str(value)
|
|
|
|
|
|
# ── Module-level convenience function ──────────────────────────────
|
|
|
|
def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
|
|
"""Read an .xls file and return {(row, col): str}.
|
|
|
|
Rows and columns are 0-based. A1 → (0, 0).
|
|
"""
|
|
return XLSReader(filepath).read_all_cells()
|