"""根据前端传来的json数据,修改模板文件,生成新的word文档""" from docx.oxml.ns import qn from docx.shared import Pt from docx.enum.text import WD_ALIGN_PARAGRAPH import os import sys import traceback from datetime import datetime import json import shutil from lxml import etree import base64 from docx import Document from docx.shared import RGBColor import re from docx.enum.table import WD_ALIGN_VERTICAL def getJsonData(jsonFilePath): """获取JSON数据""" if not os.path.exists(jsonFilePath): print(f"Error:JSON文件路径 '{jsonFilePath}' 不存在,请确认") return None with open(jsonFilePath, 'r', encoding='utf-8') as jsonFile: data = json.load(jsonFile) return data class Tee: """自定义文件对象,同时写入文件和原控制台""" def __init__(self, *files): self.files = files def write(self, obj): for f in self.files: f.write(obj) f.flush() # 确保立即写入 def flush(self): for f in self.files: f.flush() def replace_text_in_textbox(doc, data_dict): """替换文本框中的文本""" namespaces = { 'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main', 'wp': 'http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing', 'a': 'http://schemas.openxmlformats.org/drawingml/2006/main', 'wps': 'http://schemas.microsoft.com/office/word/2010/wordprocessingShape' } doc_modified = False # 整个文档是否被修改 for part_id, related_part in doc.part.related_parts.items(): if hasattr(related_part, 'blob'): xml_content = related_part.blob # 检查是否是 XML 内容(以 {value}") part_modified = True doc_modified = True # 关键步骤:将修改后的 XML 写回到部件中 if part_modified: # 同时更新 _element 和 _blob if hasattr(related_part, '_element'): related_part._element = root updated_xml = etree.tostring( root, encoding='UTF-8', xml_declaration=True) related_part._blob = updated_xml except etree.XMLSyntaxError as e: print(f"Warning:部件 {part_id} 不是有效的 XML: {e}") continue except Exception as e: print(f"Warning:处理部件 {part_id} 时出错: {e}") continue def replace_text_in_paragraph(paragraph, data_dict, csvPath=None): """新增段落中的文本""" text = paragraph.text # 居中显示 # paragraph.alignment = 1 ngFlag = False if "$" in text and "<" not in text and ">" not in text: keys_list = re.findall(r'\$([a-zA-Z0-9_]+)', text) for keyName in keys_list: value = data_dict.get(keyName, None) if value is not None: placeholder = f"${keyName}" text = text.replace(placeholder, str(value)) print(f"文本: {placeholder} -> {value}") paragraph.text = text def add_text_in_table(paragraph, data_dict): """新增表格中的文本""" text = paragraph.text if "$" in text and "<" not in text and ">" not in text: keys_list = re.findall(r'\$([a-zA-Z0-9_]+)', text) for keyName in keys_list: value = data_dict.get(keyName, None) if value is not None: placeholder = f"${keyName}" text = text.replace(placeholder, str(value)) print(f"文本: {placeholder} -> {value}") paragraph.text = text def get_reference_style(table): """从表格第一行提取样式特征""" style = { 'font_name': '宋体', 'font_size': Pt(10.5), 'alignment': WD_ALIGN_PARAGRAPH.CENTER, 'vertical': WD_ALIGN_VERTICAL.CENTER } try: if len(table.rows) > 0: # 尝试从第一行第一个单元格获取字体信息 cell = table.rows[0].cells[0] style['vertical'] = cell.vertical_alignment or WD_ALIGN_VERTICAL.CENTER if cell.paragraphs and cell.paragraphs[0].runs: run = cell.paragraphs[0].runs[0] if run.font.name: style['font_name'] = run.font.name if run.font.size: style['font_size'] = run.font.size style['alignment'] = cell.paragraphs[0].alignment or WD_ALIGN_PARAGRAPH.CENTER except Exception: pass return style def apply_style_to_cell(cell, text, style): """将提取或默认的样式应用到单元格""" cell.text = str(text) cell.vertical_alignment = style['vertical'] for paragraph in cell.paragraphs: paragraph.alignment = style['alignment'] for run in paragraph.runs: run.font.name = style['font_name'] run.font.size = style['font_size'] # 确保中文字体兼容性 run._element.rPr.rFonts.set(qn('w:eastAsia'), style['font_name']) def replace_text_in_table(table, data_dict, tableFlag=False, count_wrapper=[0]): """ 替换表格中的文本 count_wrapper: 传入一个列表,例如 [2],表示剩余可执行次数 """ for row in table.rows: headers = [cell.text for cell in row.cells] if headers: header_str = ",".join(headers) if header_str in data_dict and not tableFlag and count_wrapper[0] > 0: print(f"================表头匹配到数据: {header_str}") table_data = data_dict[header_str] # 在表格末尾添加数据行 # 提取原表格样式 ref_style = get_reference_style(table) for data_row in table_data: new_row = table.add_row() # 尝试继承原行高 if len(table.rows) > 1: new_row.height = table.rows[0].height for idx, header in enumerate(headers): # content = data_row.get(header, "") # apply_style_to_cell( # new_row.cells[idx], content, ref_style) key_list = list(data_row.keys()) new_row.cells[idx].text = str(data_row[key_list[idx]]) # 根据内容长短,自适应水平居中显示 不换行 # for paragraph in new_row.cells[idx].paragraphs: # paragraph.vertical = WD_ALIGN_VERTICAL.CENTER # paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER print( f"表格新增行内容: {[str(data_row.get(header, '')) for header in headers]}") tableFlag = True count_wrapper[0] -= 1 if not tableFlag: for cell in row.cells: # 递归处理单元格中的嵌套表格 if cell.tables: for nested_table in cell.tables: replace_text_in_table( nested_table, data_dict, tableFlag=tableFlag, count_wrapper=count_wrapper) # 新增表格内容 for paragraph in cell.paragraphs: replace_text_in_paragraph(paragraph, data_dict) def getDataDict(reportContent_list): """获取数据字典 用于替换文本""" data_dict = {} table_num = 0 for item_dict in reportContent_list: if item_dict["type"] == "text": data_dict[item_dict["key"]] = item_dict["value"] elif item_dict["type"] == "table": tableContent = item_dict.get("value", []) # 获取表头 headers = [] if tableContent: headers = list(tableContent[0].keys()) if headers: header_str = ",".join(headers) # newKey = item_dict["key"] + "_" + header_str newKey = header_str data_dict[newKey] = tableContent table_num += 1 elif item_dict["type"] == "conclusion": data_dict[item_dict["key"]] = item_dict["value"] return data_dict, table_num def base64ToImg(projectPath, reportContent_list): """把base64字符串转成图片""" img_num = 0 for item_dict in reportContent_list: if item_dict["type"] == "img": img_base64_list = item_dict["value"] img_base64_str = img_base64_list[0].get("src", "") img_base64 = img_base64_str.split(",")[1] img_data = base64.b64decode(img_base64) img_path = f"{projectPath}/{item_dict['key']}.png" img_num += 1 with open(img_path, 'wb') as img_file: img_file.write(img_data) # 新增字典中的picPath为图片路径 # item_dict["picPath"] = img_path return img_num def modiyReport(projectPath, reportContent_list, outputDocxPath): """根据JSON数据修改报告文件""" doc = Document(outputDocxPath) # 把base64转成图片 img_num = base64ToImg(projectPath, reportContent_list) print(f"Info:共转换{img_num}张图片") # 图片与模版中图片名称的对应关系 模版从2开始 对应图片1.png 2.png ... image_dict = { f"image{i}.png": f"pic{i-1}.png" for i in range(2, img_num+2)} # image_dict = {"image2.png": "pic1.png", # "image3.png": "pic2.png", # "image4.png": "pic3.png", # "image5.png": "pic4.png", # "image6.png": "pic5.png", # } # 获取数据字典 用于替换文本 data_dict, table_num = getDataDict(reportContent_list) # 处理页眉 for section in doc.sections: if section.header: for paragraph in section.header.paragraphs: replace_text_in_paragraph(paragraph, data_dict) # 处理页眉中的表格 for table in section.header.tables: replace_text_in_table(table, data_dict) # 替换图片 for rel in doc.part.rels.values(): if "image" in rel.reltype: # print(rel.target_ref, rel.reltype) # media/image3.png imageItem = rel.target_ref.split("/")[-1] picName = image_dict.get(imageItem, None) if picName is not None and picName != ".png": replace_image_path = os.path.join(projectPath, picName) if os.path.exists(replace_image_path): print(f"图片: {imageItem} -> {picName}") rel.target_part._blob = open( replace_image_path, "rb").read() # 处理表格 remaining_count = [table_num] for table in doc.tables: replace_text_in_table( table, data_dict, tableFlag=False, count_wrapper=remaining_count) # 处理结论 # replaceConclusion(doc, data_dict) # 处理文本框 replace_text_in_textbox(doc, data_dict) # 保存文档 doc.save(outputDocxPath) print(f"Info:Word文档已创建: {outputDocxPath}") if __name__ == "__main__": projectPath = sys.argv[1] templatePath = sys.argv[2] # pythonLogPath = "/opt/report" # currentPath = os.getcwd() # projectPath = f"{currentPath}/project/modiyTemp" # templatePath = f"{currentPath}/Test/report_tem.docx" pythonLogPath = os.path.dirname(projectPath) # 输出log日志 # 日志文件路径 log_fileDir = f"{pythonLogPath}/pythonLog" if not os.path.exists(log_fileDir): os.makedirs(log_fileDir) # log文件用日期命名 log_file = f"{log_fileDir}/{datetime.now().strftime('%Y%m%d')}.log" with open(log_file, 'a', encoding='utf-8') as log_file_obj: # 重定向标准输出和标准错误 original_stdout = sys.stdout original_stderr = sys.stderr # 创建Tee对象,同时输出到文件和原控制台 tee = Tee(log_file_obj, original_stdout) sys.stdout = tee sys.stderr = tee # 在日志开头添加时间戳 print("=" * 60) print(f"程序开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"日志文件: {log_file}") print("=" * 60) try: # 获取报告内容JSON数据 reportContentPath = f"{projectPath}/reportContent.json" reportContent_list = getJsonData(reportContentPath) # 拷贝模板文件到项目目录下 outputDocxPath = f"{projectPath}/report_{datetime.now().strftime('%Y%m%d')}.docx" shutil.copy(templatePath, outputDocxPath) # 修改报告文件 modiyReport(projectPath, reportContent_list, outputDocxPath) print("\n" + "=" * 60) print(f"程序结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("程序执行成功") print("=" * 60) except Exception as e: print(f"\n程序执行出错: {str(e)}", file=sys.stderr) traceback.print_exc() print("=" * 60) print(f"程序异常结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) raise finally: # 恢复标准输出 sys.stdout = original_stdout sys.stderr = original_stderr