feat:算例下编辑报告

This commit is contained in:
2026-01-13 17:33:29 +08:00
parent f9b2c327bb
commit b8e125f0b0
14 changed files with 660 additions and 16 deletions

393
project/modifyReport.py Normal file
View File

@@ -0,0 +1,393 @@
"""根据前端传来的json数据,修改模板文件,生成新的word文档"""
from docx.oxml.ns import qn
from docx.shared import Pt
from docx.enum.text import WD_ALIGN_PARAGRAPH
import os
import sys
import traceback
from datetime import datetime
import json
import shutil
from lxml import etree
import base64
from docx import Document
from docx.shared import RGBColor
import re
from docx.enum.table import WD_ALIGN_VERTICAL
def getJsonData(jsonFilePath):
"""获取JSON数据"""
if not os.path.exists(jsonFilePath):
print(f"Error:JSON文件路径 '{jsonFilePath}' 不存在,请确认")
return None
with open(jsonFilePath, 'r', encoding='utf-8') as jsonFile:
data = json.load(jsonFile)
return data
class Tee:
"""自定义文件对象,同时写入文件和原控制台"""
def __init__(self, *files):
self.files = files
def write(self, obj):
for f in self.files:
f.write(obj)
f.flush() # 确保立即写入
def flush(self):
for f in self.files:
f.flush()
def replace_text_in_textbox(doc, data_dict):
"""替换文本框中的文本"""
namespaces = {
'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
'wp': 'http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing',
'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
'wps': 'http://schemas.microsoft.com/office/word/2010/wordprocessingShape'
}
doc_modified = False # 整个文档是否被修改
for part_id, related_part in doc.part.related_parts.items():
if hasattr(related_part, 'blob'):
xml_content = related_part.blob
# 检查是否是 XML 内容(以 <?xml 或 < 开头)
if not xml_content.strip().startswith(b'<?xml') and not xml_content.strip().startswith(b'<'):
continue
try:
try:
content_str = xml_content.decode('utf-8')
except UnicodeDecodeError:
continue
# 检查是否包含 XML 声明或根元素
if not content_str.strip().startswith('<?xml') and not content_str.strip().startswith('<'):
continue
root = etree.fromstring(xml_content)
# 查找文本框
textboxes = root.xpath('//wps:txbx', namespaces=namespaces)
part_modified = False # 当前部件是否被修改
for textbox in textboxes:
# 提取文本内容
text_elements = textbox.xpath(
'.//w:t', namespaces=namespaces)
for elem in text_elements:
# print(f"原始文本: {elem.text}")
if elem.text and "$" in elem.text:
keyName = elem.text.split("$")[1]
value = data_dict.get(keyName, None)
if value is not None:
elem.text = elem.text.replace(
f"${keyName}", str(value))
print(f"文本框替换: ${keyName} -> {value}")
part_modified = True
doc_modified = True
# 关键步骤:将修改后的 XML 写回到部件中
if part_modified:
# 同时更新 _element 和 _blob
if hasattr(related_part, '_element'):
related_part._element = root
updated_xml = etree.tostring(
root, encoding='UTF-8', xml_declaration=True)
related_part._blob = updated_xml
except etree.XMLSyntaxError as e:
print(f"Warning:部件 {part_id} 不是有效的 XML: {e}")
continue
except Exception as e:
print(f"Warning:处理部件 {part_id} 时出错: {e}")
continue
def replace_text_in_paragraph(paragraph, data_dict, csvPath=None):
"""新增段落中的文本"""
text = paragraph.text
# 居中显示
# paragraph.alignment = 1
ngFlag = False
if "$" in text and "<" not in text and ">" not in text:
keys_list = re.findall(r'\$([a-zA-Z0-9_]+)', text)
for keyName in keys_list:
value = data_dict.get(keyName, None)
if value is not None:
placeholder = f"${keyName}"
text = text.replace(placeholder, str(value))
print(f"文本: {placeholder} -> {value}")
paragraph.text = text
def add_text_in_table(paragraph, data_dict):
"""新增表格中的文本"""
text = paragraph.text
if "$" in text and "<" not in text and ">" not in text:
keys_list = re.findall(r'\$([a-zA-Z0-9_]+)', text)
for keyName in keys_list:
value = data_dict.get(keyName, None)
if value is not None:
placeholder = f"${keyName}"
text = text.replace(placeholder, str(value))
print(f"文本: {placeholder} -> {value}")
paragraph.text = text
def get_reference_style(table):
"""从表格第一行提取样式特征"""
style = {
'font_name': '宋体',
'font_size': Pt(10.5),
'alignment': WD_ALIGN_PARAGRAPH.CENTER,
'vertical': WD_ALIGN_VERTICAL.CENTER
}
try:
if len(table.rows) > 0:
# 尝试从第一行第一个单元格获取字体信息
cell = table.rows[0].cells[0]
style['vertical'] = cell.vertical_alignment or WD_ALIGN_VERTICAL.CENTER
if cell.paragraphs and cell.paragraphs[0].runs:
run = cell.paragraphs[0].runs[0]
if run.font.name:
style['font_name'] = run.font.name
if run.font.size:
style['font_size'] = run.font.size
style['alignment'] = cell.paragraphs[0].alignment or WD_ALIGN_PARAGRAPH.CENTER
except Exception:
pass
return style
def apply_style_to_cell(cell, text, style):
"""将提取或默认的样式应用到单元格"""
cell.text = str(text)
cell.vertical_alignment = style['vertical']
for paragraph in cell.paragraphs:
paragraph.alignment = style['alignment']
for run in paragraph.runs:
run.font.name = style['font_name']
run.font.size = style['font_size']
# 确保中文字体兼容性
run._element.rPr.rFonts.set(qn('w:eastAsia'), style['font_name'])
def replace_text_in_table(table, data_dict, tableFlag=False, count_wrapper=[0]):
"""
替换表格中的文本
count_wrapper: 传入一个列表,例如 [2],表示剩余可执行次数
"""
for row in table.rows:
headers = [cell.text for cell in row.cells]
if headers:
header_str = ",".join(headers)
if header_str in data_dict and not tableFlag and count_wrapper[0] > 0:
print(f"================表头匹配到数据: {header_str}")
table_data = data_dict[header_str]
# 在表格末尾添加数据行
# 提取原表格样式
ref_style = get_reference_style(table)
for data_row in table_data:
new_row = table.add_row()
# 尝试继承原行高
if len(table.rows) > 1:
new_row.height = table.rows[0].height
for idx, header in enumerate(headers):
# content = data_row.get(header, "")
# apply_style_to_cell(
# new_row.cells[idx], content, ref_style)
key_list = list(data_row.keys())
new_row.cells[idx].text = str(data_row[key_list[idx]])
# 根据内容长短,自适应水平居中显示 不换行
# for paragraph in new_row.cells[idx].paragraphs:
# paragraph.vertical = WD_ALIGN_VERTICAL.CENTER
# paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
print(
f"表格新增行内容: {[str(data_row.get(header, '')) for header in headers]}")
tableFlag = True
count_wrapper[0] -= 1
if not tableFlag:
for cell in row.cells:
# 递归处理单元格中的嵌套表格
if cell.tables:
for nested_table in cell.tables:
replace_text_in_table(
nested_table, data_dict, tableFlag=tableFlag, count_wrapper=count_wrapper)
# 新增表格内容
for paragraph in cell.paragraphs:
replace_text_in_paragraph(paragraph, data_dict)
def getDataDict(reportContent_list):
"""获取数据字典 用于替换文本"""
data_dict = {}
table_num = 0
for item_dict in reportContent_list:
if item_dict["type"] == "text":
data_dict[item_dict["key"]] = item_dict["value"]
elif item_dict["type"] == "table":
tableContent = item_dict.get("value", [])
# 获取表头
headers = []
if tableContent:
headers = list(tableContent[0].keys())
if headers:
header_str = ",".join(headers)
# newKey = item_dict["key"] + "_" + header_str
newKey = header_str
data_dict[newKey] = tableContent
table_num += 1
elif item_dict["type"] == "conclusion":
data_dict[item_dict["key"]] = item_dict["value"]
return data_dict, table_num
def base64ToImg(projectPath, reportContent_list):
"""把base64字符串转成图片"""
img_num = 0
for item_dict in reportContent_list:
if item_dict["type"] == "img":
img_base64_list = item_dict["value"]
img_base64_str = img_base64_list[0].get("src", "")
img_base64 = img_base64_str.split(",")[1]
img_data = base64.b64decode(img_base64)
img_path = f"{projectPath}/{item_dict['key']}.png"
img_num += 1
with open(img_path, 'wb') as img_file:
img_file.write(img_data)
# 新增字典中的picPath为图片路径
# item_dict["picPath"] = img_path
return img_num
def modiyReport(projectPath, reportContent_list, outputDocxPath):
"""根据JSON数据修改报告文件"""
doc = Document(outputDocxPath)
# 把base64转成图片
img_num = base64ToImg(projectPath, reportContent_list)
print(f"Info:共转换{img_num}张图片")
# 图片与模版中图片名称的对应关系 模版从2开始 对应图片1.png 2.png ...
image_dict = {
f"image{i}.png": f"pic{i-1}.png" for i in range(2, img_num+2)}
# image_dict = {"image2.png": "pic1.png",
# "image3.png": "pic2.png",
# "image4.png": "pic3.png",
# "image5.png": "pic4.png",
# "image6.png": "pic5.png",
# }
# 获取数据字典 用于替换文本
data_dict, table_num = getDataDict(reportContent_list)
# 处理页眉
for section in doc.sections:
if section.header:
for paragraph in section.header.paragraphs:
replace_text_in_paragraph(paragraph, data_dict)
# 处理页眉中的表格
for table in section.header.tables:
replace_text_in_table(table, data_dict)
# 替换图片
for rel in doc.part.rels.values():
if "image" in rel.reltype:
# print(rel.target_ref, rel.reltype)
# media/image3.png
imageItem = rel.target_ref.split("/")[-1]
picName = image_dict.get(imageItem, None)
if picName is not None and picName != ".png":
replace_image_path = os.path.join(projectPath, picName)
if os.path.exists(replace_image_path):
print(f"图片: {imageItem} -> {picName}")
rel.target_part._blob = open(
replace_image_path, "rb").read()
# 处理表格
remaining_count = [table_num]
for table in doc.tables:
replace_text_in_table(
table, data_dict, tableFlag=False, count_wrapper=remaining_count)
# 处理结论
# replaceConclusion(doc, data_dict)
# 处理文本框
replace_text_in_textbox(doc, data_dict)
# 保存文档
doc.save(outputDocxPath)
print(f"Info:Word文档已创建: {outputDocxPath}")
if __name__ == "__main__":
projectPath = sys.argv[1]
templatePath = sys.argv[2]
# pythonLogPath = "/opt/report"
# currentPath = os.getcwd()
# projectPath = f"{currentPath}/project/modiyTemp"
# templatePath = f"{currentPath}/Test/report_tem.docx"
pythonLogPath = os.path.dirname(projectPath)
# 输出log日志
# 日志文件路径
log_fileDir = f"{pythonLogPath}/pythonLog"
if not os.path.exists(log_fileDir):
os.makedirs(log_fileDir)
# log文件用日期命名
log_file = f"{log_fileDir}/{datetime.now().strftime('%Y%m%d')}.log"
with open(log_file, 'a', encoding='utf-8') as log_file_obj:
# 重定向标准输出和标准错误
original_stdout = sys.stdout
original_stderr = sys.stderr
# 创建Tee对象同时输出到文件和原控制台
tee = Tee(log_file_obj, original_stdout)
sys.stdout = tee
sys.stderr = tee
# 在日志开头添加时间戳
print("=" * 60)
print(f"程序开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"日志文件: {log_file}")
print("=" * 60)
try:
# 获取报告内容JSON数据
reportContentPath = f"{projectPath}/reportContent.json"
reportContent_list = getJsonData(reportContentPath)
# 拷贝模板文件到项目目录下
outputDocxPath = f"{projectPath}/report_{datetime.now().strftime('%Y%m%d')}.docx"
shutil.copy(templatePath, outputDocxPath)
# 修改报告文件
modiyReport(projectPath, reportContent_list, outputDocxPath)
print("\n" + "=" * 60)
print(f"程序结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("程序执行成功")
print("=" * 60)
except Exception as e:
print(f"\n程序执行出错: {str(e)}", file=sys.stderr)
traceback.print_exc()
print("=" * 60)
print(f"程序异常结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
raise
finally:
# 恢复标准输出
sys.stdout = original_stdout
sys.stderr = original_stderr