# Listing metadata: 394 lines, 15 KiB, Python
"""根据前端传来的json数据,修改模板文件,生成新的word文档"""
|
||
|
||
from docx.oxml.ns import qn
|
||
from docx.shared import Pt
|
||
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
||
import os
|
||
import sys
|
||
import traceback
|
||
from datetime import datetime
|
||
import json
|
||
import shutil
|
||
from lxml import etree
|
||
import base64
|
||
from docx import Document
|
||
from docx.shared import RGBColor
|
||
import re
|
||
from docx.enum.table import WD_ALIGN_VERTICAL
|
||
|
||
|
||
def getJsonData(jsonFilePath):
    """Load a UTF-8 encoded JSON file and return its parsed content.

    Args:
        jsonFilePath: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load``, or ``None`` (after printing an
        error message) when the path does not exist.
    """
    if os.path.exists(jsonFilePath):
        with open(jsonFilePath, 'r', encoding='utf-8') as handle:
            return json.load(handle)

    # Missing file: report it and let the caller deal with the None result.
    print(f"Error:JSON文件路径 '{jsonFilePath}' 不存在,请确认")
    return None
|
||
|
||
|
||
class Tee:
    """File-like object that duplicates every write to several streams."""

    def __init__(self, *files):
        # Target streams, kept in the order they were passed in.
        self.files = files

    def write(self, obj):
        """Write ``obj`` to each stream, flushing it immediately afterwards."""
        for stream in self.files:
            stream.write(obj)
            stream.flush()  # make the output visible right away

    def flush(self):
        """Flush all target streams."""
        for stream in self.files:
            stream.flush()
|
||
|
||
|
||
def replace_text_in_textbox(doc, data_dict):
    """Replace ``$key`` placeholders inside text boxes of the related parts.

    Every part in ``doc.part.related_parts`` whose blob looks like UTF-8 XML
    is parsed, and the ``w:t`` elements found below ``wps:txbx`` elements are
    rewritten. A part that was changed gets the new tree stored in
    ``_element`` (when the part has that attribute) and the serialized XML
    stored in ``_blob``.

    Placeholders are matched with the same ``$name`` pattern (letters, digits
    and underscore) that this file uses for paragraph text, so several
    placeholders and surrounding literal text within one ``w:t`` are handled.

    NOTE(review): only parts reachable through ``doc.part.related_parts`` are
    scanned; confirm whether text boxes stored in the main document part need
    the same treatment.

    Args:
        doc: Document object exposing ``part.related_parts``.
        data_dict: Mapping of placeholder name to replacement value. Names
            that are missing or mapped to ``None`` are left untouched.

    Returns:
        None. A failure while processing one part is printed as a warning and
        that part is skipped.
    """
    namespaces = {
        'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
        'wp': 'http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing',
        'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
        'wps': 'http://schemas.microsoft.com/office/word/2010/wordprocessingShape'
    }
    placeholder_re = re.compile(r'\$([a-zA-Z0-9_]+)')

    def replace_match(match):
        """Return the replacement for one ``$name`` match (or the match itself)."""
        key = match.group(1)
        value = data_dict.get(key, None)
        if value is None:
            return match.group(0)
        print(f"文本框替换: ${key} -> {value}")
        return str(value)

    def substitute_text(text):
        """Return ``text`` with its placeholders replaced; ``text`` contains '$'."""
        replaced = placeholder_re.sub(replace_match, text)
        if replaced != text:
            return replaced
        # Backward-compatible fallback: the previous implementation used
        # everything between the first and second '$' as the key, which also
        # matched keys containing characters outside [a-zA-Z0-9_].
        legacy_key = text.split("$")[1]
        legacy_value = data_dict.get(legacy_key, None)
        if legacy_value is None:
            return text
        print(f"文本框替换: ${legacy_key} -> {legacy_value}")
        return text.replace(f"${legacy_key}", str(legacy_value))

    for part_id, related_part in doc.part.related_parts.items():
        if not hasattr(related_part, 'blob'):
            continue

        xml_content = related_part.blob
        # Cheap sniff: anything that is XML starts with '<' (this also covers
        # an '<?xml' declaration), so binary parts such as images are skipped.
        if not xml_content or not xml_content.strip().startswith(b'<'):
            continue
        try:
            xml_content.decode('utf-8')
        except UnicodeDecodeError:
            # Not UTF-8 text; leave the part alone.
            continue

        try:
            root = etree.fromstring(xml_content)
            part_modified = False  # whether this part needs to be written back

            for textbox in root.xpath('//wps:txbx', namespaces=namespaces):
                for elem in textbox.xpath('.//w:t', namespaces=namespaces):
                    if not elem.text or "$" not in elem.text:
                        continue
                    new_text = substitute_text(elem.text)
                    if new_text != elem.text:
                        elem.text = new_text
                        part_modified = True

            # Key step: write the modified XML back into the part, updating
            # both the element tree and the raw blob.
            if part_modified:
                if hasattr(related_part, '_element'):
                    related_part._element = root
                related_part._blob = etree.tostring(
                    root, encoding='UTF-8', xml_declaration=True)

        except etree.XMLSyntaxError as e:
            print(f"Warning:部件 {part_id} 不是有效的 XML: {e}")
            continue
        except Exception as e:
            # Best effort: one broken part must not abort the whole report.
            print(f"Warning:处理部件 {part_id} 时出错: {e}")
            continue
|
||
|
||
|
||
def replace_text_in_paragraph(paragraph, data_dict, csvPath=None):
    """Replace ``$key`` placeholders in the text of a paragraph.

    Placeholder names consist of letters, digits and underscores. All
    placeholders are substituted in a single pass, so a key that is a prefix
    of another one (``$name`` vs ``$name2``) cannot corrupt the longer
    placeholder.

    Args:
        paragraph: Object with a readable and writable ``text`` attribute.
        data_dict: Mapping of placeholder name to replacement value. Names
            that are missing or mapped to ``None`` are left untouched.
        csvPath: Not used by this function; kept so existing callers keep
            working.

    Returns:
        None. ``paragraph.text`` is assigned only when a replacement
        actually changed the text.
    """
    original_text = paragraph.text
    # Texts containing angle brackets are skipped, as in the original
    # condition (the reason is not visible in this file).
    if "$" not in original_text or "<" in original_text or ">" in original_text:
        return

    def replace_match(match):
        key = match.group(1)
        value = data_dict.get(key, None)
        if value is None:
            return match.group(0)
        print(f"文本: ${key} -> {value}")
        return str(value)

    new_text = re.sub(r'\$([a-zA-Z0-9_]+)', replace_match, original_text)

    # Assigning the text of a python-docx paragraph collapses it into a
    # single run and drops run-level formatting, so only write on change.
    if new_text != original_text:
        paragraph.text = new_text
|
||
|
||
|
||
def add_text_in_table(paragraph, data_dict):
    """Replace ``$key`` placeholders in the text of a paragraph.

    Judging by its name this is presumably meant for paragraphs inside table
    cells; the function itself only relies on ``paragraph.text``.

    Placeholder names consist of letters, digits and underscores. All
    placeholders are substituted in one pass so that a key which is a prefix
    of another key (``$name`` vs ``$name2``) does not damage the longer one.

    Args:
        paragraph: Object with a readable and writable ``text`` attribute.
        data_dict: Mapping of placeholder name to replacement value. Names
            that are missing or mapped to ``None`` are left untouched.

    Returns:
        None. ``paragraph.text`` is assigned only when the text changed.
    """
    source_text = paragraph.text
    # Texts containing angle brackets are skipped, as in the original
    # condition (the reason is not visible in this file).
    if "$" not in source_text or "<" in source_text or ">" in source_text:
        return

    def fill_placeholder(match):
        key = match.group(1)
        value = data_dict.get(key, None)
        if value is None:
            return match.group(0)
        print(f"文本: ${key} -> {value}")
        return str(value)

    filled_text = re.sub(r'\$([a-zA-Z0-9_]+)', fill_placeholder, source_text)

    # Avoid rewriting an unchanged paragraph: assigning the text of a
    # python-docx paragraph discards its run-level formatting.
    if filled_text != source_text:
        paragraph.text = filled_text
|
||
|
||
|
||
def get_reference_style(table):
    """Derive cell style settings from the first cell of the table's first row.

    Starts from defaults (font '宋体', 10.5 pt, centered horizontally and
    vertically) and overrides each setting that the first cell defines
    explicitly.

    Args:
        table: Table object exposing ``rows`` whose items expose ``cells``.

    Returns:
        dict with the keys ``font_name``, ``font_size``, ``alignment`` and
        ``vertical``. The defaults are returned when the table has no rows or
        reading the style fails.
    """
    style = {
        'font_name': '宋体',
        'font_size': Pt(10.5),
        'alignment': WD_ALIGN_PARAGRAPH.CENTER,
        'vertical': WD_ALIGN_VERTICAL.CENTER,
    }
    try:
        if len(table.rows) > 0:
            cell = table.rows[0].cells[0]

            # Compare against None instead of using `or`: python-docx enum
            # members with integer value 0 (LEFT, TOP) are falsy and would
            # otherwise be silently replaced by CENTER.
            vertical = cell.vertical_alignment
            if vertical is not None:
                style['vertical'] = vertical

            paragraphs = cell.paragraphs
            if paragraphs:
                first_paragraph = paragraphs[0]
                runs = first_paragraph.runs
                if runs:
                    # Font information is taken from the first run only.
                    font = runs[0].font
                    if font.name:
                        style['font_name'] = font.name
                    if font.size:
                        style['font_size'] = font.size

                alignment = first_paragraph.alignment
                if alignment is not None:
                    style['alignment'] = alignment
    except Exception as e:
        # Best effort: fall back to what has been collected so far, but make
        # the failure visible in the log instead of swallowing it.
        print(f"Warning:读取表格参考样式失败,使用默认样式: {e}")
    return style
|
||
|
||
|
||
def apply_style_to_cell(cell, text, style):
    """Write ``text`` into ``cell`` and format the cell according to ``style``.

    Args:
        cell: Table cell whose text, vertical alignment, paragraphs and runs
            are updated.
        text: Value to display; converted with ``str``.
        style: Mapping read for the keys ``vertical``, ``alignment``,
            ``font_name`` and ``font_size``.
    """
    cell.text = str(text)
    cell.vertical_alignment = style['vertical']

    for para in cell.paragraphs:
        para.alignment = style['alignment']
        for text_run in para.runs:
            font = text_run.font
            font.name = style['font_name']
            font.size = style['font_size']
            # Set the East Asian font as well so Chinese text uses the same
            # typeface.
            text_run._element.rPr.rFonts.set(
                qn('w:eastAsia'), style['font_name'])
|
||
|
||
|
||
def replace_text_in_table(table, data_dict, tableFlag=False, count_wrapper=None):
    """Fill a table from ``data_dict`` and replace placeholders in its cells.

    For each row, the cell texts joined with "," are looked up in
    ``data_dict``. On a match (and while the fill budget allows it) one new
    row is appended to the table per data row, and the remaining rows of this
    table are left untouched. Rows processed before a match get their nested
    tables handled recursively and their paragraphs passed to
    ``replace_text_in_paragraph``.

    New rows take the height of the table's first row; no other styling is
    applied to them.

    Args:
        table: Table object exposing ``rows`` and ``add_row()``.
        data_dict: Mapping of placeholder names to values and of
            comma-joined header strings to lists of row dicts.
        tableFlag: When true, this table is treated as already filled: no
            rows are added and no cell text is replaced.
        count_wrapper: Single-element list, e.g. ``[2]``, holding the
            remaining number of table fills. It is decremented in place so
            the budget is shared between calls. Defaults to ``[0]`` (no
            fills allowed).
    """
    # A fresh list per call instead of a shared mutable default argument.
    if count_wrapper is None:
        count_wrapper = [0]

    for row in table.rows:
        headers = [cell.text for cell in row.cells]
        if headers:
            header_str = ",".join(headers)
            if header_str in data_dict and not tableFlag and count_wrapper[0] > 0:
                print(f"================表头匹配到数据: {header_str}")

                # Try to inherit the height of the first row.
                reference_height = table.rows[0].height

                # Append the data rows at the end of the table.
                for data_row in data_dict[header_str]:
                    new_row = table.add_row()
                    new_row.height = reference_height

                    # Look values up by header text rather than by key
                    # position, so a row whose keys come in a different order
                    # (or that lacks a key) still lands in the right columns
                    # and matches what is logged below.
                    values = [str(data_row.get(header, "")) for header in headers]
                    for target_cell, value in zip(new_row.cells, values):
                        target_cell.text = value

                    print(f"表格新增行内容: {values}")

                tableFlag = True
                count_wrapper[0] -= 1

        if not tableFlag:
            for cell in row.cells:
                # Recurse into tables nested inside the cell.
                for nested_table in cell.tables:
                    replace_text_in_table(
                        nested_table, data_dict, tableFlag=tableFlag, count_wrapper=count_wrapper)

                # Replace placeholders in the cell's own paragraphs.
                for paragraph in cell.paragraphs:
                    replace_text_in_paragraph(paragraph, data_dict)
|
||
|
||
|
||
def getDataDict(reportContent_list):
    """Build the lookup dictionary used for text and table replacement.

    Items of type "text" and "conclusion" are stored under their own key.
    Items of type "table" are stored under the comma-joined column names of
    their first row, with the full row list as the value. Other item types
    are ignored.

    Args:
        reportContent_list: Iterable of item dicts with a "type" entry.

    Returns:
        Tuple ``(data_dict, table_num)`` where ``table_num`` is the number of
        table items that were added to the dictionary.
    """
    lookup = {}
    table_total = 0

    for entry in reportContent_list:
        kind = entry["type"]

        if kind in ("text", "conclusion"):
            lookup[entry["key"]] = entry["value"]
            continue

        if kind != "table":
            continue

        table_rows = entry.get("value", [])
        # The header is taken from the keys of the first row.
        column_names = list(table_rows[0].keys()) if table_rows else []
        if column_names:
            lookup[",".join(column_names)] = table_rows
            table_total += 1

    return lookup, table_total
|
||
|
||
|
||
def base64ToImg(projectPath, reportContent_list):
    """Decode the base64 images of the report content into PNG files.

    For every item of type "img", the ``src`` of the first entry in its
    value list is decoded and written to ``<projectPath>/<key>.png``. The
    source may be a data URI (``data:image/png;base64,<payload>``) or the
    bare base64 payload. Items without usable image data are skipped with a
    warning instead of raising ``IndexError``.

    Args:
        projectPath: Directory the image files are written to.
        reportContent_list: Iterable of item dicts with "type", "key" and
            "value" entries.

    Returns:
        Number of image files written.
    """
    img_num = 0
    for item_dict in reportContent_list:
        if item_dict["type"] != "img":
            continue

        img_base64_list = item_dict["value"]
        # Only the first entry of the list is used.
        img_base64_str = img_base64_list[0].get("src", "") if img_base64_list else ""
        # Drop an optional "data:...;base64," prefix; a bare payload without
        # a comma is kept as it is.
        img_base64 = img_base64_str.split(",", 1)[-1]
        if not img_base64:
            print(f"Warning:图片 {item_dict['key']} 没有可用的base64数据,已跳过")
            continue

        img_data = base64.b64decode(img_base64)
        img_path = f"{projectPath}/{item_dict['key']}.png"
        with open(img_path, 'wb') as img_file:
            img_file.write(img_data)
        img_num += 1

    return img_num
|
||
|
||
|
||
def modiyReport(projectPath, reportContent_list, outputDocxPath):
    """Fill the report document at ``outputDocxPath`` from the JSON content.

    Steps, in order: decode the base64 images into ``projectPath``, build the
    replacement dictionary, replace placeholders in section headers (their
    paragraphs and tables), swap template images, fill and replace body
    tables, replace text-box placeholders, and save the document back to
    ``outputDocxPath``.

    NOTE(review): body paragraphs outside of tables and footers are not
    processed here; confirm that the template keeps its placeholders in
    headers, tables and text boxes only.

    Args:
        projectPath: Directory that receives the decoded images.
        reportContent_list: Iterable of report content item dicts.
        outputDocxPath: Path of the .docx file that is opened and overwritten.
    """
    doc = Document(outputDocxPath)

    # Convert the base64 strings into image files.
    img_num = base64ToImg(projectPath, reportContent_list)
    print(f"Info:共转换{img_num}张图片")

    # Mapping from template media names to the decoded files. The template
    # numbering starts at 2: image2.png -> pic1.png, image3.png -> pic2.png...
    image_dict = {
        f"image{i}.png": f"pic{i-1}.png" for i in range(2, img_num+2)}

    # Dictionary used for the text and table replacements.
    data_dict, table_num = getDataDict(reportContent_list)

    # Headers: paragraphs first, then the tables inside the header.
    for section in doc.sections:
        if section.header:
            for paragraph in section.header.paragraphs:
                replace_text_in_paragraph(paragraph, data_dict)
            for table in section.header.tables:
                replace_text_in_table(table, data_dict)

    # Replace images.
    for rel in doc.part.rels.values():
        if "image" not in rel.reltype:
            continue
        # target_ref looks like "media/image3.png"; only the file name is
        # used for the lookup.
        imageItem = rel.target_ref.split("/")[-1]
        picName = image_dict.get(imageItem, None)
        if picName is None or picName == ".png":
            continue
        replace_image_path = os.path.join(projectPath, picName)
        if os.path.exists(replace_image_path):
            print(f"图片: {imageItem} -> {picName}")
            # Context manager so the file handle is closed right after
            # reading instead of being left to the garbage collector.
            with open(replace_image_path, "rb") as image_file:
                rel.target_part._blob = image_file.read()

    # Body tables share one fill budget: one fill per table found in the data.
    remaining_count = [table_num]
    for table in doc.tables:
        replace_text_in_table(
            table, data_dict, tableFlag=False, count_wrapper=remaining_count)

    # Text boxes.
    replace_text_in_textbox(doc, data_dict)

    # Save the document.
    doc.save(outputDocxPath)
    print(f"Info:Word文档已创建: {outputDocxPath}")
|
||
|
||
|
||
if __name__ == "__main__":
    # Command line: <script> PROJECT_PATH TEMPLATE_PATH
    if len(sys.argv) < 3:
        print(f"Usage: python {sys.argv[0]} PROJECT_PATH TEMPLATE_PATH",
              file=sys.stderr)
        sys.exit(1)

    projectPath = sys.argv[1]
    templatePath = sys.argv[2]

    # The log directory lives next to the project directory. For a bare
    # relative project path dirname() is empty, which would otherwise turn
    # the log directory into "/pythonLog" at the filesystem root.
    pythonLogPath = os.path.dirname(projectPath) or "."

    # Log directory; exist_ok avoids the check-then-create race.
    log_fileDir = f"{pythonLogPath}/pythonLog"
    os.makedirs(log_fileDir, exist_ok=True)

    # One log file per day, opened in append mode.
    log_file = f"{log_fileDir}/{datetime.now().strftime('%Y%m%d')}.log"
    with open(log_file, 'a', encoding='utf-8') as log_file_obj:
        # Redirect stdout and stderr so that everything printed below goes to
        # both the log file and the original console stream.
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        tee = Tee(log_file_obj, original_stdout)
        sys.stdout = tee
        sys.stderr = tee

        try:
            # Timestamp banner at the start of the log entry. It is inside
            # the try block so the streams are restored even if it fails.
            print("=" * 60)
            print(f"程序开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"日志文件: {log_file}")
            print("=" * 60)

            # Load the report content JSON.
            reportContentPath = f"{projectPath}/reportContent.json"
            reportContent_list = getJsonData(reportContentPath)
            if reportContent_list is None:
                # getJsonData already printed the reason; stop with a clear
                # error instead of failing later while iterating None.
                raise FileNotFoundError(reportContentPath)

            # Copy the template into the project directory.
            outputDocxPath = f"{projectPath}/report_{datetime.now().strftime('%Y%m%d')}.docx"
            shutil.copy(templatePath, outputDocxPath)

            # Fill the copied report.
            modiyReport(projectPath, reportContent_list, outputDocxPath)

            print("\n" + "=" * 60)
            print(f"程序结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("程序执行成功")
            print("=" * 60)

        except Exception as e:
            # Top-level boundary: log the failure, then re-raise so the
            # process still ends with an error.
            print(f"\n程序执行出错: {str(e)}", file=sys.stderr)
            traceback.print_exc()
            print("=" * 60)
            print(f"程序异常结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 60)
            raise
        finally:
            # Restore the original streams.
            sys.stdout = original_stdout
            sys.stderr = original_stderr
|