PDF压缩工具(字节精度)

实现思路

–遍历PDF中所有的图片

–用二分法调整所有图片的压缩质量,寻找最合适的大小:最多迭代20次,且结果小于目标值

–再次微调压缩质量best_quality ± 8

–在文件末尾追加 \x00 字节,以达到目标精度

遗留的问题

–使用pyinstaller打包出来的文件无法操作C盘以外的PDF文件

–对于小于目标大小的PDF文件应直接填充\x00

–可调迭代次数

–与目标大小差距过大的文件受压缩限制无法达到目标大小

import os
import sys
import io
import tempfile
import pikepdf
from PIL import Image
from PyQt5.QtWidgets import (
    QApplication, QWidget, QLabel, QPushButton, QVBoxLayout,
    QLineEdit, QFileDialog, QMessageBox, QProgressBar, QPlainTextEdit
)
try:
    import fitz  # PyMuPDF
    PYMUPDF_AVAILABLE = True
except ImportError:
    PYMUPDF_AVAILABLE = False



class PDFCompressor(QWidget):
    def __init__(self):
        """Initialise the underlying widget, then build the user interface."""
        super(PDFCompressor, self).__init__()
        self.init_ui()

    def init_ui(self):
        """Create every control of the window and stack them in one vertical column.

        The window is given a fixed 480x520 size. The browse button is wired to
        ``browse_file`` and the compress button to ``compress_pdf``.
        """
        self.setWindowTitle("存档工具-powered by wcybi")
        self.setFixedSize(480, 520)

        # Source file selection.
        self.input_label = QLabel("选择PDF文件:")
        self.input_path = QLineEdit()
        self.input_btn = QPushButton("浏览")
        self.input_btn.clicked.connect(self.browse_file)

        # Requested size, entered in bytes.
        self.size_label = QLabel("目标大小 (字节):")
        self.size_input = QLineEdit()
        self.size_input.setPlaceholderText("例如:5690000")

        # Output path is display-only; it is filled in by the other methods.
        self.output_label = QLabel("输出文件名:")
        self.output_path = QLineEdit("compressed.pdf")
        self.output_path.setReadOnly(True)

        self.compress_btn = QPushButton("开始压缩")
        self.compress_btn.clicked.connect(self.compress_pdf)

        self.progress = QProgressBar()
        self.progress.setValue(0)

        # Read-only log pane.
        self.log_output = QPlainTextEdit()
        self.log_output.setReadOnly(True)
        self.log_output.setPlaceholderText("LOG")
        self.log_output.setMinimumHeight(200)

        column = QVBoxLayout()
        top_to_bottom = (
            self.input_label,
            self.input_path,
            self.input_btn,
            self.size_label,
            self.size_input,
            self.output_label,
            self.output_path,
            self.compress_btn,
            self.progress,
            self.log_output,
        )
        for widget in top_to_bottom:
            column.addWidget(widget)

        self.setLayout(column)

    def browse_file(self):
        """Let the user pick a PDF and pre-fill the input and output path fields.

        Nothing changes when the dialog is cancelled. The suggested output sits
        next to the chosen file and carries the "(已压缩)" suffix.
        """
        chosen, _selected_filter = QFileDialog.getOpenFileName(
            self, "选择PDF文件", "", "PDF 文件 (*.pdf)"
        )
        if not chosen:
            return

        self.input_path.setText(chosen)
        folder, filename = os.path.split(chosen)
        stem, _extension = os.path.splitext(filename)
        self.output_path.setText(os.path.join(folder, f"{stem}(已压缩).pdf"))

    def compress_pdf(self):
        """Compress the selected PDF towards the requested byte size, then pad it.

        Steps, all executed on the calling thread:

        1. Validate the target size and the input path; stop early when the
           input is already no larger than the target.
        2. Count image XObjects; stop when there are none.
        3. Phase 1: bisect the JPEG quality between 20 and 95.
        4. Phase 2: scan the qualities within +/-8 of the phase-1 winner and
           keep the largest result that still fits the target (the phase-1
           winner takes part in that comparison).
        5. Phase 3: if the best result is still too large, try up to five
           lower, not yet tried integer qualities (never below 15).
        6. Scaling phase: if the result is still more than 51200 bytes too
           large, additionally downscale the images.
        7. Move the best candidate next to the input file and append zero
           bytes until the file has exactly the target size.

        Failures are logged and shown in a message box; nothing is returned.
        Every temporary file created along the way is removed before returning.
        """
        input_file = self.input_path.text().strip()
        target_size_text = self.size_input.text().strip()

        # isdecimal() rather than isdigit(): characters such as "²" pass
        # isdigit() but make int() raise. A target of 0 bytes is meaningless.
        if not target_size_text.isdecimal() or int(target_size_text) <= 0:
            QMessageBox.warning(self, "错误", "字节输入错误")
            return
        target_size = int(target_size_text)

        # Reject an empty or stale path here instead of letting getsize() raise
        # outside of any error handling.
        if not os.path.isfile(input_file):
            QMessageBox.warning(self, "错误", "请先选择有效的PDF文件")
            return

        target_size_mb = target_size / 1024 / 1024

        self.log_output.clear()
        self.log(f"开始压缩: {input_file}")
        self.log(f"目标大小: {target_size} B (~{target_size_mb:.2f} MB)")

        original_size = os.path.getsize(input_file)
        self.log(f"原始大小: {original_size} B (~{original_size/1024/1024:.2f} MB)")
        if original_size <= target_size:
            QMessageBox.information(
                self,
                "提示",
                "原文件体积已不大于目标值,\n"
                f"原文件: {original_size} 字节 (~{original_size/1024/1024:.2f} MB)\n"
                f"目标值: {target_size} 字节 (~{target_size_mb:.2f} MB)"
            )
            return

        base_name = os.path.splitext(os.path.basename(input_file))[0]
        # abspath() up front: joining the directory onto a relative output path
        # a second time would duplicate the directory component.
        output_file = os.path.abspath(
            os.path.join(os.path.dirname(input_file), f"{base_name}(已压缩).pdf")
        )
        self.output_path.setText(output_file)

        temp_file = None
        scratch = set()  # candidate files that still exist on disk

        def render(quality, scale_factor=1.0):
            """Produce one candidate; return (path, size) and track it for cleanup."""
            path = self.recompress_images(temp_file, quality, scale_factor=scale_factor)
            scratch.add(path)
            return path, os.path.getsize(path)

        def discard(path):
            """Delete a candidate that is no longer needed."""
            scratch.discard(path)
            self._remove_quietly(path)

        try:
            self.compress_btn.setEnabled(False)
            self.progress.setValue(5)

            # mkstemp() creates the file atomically; mktemp() only returns a
            # name and is deprecated because of the resulting race.
            fd, temp_file = tempfile.mkstemp(suffix=".pdf")
            os.close(fd)

            with pikepdf.open(input_file) as pdf:
                total_images = 0
                for page_num, page in enumerate(pdf.pages):
                    page_images = 0

                    if '/Resources' in page and '/XObject' in page.Resources:
                        xobjects = page.Resources.XObject
                        for img_name in xobjects.keys():
                            try:
                                if xobjects[img_name].get('/Subtype') == '/Image':
                                    page_images += 1
                            except Exception:
                                # Best-effort count: an unreadable entry is
                                # simply not counted.
                                continue

                    total_images += page_images
                    if page_images > 0:
                        self.log(f"页面 {page_num + 1}: {page_images} 个图像")

                self.log(f"总共找到 {total_images} 个图像对象")

                if total_images == 0:
                    self.log("未找到可压缩图像,流程终止。")
                    QMessageBox.warning(self, "警告",
                        "PDF中未找到可压缩的图像!\n"
                        "此PDF可能主要包含文本或已经过优化。")
                    return

                pdf.save(temp_file, linearize=True)

            self.progress.setValue(10)

            min_quality = 20
            max_quality = 95
            best_file = None
            best_size = 0
            best_diff = float("inf")
            best_quality = None
            tolerance = target_size * 0.002  # 0.2 % of the target
            tried_qualities = set()

            iteration = 0
            max_iterations = 20

            # ---- Phase 1: bisection on the JPEG quality -------------------
            self.log("=== 第一阶段:二分法 ===")
            while iteration < max_iterations and (max_quality - min_quality) > 1:
                iteration += 1
                quality = (min_quality + max_quality) // 2

                progress = 10 + int((iteration / max_iterations) * 50)
                self.progress.setValue(min(60, progress))

                compressed, size = render(quality)
                tried_qualities.add(quality)
                diff = abs(size - target_size)

                self.log(
                    f"迭代 {iteration}: 质量={quality}, "
                    f"大小={size} B (~{size/1024/1024:.4f} MB), "
                    f"目标={target_size} B (~{target_size_mb:.4f} MB), "
                    f"差异={diff} B"
                )

                # Keep whichever candidate is closest to the target so far.
                if diff < best_diff:
                    discard(best_file)
                    best_diff = diff
                    best_file = compressed
                    best_size = size
                    best_quality = quality
                else:
                    discard(compressed)

                if diff <= tolerance:
                    self.log("达标")
                    break

                if size > target_size:
                    max_quality = quality - 1
                else:
                    min_quality = quality + 1

            # ---- Phase 2: scan the neighbourhood of the phase-1 winner ----
            if best_quality is not None:
                self.log(f"=== 第二阶段: {best_quality})===")
                self.progress.setValue(65)

                scan_range = range(max(20, best_quality - 8), min(95, best_quality + 9))

                # (quality, size, diff, fits_target, path); seeded with the
                # phase-1 winner so it is compared instead of being discarded.
                results = [
                    (best_quality, best_size, best_diff, best_size <= target_size, best_file)
                ]

                for q in scan_range:
                    if q == best_quality:
                        continue

                    self.progress.setValue(65 + int((q - scan_range.start) / len(scan_range) * 25))

                    compressed, size = render(q)
                    tried_qualities.add(q)
                    diff = abs(size - target_size)
                    is_under = size <= target_size

                    self.log(
                        f"大小={size} B (~{size/1024/1024:.4f} MB), "
                        f"差异={diff} B, "
                        f"状态={'[可填充]' if is_under else '[超出]'}"
                    )

                    results.append((q, size, diff, is_under, compressed))

                    if diff <= tolerance:
                        self.log(f"找到目标精度质量: {q}")
                        break

                fitting = [r for r in results if r[3]]
                if fitting:
                    # Anything that fits can be padded to the exact size, so
                    # the largest fitting file is the best one.
                    chosen = max(fitting, key=lambda r: r[1])
                else:
                    chosen = min(results, key=lambda r: r[2])
                q, size, diff, _, compressed = chosen

                for r in results:
                    if r[4] != compressed:
                        discard(r[4])

                best_diff = diff
                best_file = compressed
                best_size = size
                best_quality = q

                self.log(f"选定质量 {q} (大小 {size} B, 状态 {'可填充' if size <= target_size else '超出'})")

            # ---- Phase 3: step to lower qualities while still too large ---
            if best_diff > tolerance and best_file and best_quality:
                self.log("=== 第三阶段 ===")
                self.progress.setValue(85)

                if best_size > target_size:
                    # JPEG quality is an integer, so fractional steps would be
                    # truncated to the same value; walk whole, untried
                    # qualities instead (at most five, never below 15).
                    attempts = 0
                    for q in range(best_quality - 1, 14, -1):
                        if attempts >= 5:
                            break
                        if q in tried_qualities:
                            continue
                        attempts += 1

                        compressed, size = render(q)
                        tried_qualities.add(q)
                        diff = abs(size - target_size)
                        is_under = size <= target_size

                        self.log(
                            f"大小={size} B, 差异={diff} B, "
                            f"状态={'[可填充]' if is_under else '[超出]'}"
                        )

                        if self._is_better_candidate(size, best_size, target_size):
                            discard(best_file)
                            best_diff = diff
                            best_file = compressed
                            best_size = size
                            best_quality = q
                        else:
                            discard(compressed)

                        if is_under:
                            # Lower qualities would only shrink the file further.
                            break

            # ---- Scaling phase: shrink the images as a last resort --------
            size_diff_bytes = best_size - target_size

            if best_diff > tolerance and best_file and best_quality and abs(size_diff_bytes) > 51200:
                self.log(
                    f"=== 第五阶段:图像缩放微调(当前差异 {size_diff_bytes} B (~{size_diff_bytes/1024/1024:.3f} MB))==="
                )
                self.progress.setValue(95)

                if best_size > target_size:
                    # File size is assumed to grow roughly with the square of
                    # the linear image scale, hence the square root.
                    size_ratio = target_size / best_size
                    scale_factor = max(0.85, min(0.98, size_ratio ** 0.5))

                    self.log(f"文件过大,尝试缩放图像至 {scale_factor*100:.1f}%")

                    for sf in [scale_factor, scale_factor * 0.98, scale_factor * 1.02]:
                        if sf < 0.80 or sf > 1.0:
                            continue

                        compressed, size = render(best_quality, scale_factor=sf)
                        diff = abs(size - target_size)

                        self.log(
                            f"缩放 {sf*100:.1f}%: "
                            f"大小={size} B (~{size/1024/1024:.4f} MB), "
                            f"差异={diff} B"
                        )

                        if self._is_better_candidate(size, best_size, target_size):
                            discard(best_file)
                            best_diff = diff
                            best_file = compressed
                            best_size = size

                            if diff <= tolerance:
                                self.log("达到目标精度!")
                                break
                        else:
                            discard(compressed)

            # ---- Publish the result ---------------------------------------
            if not best_file or not os.path.exists(best_file):
                raise RuntimeError("无法生成压缩文件")

            self._move_file(best_file, output_file)
            scratch.discard(best_file)
            self.progress.setValue(100)

            final_size_bytes = os.path.getsize(output_file)
            padding_note = ""

            if final_size_bytes < target_size:
                pad_amount = target_size - final_size_bytes
                try:
                    new_size = self.pad_pdf_to_size(output_file, target_size)
                    if new_size:
                        padding_note = (
                            f"已追加填充: {pad_amount} B (~{pad_amount/1024:.2f} KB)"
                        )
                        final_size_bytes = new_size
                except Exception as pad_err:
                    padding_note = f"填充失败: {pad_err}"
                    self.log(f"填充失败: {pad_err}")
            elif final_size_bytes > target_size:
                padding_note = "注意: 最终大小超过目标,无法填充。"

            final_size_mb = final_size_bytes / 1024 / 1024
            diff_percent = abs(final_size_bytes - target_size) / target_size * 100

            message_lines = [
                "压缩完成!",
                f"原始大小: {original_size} B (~{original_size/1024/1024:.2f} MB)",
                f"目标大小: {target_size} B (~{target_size_mb:.2f} MB)",
                f"最终大小: {final_size_bytes} B (~{final_size_mb:.2f} MB)",
                f"误差: {diff_percent:.2f}%",
            ]

            if padding_note:
                message_lines.append(padding_note)

            message_lines.append(f"输出文件: {output_file}")

            self.log(
                f"最终大小 {final_size_bytes} B (~{final_size_mb:.2f} MB),误差 {diff_percent:.2f}%"
            )
            self.log("压缩完成,准备显示结果。")
            QMessageBox.information(self, "完成", "\n".join(message_lines))

        except Exception as e:
            err_msg = f"压缩失败:{str(e)}"
            self.log(err_msg)
            QMessageBox.critical(self, "错误", err_msg)

        finally:
            # Runs on success, early return and failure alike, so no
            # intermediate file is left behind in the temp directory.
            for leftover in list(scratch):
                self._remove_quietly(leftover)
            self._remove_quietly(temp_file)
            self.compress_btn.setEnabled(True)
            self.progress.setValue(0)

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path`` if it is set; a missing file or OS error is ignored."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    @staticmethod
    def _is_better_candidate(size, best_size, target_size):
        """Return True when a file of ``size`` bytes beats one of ``best_size`` bytes.

        A file that fits the target can be padded to the exact size, so the
        largest fitting file wins; among oversized files the smaller one wins.
        """
        if size <= target_size:
            return best_size > target_size or size > best_size
        return best_size > target_size and size < best_size

    @staticmethod
    def _move_file(src, dst):
        """Move ``src`` over ``dst``, copying when a plain rename is impossible.

        os.replace() fails when source and destination are on different
        volumes (temp directory on one drive, PDF on another); in that case
        the file is copied and the source removed.
        """
        import shutil

        try:
            os.replace(src, dst)
        except OSError:
            shutil.copyfile(src, dst)
            os.remove(src)

    def log(self, message):
        """Append ``message`` to the log pane, scroll to the end and echo it to stdout."""
        rendered = str(message)
        pane = self.log_output
        pane.appendPlainText(rendered)
        vbar = pane.verticalScrollBar()
        if vbar is not None:
            # Keep the newest entry visible.
            vbar.setValue(vbar.maximum())
        print(rendered)

    def pad_pdf_to_size(self, file_path, target_size):
        """Append zero bytes to ``file_path`` until it is ``target_size`` bytes long.

        Returns the resulting file size. The file is left untouched, and its
        current size returned, when ``target_size`` is not positive or the file
        is already at least that large.
        """
        size_now = os.path.getsize(file_path)
        shortfall = target_size - size_now
        if target_size <= 0 or shortfall <= 0:
            return size_now

        with open(file_path, "ab") as handle:
            # bytes(n) is a run of n zero bytes.
            handle.write(bytes(shortfall))

        return target_size

    def recompress_images(self, pdf_path, quality=80.0, scale_factor=1.0):
        """Re-encode the image XObjects of a PDF as JPEG and save a new temp PDF.

        Args:
            pdf_path: Path of the PDF to read; it is not modified.
            quality: JPEG quality; truncated to an integer before use.
            scale_factor: Values below 1.0 downscale every image by that
                factor before encoding; other values leave the size unchanged.

        Returns:
            Path of a newly created temporary PDF. The caller owns the file
            and is responsible for deleting it.

        Raises:
            Any exception raised while opening or saving the PDF is logged and
            re-raised; the temporary file is removed in that case. A failure
            on a single image is logged and that image is left unchanged.
        """
        # mkstemp() creates the file atomically; mktemp() only returns a name
        # and is deprecated because of the resulting race.
        fd, tmp_path = tempfile.mkstemp(suffix=".pdf")
        os.close(fd)

        compressed_count = 0
        quality_int = int(quality)
        replaced = {}     # objgen of an original image -> its replacement stream
        produced = set()  # objgens of the replacement streams themselves

        try:
            with pikepdf.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages):
                    if '/Resources' not in page:
                        continue

                    resources = page.Resources
                    if '/XObject' not in resources:
                        continue

                    xobjects = resources.XObject

                    # Snapshot of the keys: entries are reassigned in the loop.
                    for img_name in list(xobjects.keys()):
                        try:
                            obj = xobjects[img_name]

                            # Form XObjects and the like are not images.
                            if not (obj.get('/Subtype') == '/Image'):
                                continue

                            # A stencil mask is painted with the current fill
                            # colour; turning it into a JPEG picture would
                            # change how the page renders.
                            if obj.get('/ImageMask', False):
                                continue

                            # An image referenced from several pages must be
                            # encoded once and shared, otherwise every page
                            # would get its own copy (or a re-encoded copy of
                            # an already re-encoded image).
                            key = obj.objgen
                            if key in produced:
                                continue
                            if key in replaced:
                                xobjects[img_name] = replaced[key]
                                continue

                            pdfimage = pikepdf.PdfImage(obj)

                            # Tiny images are not worth re-encoding.
                            if pdfimage.width < 20 or pdfimage.height < 20:
                                continue

                            pil_img = pdfimage.as_pil_image()

                            if scale_factor < 1.0:
                                new_w = max(20, int(pil_img.width * scale_factor))
                                new_h = max(20, int(pil_img.height * scale_factor))
                                pil_img = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS)

                            # JPEG stores only greyscale or RGB here: flatten
                            # alpha onto white, convert everything else to RGB.
                            if pil_img.mode in ('RGBA', 'LA'):
                                bg = Image.new('RGB', pil_img.size, (255, 255, 255))
                                bg.paste(pil_img, mask=pil_img.split()[-1])
                                pil_img = bg
                            elif pil_img.mode not in ('RGB', 'L'):
                                pil_img = pil_img.convert('RGB')

                            img_buffer = io.BytesIO()

                            if pil_img.mode == 'L':
                                pil_img.save(img_buffer, format='JPEG',
                                             quality=quality_int, optimize=True)
                            else:
                                # Stronger chroma subsampling at lower qualities.
                                subsampling = 2 if quality_int < 50 else (1 if quality_int < 75 else 0)
                                pil_img.save(img_buffer, format='JPEG',
                                             quality=quality_int,
                                             optimize=True,
                                             progressive=True,
                                             subsampling=subsampling)

                            new_image = pikepdf.Stream(pdf, img_buffer.getvalue())
                            new_image.Type = pikepdf.Name.XObject
                            new_image.Subtype = pikepdf.Name.Image
                            new_image.Width = pil_img.width
                            new_image.Height = pil_img.height
                            new_image.ColorSpace = (
                                pikepdf.Name.DeviceRGB if pil_img.mode == 'RGB'
                                else pikepdf.Name.DeviceGray
                            )
                            new_image.BitsPerComponent = 8
                            new_image.Filter = pikepdf.Name.DCTDecode

                            # Keep the soft mask so transparency is not lost.
                            if '/SMask' in obj:
                                new_image.SMask = obj.SMask

                            xobjects[img_name] = new_image
                            compressed_count += 1

                            # (0, 0) denotes a direct object, which cannot be
                            # shared and therefore is not worth remembering.
                            if key != (0, 0):
                                replaced[key] = new_image
                            produced.add(new_image.objgen)

                        except Exception as e:
                            self.log(f"页面{page_num+1}图像{img_name}压缩失败: {str(e)}")
                            continue

                self.log(f"压缩 {compressed_count} 个图像")

                pdf.save(tmp_path,
                         linearize=True,
                         compress_streams=True,
                         stream_decode_level=pikepdf.StreamDecodeLevel.generalized,
                         object_stream_mode=pikepdf.ObjectStreamMode.generate)

        except Exception as e:
            self.log(f"压缩PDF时出错: {str(e)}")
            # Do not leave a half-written file behind when the caller never
            # receives its path.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

        return tmp_path
    

if __name__ == "__main__":
    # Script entry point: start Qt, show the compressor window and hand the
    # event loop's exit status back to the interpreter.
    qt_app = QApplication(sys.argv)
    main_window = PDFCompressor()
    main_window.show()
    exit_status = qt_app.exec_()
    sys.exit(exit_status)

留下评论

您的邮箱地址不会被公开。 必填项已用 * 标注