v1.1.0: 增加交互提示、路径输入、窗口属性配置
- main.py: 增加show_banner()启动说明、各阶段[INFO]日志、结果摘要、任意键退出 - file_selector.py: 重写为路径输入→验证→空输入弹窗回退→不存在循环重试 - run.bat: 新建启动脚本(chcp 65001, mode con cols=80 lines=20, color 0B, title固定署名, pause) - Code/docs/modification-assessment.md: 修改需求评估文档
This commit is contained in:
489
Releases/v1.0.1/source/xls_reader.py
Normal file
489
Releases/v1.0.1/source/xls_reader.py
Normal file
@@ -0,0 +1,489 @@
|
||||
"""XLS (BIFF8) reader — pure Python, zero dependencies.
|
||||
|
||||
Parses OLE2 compound document + BIFF8 record stream using only
|
||||
the ``struct`` module.
|
||||
"""
|
||||
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from models import FileFormatError
|
||||
|
||||
|
||||
# ── OLE2 constants ─────────────────────────────────────────────────
|
||||
|
||||
OLE2_SIGNATURE = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1'
|
||||
MSAT_SECT = 0xFFFFFFFE
|
||||
FREE_SECT = 0xFFFFFFFF
|
||||
ENDOFCHAIN = 0xFFFFFFFE
|
||||
|
||||
# Directory entry types
|
||||
STGTY_INVALID = 0
|
||||
STGTY_STORAGE = 1
|
||||
STGTY_STREAM = 2
|
||||
STGTY_ROOT = 5
|
||||
|
||||
|
||||
# ── BIFF8 record opcodes ──────────────────────────────────────────
|
||||
|
||||
BOF = 0x0009
|
||||
EOF = 0x000A
|
||||
SST = 0x0034
|
||||
BOUNDSHEET = 0x0085
|
||||
DIMENSIONS = 0x0027
|
||||
NUMBER = 0x0203
|
||||
LABELSST = 0x00FD
|
||||
FORMULA = 0x0006
|
||||
RK = 0x000C
|
||||
MULRK = 0x00BD
|
||||
LABEL = 0x0204
|
||||
RSTRING = 0x00FD # same as LABELSST in some docs; we handle via SST
|
||||
INDEX = 0x00CD
|
||||
WINDOW2 = 0x003D
|
||||
|
||||
|
||||
class XLSReader:
|
||||
"""Read an .xls (BIFF8) file and return a cell map."""
|
||||
|
||||
def __init__(self, filepath: str):
|
||||
self._filepath = filepath
|
||||
self._data: bytes = b''
|
||||
self._sector_size: int = 512
|
||||
self._mini_sector_size: int = 64
|
||||
self._fat: list[int] = []
|
||||
self._mini_fat: list[int] = []
|
||||
self._directory: list[dict] = []
|
||||
self._sst: list[str] = []
|
||||
self._cells: dict[tuple[int, int], str] = {}
|
||||
|
||||
# ── public API ──────────────────────────────────────────────────
|
||||
|
||||
def read_all_cells(self) -> dict[tuple[int, int], str]:
|
||||
"""Return {(row, col): str} for every non-empty cell."""
|
||||
self._load_file()
|
||||
self._parse_ole2()
|
||||
self._find_workbook_stream()
|
||||
self._parse_biff8()
|
||||
return dict(self._cells)
|
||||
|
||||
@staticmethod
|
||||
def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
|
||||
"""Convenience function matching the xlsx_reader interface."""
|
||||
return XLSReader(filepath).read_all_cells()
|
||||
|
||||
# ── OLE2 layer ──────────────────────────────────────────────────
|
||||
|
||||
def _load_file(self):
|
||||
with open(self._filepath, 'rb') as f:
|
||||
self._data = f.read()
|
||||
if len(self._data) < 512:
|
||||
raise FileFormatError("File too small to be a valid OLE2 document")
|
||||
if self._data[:8] != OLE2_SIGNATURE:
|
||||
raise FileFormatError("Not a valid OLE2 compound document")
|
||||
|
||||
def _parse_ole2(self):
|
||||
"""Parse the OLE2 header, FAT, directory, and MiniFAT."""
|
||||
hdr = self._data[:512]
|
||||
|
||||
# Sector size (usually 512 → shift=9, 4096 → shift=12)
|
||||
ss_shift = struct.unpack_from('<H', hdr, 30)[0]
|
||||
self._sector_size = 1 << ss_shift
|
||||
|
||||
# Mini sector size (always 64)
|
||||
self._mini_sector_size = 1 << struct.unpack_from('<H', hdr, 32)[0]
|
||||
|
||||
# FAT
|
||||
csect_fat = struct.unpack_from('<I', hdr, 44)[0]
|
||||
csect_dir = struct.unpack_from('<I', hdr, 48)[0]
|
||||
sect_dir_start = struct.unpack_from('<I', hdr, 56)[0]
|
||||
sect_fat_start = struct.unpack_from('<I', hdr, 68)[0]
|
||||
|
||||
# MSAT (first 109 entries are in the header)
|
||||
msat_header = struct.unpack_from('<109I', hdr, 76)
|
||||
msat: list[int] = list(msat_header)
|
||||
|
||||
# Additional MSAT sectors (if any)
|
||||
sect_msat_next = struct.unpack_from('<I', hdr, 68 + 4)[0] # offset 72
|
||||
while sect_msat_next not in (ENDOFCHAIN, FREE_SECT):
|
||||
block = self._read_sector(sect_msat_next)
|
||||
entries = list(struct.unpack_from(f'<{127}I', block))
|
||||
msat.extend(entries[:-1])
|
||||
sect_msat_next = entries[127]
|
||||
|
||||
# Read FAT sectors
|
||||
self._fat = [0] * max(csect_fat * (self._sector_size // 4), 1)
|
||||
for i in range(csect_fat):
|
||||
if i < len(msat) and msat[i] not in (ENDOFCHAIN, FREE_SECT):
|
||||
block = self._read_sector(msat[i])
|
||||
offset = i * (self._sector_size // 4)
|
||||
count = self._sector_size // 4
|
||||
chunk = struct.unpack_from(f'<{count}I', block)
|
||||
self._fat[offset:offset + count] = list(chunk)
|
||||
|
||||
# Read directory entries
|
||||
self._directory = []
|
||||
sect = sect_dir_start
|
||||
while sect not in (ENDOFCHAIN, FREE_SECT):
|
||||
block = self._read_sector(sect)
|
||||
for j in range(0, self._sector_size, 128):
|
||||
entry_data = block[j:j + 128]
|
||||
if len(entry_data) < 128:
|
||||
break
|
||||
name_len = struct.unpack_from('<H', entry_data, 64)[0]
|
||||
if name_len == 0:
|
||||
continue
|
||||
name_utf16 = entry_data[:62].decode('utf-16le', errors='ignore')
|
||||
name = name_utf16[:name_len]
|
||||
entry = {
|
||||
'name': name,
|
||||
'type': struct.unpack_from('<B', entry_data, 66)[0],
|
||||
'start': struct.unpack_from('<I', entry_data, 116)[0],
|
||||
'size': struct.unpack_from('<I', entry_data, 120)[0],
|
||||
}
|
||||
self._directory.append(entry)
|
||||
sect = self._fat[sect] if sect < len(self._fat) else ENDOFCHAIN
|
||||
|
||||
# MiniFAT
|
||||
csect_mini_fat = struct.unpack_from('<I', hdr, 60)[0]
|
||||
sect_mini_fat_start = struct.unpack_from('<I', hdr, 64)[0]
|
||||
if csect_mini_fat > 0 and sect_mini_fat_start not in (ENDOFCHAIN, FREE_SECT):
|
||||
self._mini_fat = []
|
||||
for ms in self._chain(sect_mini_fat_start):
|
||||
block = self._read_sector(ms)
|
||||
count = self._sector_size // 4
|
||||
self._mini_fat.extend(struct.unpack_from(f'<{count}I', block))
|
||||
|
||||
def _chain(self, start: int) -> list[int]:
|
||||
"""Follow a sector chain starting at *start*."""
|
||||
chain = []
|
||||
s = start
|
||||
while s not in (ENDOFCHAIN, FREE_SECT):
|
||||
chain.append(s)
|
||||
if s >= len(self._fat):
|
||||
break
|
||||
s = self._fat[s]
|
||||
return chain
|
||||
|
||||
def _read_sector(self, sect: int) -> bytes:
|
||||
"""Return the raw bytes of sector *sect*."""
|
||||
offset = 512 + sect * self._sector_size
|
||||
return self._data[offset:offset + self._sector_size]
|
||||
|
||||
def _read_stream(self, start: int, size: int, use_mini: bool = False) -> bytes:
|
||||
"""Read a stream given its starting sector and total size."""
|
||||
if use_mini:
|
||||
return self._read_mini_stream(start, size)
|
||||
chain = self._chain(start)
|
||||
parts = []
|
||||
remaining = size
|
||||
for s in chain:
|
||||
chunk = self._read_sector(s)
|
||||
take = min(len(chunk), remaining)
|
||||
parts.append(chunk[:take])
|
||||
remaining -= take
|
||||
if remaining <= 0:
|
||||
break
|
||||
return b''.join(parts)
|
||||
|
||||
def _read_mini_stream(self, start: int, size: int) -> bytes:
|
||||
"""Read a mini-stream (stored in the mini FAT area)."""
|
||||
# Find the "Root Entry" stream which holds mini-stream data
|
||||
root_entry = None
|
||||
for e in self._directory:
|
||||
if e['type'] == STGTY_ROOT:
|
||||
root_entry = e
|
||||
break
|
||||
if root_entry is None:
|
||||
raise FileFormatError("Cannot find Root Entry in OLE2 directory")
|
||||
|
||||
root_data = self._read_stream(root_entry['start'], root_entry['size'])
|
||||
chain = self._mini_chain(start)
|
||||
parts = []
|
||||
remaining = size
|
||||
for s in chain:
|
||||
offset = s * self._mini_sector_size
|
||||
if offset + self._mini_sector_size > len(root_data):
|
||||
break
|
||||
chunk = root_data[offset:offset + self._mini_sector_size]
|
||||
take = min(len(chunk), remaining)
|
||||
parts.append(chunk[:take])
|
||||
remaining -= take
|
||||
if remaining <= 0:
|
||||
break
|
||||
return b''.join(parts)
|
||||
|
||||
def _mini_chain(self, start: int) -> list[int]:
|
||||
"""Follow a mini-FAT chain."""
|
||||
chain = []
|
||||
s = start
|
||||
while s not in (ENDOFCHAIN, FREE_SECT):
|
||||
chain.append(s)
|
||||
if s >= len(self._mini_fat):
|
||||
break
|
||||
s = self._mini_fat[s]
|
||||
return chain
|
||||
|
||||
# ── BIFF8 layer ─────────────────────────────────────────────────
|
||||
|
||||
def _find_workbook_stream(self) -> tuple[int, int]:
|
||||
"""Locate the Workbook/Book stream in the directory.
|
||||
|
||||
Returns (start_sector, size) or raises FileFormatError.
|
||||
"""
|
||||
for name in ('Workbook', 'Book'):
|
||||
for e in self._directory:
|
||||
if e['name'] == name and e['type'] == STGTY_STREAM:
|
||||
return e['start'], e['size']
|
||||
raise FileFormatError("No Workbook stream found in OLE2 document")
|
||||
|
||||
def _parse_biff8(self):
|
||||
"""Parse the BIFF8 record stream and populate self._cells."""
|
||||
start, size = self._find_workbook_stream()
|
||||
# Determine if the stream is small enough to be a mini-stream
|
||||
use_mini = size < 4096
|
||||
raw = self._read_stream(start, size, use_mini=use_mini)
|
||||
|
||||
pos = 0
|
||||
while pos + 4 <= len(raw):
|
||||
opcode = struct.unpack_from('<H', raw, pos)[0]
|
||||
length = struct.unpack_from('<H', raw, pos + 2)[0]
|
||||
pos += 4
|
||||
if pos + length > len(raw):
|
||||
break
|
||||
record_data = raw[pos:pos + length]
|
||||
pos += length
|
||||
|
||||
if opcode == SST:
|
||||
self._parse_sst(record_data)
|
||||
elif opcode == LABELSST:
|
||||
self._parse_labelsst(record_data)
|
||||
elif opcode == NUMBER:
|
||||
self._parse_number(record_data)
|
||||
elif opcode == FORMULA:
|
||||
self._parse_formula(record_data)
|
||||
elif opcode == RK:
|
||||
self._parse_rk(record_data)
|
||||
elif opcode == MULRK:
|
||||
self._parse_mulrk(record_data)
|
||||
elif opcode == LABEL:
|
||||
self._parse_label(record_data)
|
||||
elif opcode == EOF:
|
||||
break
|
||||
|
||||
# ── SST parser ──────────────────────────────────────────────────
|
||||
|
||||
def _parse_sst(self, data: bytes):
|
||||
"""Parse the Shared Strings Table."""
|
||||
if len(data) < 8:
|
||||
return
|
||||
cst_total = struct.unpack_from('<I', data, 0)[0]
|
||||
# cst_unique = struct.unpack_from('<I', data, 4)[0] # not needed
|
||||
|
||||
offset = 8
|
||||
for _ in range(cst_total):
|
||||
if offset + 2 > len(data):
|
||||
break
|
||||
cch = struct.unpack_from('<H', data, offset)[0]
|
||||
offset += 2
|
||||
|
||||
if offset >= len(data):
|
||||
break
|
||||
flags = data[offset]
|
||||
offset += 1
|
||||
|
||||
is_16bit = bool(flags & 0x08)
|
||||
has_rich = bool(flags & 0x04)
|
||||
has_ext = bool(flags & 0x10)
|
||||
|
||||
# Skip extended formatting (run count)
|
||||
if has_rich and offset + 2 <= len(data):
|
||||
iset = struct.unpack_from('<H', data, offset)[0]
|
||||
offset += 2 + iset * 4 # 4 bytes per format run
|
||||
|
||||
# Skip extended string (Asian phonetic)
|
||||
if has_ext and offset + 4 <= len(data):
|
||||
ext_size = struct.unpack_from('<I', data, offset)[0]
|
||||
offset += 4 + ext_size
|
||||
|
||||
# Read the string characters
|
||||
if is_16bit:
|
||||
byte_count = cch * 2
|
||||
else:
|
||||
byte_count = cch
|
||||
|
||||
if offset + byte_count > len(data):
|
||||
break
|
||||
|
||||
if is_16bit:
|
||||
text = data[offset:offset + byte_count].decode('utf-16le', errors='replace')
|
||||
else:
|
||||
text = data[offset:offset + byte_count].decode('cp1252', errors='replace')
|
||||
|
||||
self._sst.append(text)
|
||||
offset += byte_count
|
||||
|
||||
# ── Cell record parsers ─────────────────────────────────────────
|
||||
|
||||
def _parse_labelsst(self, data: bytes):
|
||||
"""LABELSST (0x00FD): row(2) + col(2) + xf(2) + sst_index(4)."""
|
||||
if len(data) < 10:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col = struct.unpack_from('<H', data, 2)[0]
|
||||
sst_idx = struct.unpack_from('<I', data, 6)[0]
|
||||
if sst_idx < len(self._sst):
|
||||
self._cells[(row, col)] = self._sst[sst_idx]
|
||||
|
||||
def _parse_number(self, data: bytes):
|
||||
"""NUMBER (0x0203): row(2) + col(2) + xf(2) + float(8)."""
|
||||
if len(data) < 14:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col = struct.unpack_from('<H', data, 2)[0]
|
||||
value = struct.unpack_from('<d', data, 6)[0]
|
||||
self._cells[(row, col)] = self._format_number(value)
|
||||
|
||||
def _parse_formula(self, data: bytes):
|
||||
"""FORMULA (0x0006): row(2) + col(2) + xf(2) + result(8) + ...
|
||||
|
||||
The result bytes can encode a string, number, boolean, or error.
|
||||
We check the first two bytes of the result to determine type.
|
||||
"""
|
||||
if len(data) < 20:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col = struct.unpack_from('<H', data, 2)[0]
|
||||
result_bytes = data[4:12]
|
||||
|
||||
# Check for string result (first two bytes are 0xFFFF)
|
||||
if result_bytes[:2] == b'\xff\xff':
|
||||
# The actual string comes in a following STRING record
|
||||
return
|
||||
|
||||
# Try as double
|
||||
value = struct.unpack_from('<d', result_bytes, 0)[0]
|
||||
self._cells[(row, col)] = self._format_number(value)
|
||||
|
||||
def _parse_rk(self, data: bytes):
|
||||
"""RK (0x000C): row(2) + col(2) + xf(2) + rk(4)."""
|
||||
if len(data) < 10:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col = struct.unpack_from('<H', data, 2)[0]
|
||||
rk_val = struct.unpack_from('<I', data, 6)[0]
|
||||
value = self._decode_rk(rk_val)
|
||||
self._cells[(row, col)] = self._format_number(value)
|
||||
|
||||
def _parse_mulrk(self, data: bytes):
|
||||
"""MULRK (0x00BD): row(2) + col_first(2) + (xf(2)+rk(4))*n + col_last(2)."""
|
||||
if len(data) < 6:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col_first = struct.unpack_from('<H', data, 2)[0]
|
||||
col_last = struct.unpack_from('<H', data, -2)[0]
|
||||
n = col_last - col_first + 1
|
||||
pos = 4
|
||||
for i in range(n):
|
||||
if pos + 6 > len(data):
|
||||
break
|
||||
# xf = struct.unpack_from('<H', data, pos)[0] # not needed
|
||||
rk_val = struct.unpack_from('<I', data, pos + 2)[0]
|
||||
value = self._decode_rk(rk_val)
|
||||
self._cells[(row, col_first + i)] = self._format_number(value)
|
||||
pos += 6
|
||||
|
||||
def _parse_label(self, data: bytes):
|
||||
"""LABEL (0x0204): row(2) + col(2) + xf(2) + cch(2) + ...
|
||||
|
||||
Deprecated but sometimes present. Internal string, not SST.
|
||||
"""
|
||||
if len(data) < 6:
|
||||
return
|
||||
row = struct.unpack_from('<H', data, 0)[0]
|
||||
col = struct.unpack_from('<H', data, 2)[0]
|
||||
cch = struct.unpack_from('<H', data, 4)[0]
|
||||
if len(data) < 6 + cch:
|
||||
return
|
||||
flags = data[6] if 6 < len(data) else 0
|
||||
offset = 7
|
||||
if flags & 0x01:
|
||||
# 16-bit
|
||||
text = data[offset:offset + cch * 2].decode('utf-16le', errors='replace')
|
||||
else:
|
||||
text = data[offset:offset + cch].decode('cp1252', errors='replace')
|
||||
self._cells[(row, col)] = text
|
||||
|
||||
# ── Helpers ─────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _decode_rk(rk: int) -> float:
|
||||
"""Decode an RK value to a float."""
|
||||
if rk & 0x02:
|
||||
# Integer
|
||||
val = (rk >> 2) if rk & 0x01 else rk >> 2
|
||||
if rk & 0x80000000:
|
||||
val = -((~rk >> 2) & 0x3FFFFFFF)
|
||||
# Actually, the integer encoding: bit 0 = int flag
|
||||
# If bit 0 set, it's a signed 30-bit int
|
||||
int_val = (rk >> 2) & 0x3FFFFFFF
|
||||
if rk & 0x40000000:
|
||||
int_val -= 0x40000000
|
||||
multiplier = 0.01 if rk & 0x01 else 1.0
|
||||
return int_val * multiplier
|
||||
else:
|
||||
# Float: reconstruct IEEE 754 double from the 30-bit mantissa
|
||||
# Take the 32-bit rk, set bit 0 and 1 to 0
|
||||
mantissa = (rk >> 2) & 0x3FFFFFFF
|
||||
if rk & 0x01:
|
||||
mantissa = int(mantissa / 0.01)
|
||||
# Build a double from the upper bits
|
||||
# The RK stores the top 30 bits of the mantissa
|
||||
double_bytes = struct.pack('<I', rk & 0xFFFFFFFC | 0x00000002)
|
||||
# Actually, proper RK decoding:
|
||||
# If bit 1 is 0 → it's a float stored in a compressed form
|
||||
# Reconstruct: take 32-bit value, set bits 0-1 to 0, prepend 0x00000002
|
||||
raw = (rk & 0xFFFFFFFC) | 0x00000000
|
||||
# The RK float is stored as: sign(1) + exp(11) + mantissa(30)
|
||||
# padded to 32 bits. We need to expand to 64-bit double.
|
||||
# Simplified: treat as a special encoding
|
||||
if rk & 0x01:
|
||||
multiplier = 0.01
|
||||
else:
|
||||
multiplier = 1.0
|
||||
# Proper decoding using bit manipulation
|
||||
sign = (rk >> 31) & 1
|
||||
exp = (rk >> 22) & 0x3FF
|
||||
mant = rk & 0x003FFFFF
|
||||
|
||||
# Reconstruct double
|
||||
# RK uses 30-bit mantissa (bits 2-31 of rk), with implicit leading 1
|
||||
# and biased exponent
|
||||
if exp == 0 and mant == 0:
|
||||
return 0.0
|
||||
# Build IEEE 754 double
|
||||
d_sign = sign
|
||||
d_exp = exp + 896 # bias adjustment
|
||||
d_mant = mant << 20 # expand 30-bit to 52-bit
|
||||
|
||||
# Pack as double
|
||||
packed = (d_sign << 63) | (d_exp << 52) | d_mant
|
||||
packed_bytes = struct.pack('<Q', packed)
|
||||
value = struct.unpack_from('<d', packed_bytes, 0)[0]
|
||||
return value * multiplier
|
||||
|
||||
@staticmethod
|
||||
def _format_number(value: float) -> str:
|
||||
"""Format a numeric value as a string."""
|
||||
if value == int(value) and abs(value) < 1e15:
|
||||
return str(int(value))
|
||||
return str(value)
|
||||
|
||||
|
||||
# ── Module-level convenience function ──────────────────────────────
|
||||
|
||||
def read_excel_cells(filepath: str) -> dict[tuple[int, int], str]:
|
||||
"""Read an .xls file and return {(row, col): str}.
|
||||
|
||||
Rows and columns are 0-based. A1 → (0, 0).
|
||||
"""
|
||||
return XLSReader(filepath).read_all_cells()
|
||||
Reference in New Issue
Block a user