实现思路
–遍历PDF中所有的图片
–用二分法搜索压缩质量并重新压缩所有图片,寻找最接近目标的文件大小(最多迭代20次,结果应小于目标值)
–在 best_quality ± 8 的范围内再次微调压缩质量
–在文件末尾追加 \x00 字节,使文件大小精确达到目标值
遗留的问题
–使用pyinstaller打包出来的文件无法操作C盘以外的PDF文件
–对于小于目标大小的PDF文件应直接填充\x00
–迭代次数尚不可调(目前固定为20次)
–与目标大小差距过大的文件受压缩限制无法达到目标大小

import io
import os
import shutil
import sys
import tempfile
from collections import namedtuple

import pikepdf
from PIL import Image
from PyQt5.QtWidgets import (
    QApplication, QWidget, QLabel, QPushButton, QVBoxLayout,
    QLineEdit, QFileDialog, QMessageBox, QProgressBar, QPlainTextEdit
)

try:
    import fitz  # PyMuPDF
    PYMUPDF_AVAILABLE = True
except ImportError:
    PYMUPDF_AVAILABLE = False


class PDFCompressor(QWidget):
    """Window that shrinks a PDF towards an exact size in bytes.

    The images embedded in the PDF are re-encoded as JPEG at varying
    quality until the file is as close to the requested size as possible,
    preferably just below it, and the result is then padded with zero
    bytes up to the target.
    """

    # JPEG quality window explored by the binary search and neighbour scan.
    _MIN_QUALITY = 20
    _MAX_QUALITY = 95
    # Lowest quality the extra step-down attempt may use.
    _FLOOR_QUALITY = 15

    # One encoded attempt: the quality used, the resulting file size, its
    # absolute distance to the target, and the temporary file holding it.
    _Candidate = namedtuple("_Candidate", "quality size diff path")

    def __init__(self):
        """Create the widget and build its controls."""
        super().__init__()
        self.init_ui()

    def init_ui(self):
        """Build the form: input path, target size, output path, button, progress bar, log."""
        self.setWindowTitle("存档工具-powered by wcybi")
        self.setFixedSize(480, 520)

        self.input_label = QLabel("选择PDF文件:")
        self.input_path = QLineEdit()
        self.input_btn = QPushButton("浏览")
        self.input_btn.clicked.connect(self.browse_file)

        self.size_label = QLabel("目标大小 (字节):")
        self.size_input = QLineEdit()
        self.size_input.setPlaceholderText("例如:5690000")

        self.output_label = QLabel("输出文件名:")
        self.output_path = QLineEdit("compressed.pdf")
        self.output_path.setReadOnly(True)

        self.compress_btn = QPushButton("开始压缩")
        self.compress_btn.clicked.connect(self.compress_pdf)

        self.progress = QProgressBar()
        self.progress.setValue(0)

        self.log_output = QPlainTextEdit()
        self.log_output.setReadOnly(True)
        self.log_output.setPlaceholderText("LOG")
        self.log_output.setMinimumHeight(200)

        layout = QVBoxLayout()
        for widget in (
            self.input_label, self.input_path, self.input_btn,
            self.size_label, self.size_input,
            self.output_label, self.output_path,
            self.compress_btn, self.progress, self.log_output,
        ):
            layout.addWidget(widget)
        self.setLayout(layout)

    def browse_file(self):
        """Let the user pick a PDF and pre-fill the output path next to it."""
        file_path, _ = QFileDialog.getOpenFileName(self, "选择PDF文件", "", "PDF 文件 (*.pdf)")
        if not file_path:
            return
        self.input_path.setText(file_path)
        self.output_path.setText(self._default_output_path(file_path))

    def compress_pdf(self):
        """Compress the selected PDF towards the requested size and save it.

        Validates the form, then runs the search phases (binary search on
        JPEG quality, neighbour scan, one extra step down, image
        down-scaling), writes the best candidate next to the input file and
        pads it with zero bytes when it is below the target. Failures are
        logged and reported in a message box; nothing is raised to the caller.
        """
        input_file = self.input_path.text().strip()
        target_size_text = self.size_input.text().strip()
        # isdecimal() only admits strings int() can parse; isdigit() also
        # lets through characters such as "²" that make int() raise.
        if not target_size_text.isdecimal() or int(target_size_text) <= 0:
            QMessageBox.warning(self, "错误", "字节输入错误")
            return
        # Checked up front so a missing file is reported instead of raising
        # out of the Qt slot when its size is read below.
        if not os.path.isfile(input_file):
            QMessageBox.warning(self, "错误", "请先选择有效的PDF文件")
            return

        target_size = int(target_size_text)
        target_size_mb = target_size / 1024 / 1024
        self.log_output.clear()
        self.log(f"开始压缩: {input_file}")
        self.log(f"目标大小: {target_size} B (~{target_size_mb:.2f} MB)")
        original_size = os.path.getsize(input_file)
        self.log(f"原始大小: {original_size} B (~{original_size/1024/1024:.2f} MB)")
        if original_size <= target_size:
            QMessageBox.information(
                self,
                "提示",
                "原文件体积已不大于目标值,\n"
                f"原文件: {original_size} 字节 (~{original_size/1024/1024:.2f} MB)\n"
                f"目标值: {target_size} 字节 (~{target_size_mb:.2f} MB)"
            )
            return

        output_file = self._default_output_path(input_file)
        self.output_path.setText(output_file)

        # Every temporary file created during this run; swept in `finally`
        # so nothing is left behind when a phase fails half-way.
        scratch = []
        try:
            self.compress_btn.setEnabled(False)
            self.progress.setValue(5)
            temp_file = self._new_temp_pdf()
            scratch.append(temp_file)
            with pikepdf.open(input_file) as pdf:
                total_images = self._count_images(pdf)
                self.log(f"总共找到 {total_images} 个图像对象")
                if total_images == 0:
                    self.log("未找到可压缩图像,流程终止。")
                    QMessageBox.warning(self, "警告",
                                        "PDF中未找到可压缩的图像!\n"
                                        "此PDF可能主要包含文本或已经过优化。")
                    return
                pdf.save(temp_file, linearize=True)
            self.progress.setValue(10)

            tolerance = target_size * 0.002  # 0.2% of the target
            best = self._binary_search_quality(
                scratch, temp_file, target_size, target_size_mb, tolerance)
            best = self._scan_neighbours(scratch, temp_file, target_size, tolerance, best)
            best = self._step_below_target(scratch, temp_file, target_size, tolerance, best)
            best = self._rescale_images(scratch, temp_file, target_size, tolerance, best)

            if best is None or not os.path.exists(best.path):
                raise RuntimeError("无法生成压缩文件")
            self._publish_result(best, output_file, original_size, target_size, target_size_mb)
        except Exception as e:
            err_msg = f"压缩失败:{str(e)}"
            self.log(err_msg)
            QMessageBox.critical(self, "错误", err_msg)
        finally:
            for path in scratch:
                self._discard(path)
            self.compress_btn.setEnabled(True)
            self.progress.setValue(0)

    @staticmethod
    def _default_output_path(input_file):
        """Return the "<name>(已压缩).pdf" path located beside *input_file*."""
        base_name = os.path.splitext(os.path.basename(input_file))[0]
        return os.path.join(os.path.dirname(input_file), f"{base_name}(已压缩).pdf")

    @staticmethod
    def _new_temp_pdf():
        """Create an empty temporary ``.pdf`` file and return its path.

        ``mkstemp`` creates the file atomically, unlike the deprecated
        ``mktemp`` which only returns a name that may be taken by someone
        else before it is used.
        """
        fd, path = tempfile.mkstemp(suffix=".pdf")
        os.close(fd)
        return path

    @staticmethod
    def _discard(path):
        """Delete *path* if it exists; cleanup is best-effort, so OS errors are ignored."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    @staticmethod
    def _is_better(candidate, best, target_size):
        """Return True when *candidate* should replace *best*.

        A file below the target can be padded up to the exact size, so an
        under-target file always beats an over-target one; within the same
        group the smaller absolute difference wins.
        """
        if best is None:
            return True
        candidate_under = candidate.size < target_size
        best_under = best.size < target_size
        if candidate_under != best_under:
            return candidate_under
        return candidate.diff < best.diff

    def _count_images(self, pdf):
        """Count the image XObjects referenced by each page of *pdf*, logging per-page totals."""
        total_images = 0
        for page_num, page in enumerate(pdf.pages):
            page_images = 0
            if '/Resources' in page and '/XObject' in page.Resources:
                xobjects = page.Resources.XObject
                for img_name in xobjects.keys():
                    try:
                        if xobjects[img_name].get('/Subtype') == '/Image':
                            page_images += 1
                    except Exception:
                        # An entry that cannot be inspected is simply not counted.
                        continue
            if page_images > 0:
                self.log(f"页面 {page_num + 1}: {page_images} 个图像")
            total_images += page_images
        return total_images

    def _encode_candidate(self, scratch, temp_file, quality, target_size, scale_factor=1.0):
        """Re-encode *temp_file* once and describe the outcome as a candidate.

        The produced file is registered in *scratch* so it is removed at the
        end of the run even if a later step raises.
        """
        path = self.recompress_images(temp_file, quality, scale_factor=scale_factor)
        scratch.append(path)
        size = os.path.getsize(path)
        return self._Candidate(quality, size, abs(size - target_size), path)

    def _binary_search_quality(self, scratch, temp_file, target_size, target_size_mb, tolerance):
        """Phase 1: bisect the JPEG quality, keeping the attempt closest to the target."""
        low, high = self._MIN_QUALITY, self._MAX_QUALITY
        max_iterations = 20
        iteration = 0
        best = None
        self.log("=== 第一阶段:二分法 ===")
        while iteration < max_iterations and (high - low) > 1:
            iteration += 1
            quality = (low + high) // 2
            progress = 10 + int((iteration / max_iterations) * 50)
            self.progress.setValue(min(60, progress))

            candidate = self._encode_candidate(scratch, temp_file, quality, target_size)
            size, diff = candidate.size, candidate.diff
            self.log(
                f"迭代 {iteration}: 质量={quality}, "
                f"大小={size} B (~{size/1024/1024:.4f} MB), "
                f"目标={target_size} B (~{target_size_mb:.4f} MB), "
                f"差异={diff} B"
            )
            # Pure distance here (no under-target preference) so that the
            # neighbour scan is centred on the quality nearest the target.
            if best is None or diff < best.diff:
                if best is not None:
                    self._discard(best.path)
                best = candidate
            else:
                self._discard(candidate.path)

            if diff <= tolerance:
                self.log("达标")
                break
            if size > target_size:
                high = quality - 1
            else:
                low = quality + 1
        return best

    def _scan_neighbours(self, scratch, temp_file, target_size, tolerance, best):
        """Phase 2: try every quality within 8 steps of the phase-1 result.

        The phase-1 result competes with the scanned qualities under
        ``_is_better``, so it is only replaced by something preferable.
        """
        if best is None:
            return best
        centre = best.quality
        self.log(f"=== 第二阶段: {centre})===")
        self.progress.setValue(65)
        scan_range = range(max(self._MIN_QUALITY, centre - 8),
                           min(self._MAX_QUALITY, centre + 9))
        scanned_any = False
        for q in scan_range:
            if q == centre:  # already encoded in phase 1
                continue
            self.progress.setValue(65 + int((q - scan_range.start) / len(scan_range) * 25))
            candidate = self._encode_candidate(scratch, temp_file, q, target_size)
            scanned_any = True
            size, diff = candidate.size, candidate.diff
            is_under = size < target_size
            self.log(
                f"大小={size} B (~{size/1024/1024:.4f} MB), "
                f"差异={diff} B, "
                f"状态={'[可填充]' if is_under else '[超出]'}"
            )
            if self._is_better(candidate, best, target_size):
                self._discard(best.path)
                best = candidate
            else:
                self._discard(candidate.path)
            if diff <= tolerance:
                self.log(f"找到目标精度质量: {q}")
                break
        if scanned_any:
            q, size = best.quality, best.size
            self.log(f"选定质量 {q} (大小 {size} B, 状态 {'可填充' if size < target_size else '超出'})")
        return best

    def _step_below_target(self, scratch, temp_file, target_size, tolerance, best):
        """Phase 3: when still above the target, try the next lower quality.

        JPEG quality is truncated to an integer by ``recompress_images``, so
        fractional steps below the current quality all map to "quality - 1";
        that single quality is therefore encoded once.
        """
        if best is None or best.diff <= tolerance:
            return best
        self.log("=== 第三阶段 ===")
        self.progress.setValue(85)
        if best.size <= target_size:
            return best
        quality = int(best.quality) - 1
        if quality < self._FLOOR_QUALITY:
            return best

        candidate = self._encode_candidate(scratch, temp_file, quality, target_size)
        size, diff = candidate.size, candidate.diff
        is_under = size < target_size
        self.log(
            f"大小={size} B, 差异={diff} B, "
            f"状态={'[可填充]' if is_under else '[超出]'}"
        )
        if self._is_better(candidate, best, target_size):
            self._discard(best.path)
            return candidate
        self._discard(candidate.path)
        return best

    def _rescale_images(self, scratch, temp_file, target_size, tolerance, best):
        """Last phase: shrink image dimensions when the file is still more than 50 KiB off."""
        if best is None:
            return best
        size_diff_bytes = best.size - target_size
        if best.diff <= tolerance or abs(size_diff_bytes) <= 51200:
            return best
        self.log(
            f"=== 第五阶段:图像缩放微调(当前差异 {size_diff_bytes} B (~{size_diff_bytes/1024/1024:.3f} MB))==="
        )
        self.progress.setValue(95)
        if best.size <= target_size:
            return best

        # Estimate the linear scale from the size ratio: pixel count, and
        # roughly the encoded size, grows with the square of the scale.
        size_ratio = target_size / best.size
        scale_factor = max(0.85, min(0.98, size_ratio ** 0.5))
        self.log(f"文件过大,尝试缩放图像至 {scale_factor*100:.1f}%")
        quality = best.quality
        for sf in (scale_factor, scale_factor * 0.98, scale_factor * 1.02):
            if sf < 0.80 or sf > 1.0:
                continue
            candidate = self._encode_candidate(
                scratch, temp_file, quality, target_size, scale_factor=sf)
            size, diff = candidate.size, candidate.diff
            self.log(
                f"缩放 {sf*100:.1f}%: "
                f"大小={size} B (~{size/1024/1024:.4f} MB), "
                f"差异={diff} B"
            )
            if self._is_better(candidate, best, target_size):
                self._discard(best.path)
                best = candidate
                if diff <= tolerance:
                    self.log("达到目标精度!")
                    break
            else:
                self._discard(candidate.path)
        return best

    def _publish_result(self, best, output_file, original_size, target_size, target_size_mb):
        """Write the chosen candidate to *output_file*, pad it, and report the outcome."""
        if not os.path.isabs(output_file):
            output_file = os.path.abspath(output_file)
        # Copy instead of os.replace(): the candidate lives in the system
        # temp directory, and os.replace() fails when the destination is on
        # another drive or filesystem.
        shutil.copyfile(best.path, output_file)
        self._discard(best.path)
        self.progress.setValue(100)

        final_size_bytes = os.path.getsize(output_file)
        padding_note = ""
        if final_size_bytes < target_size:
            pad_amount = target_size - final_size_bytes
            try:
                new_size = self.pad_pdf_to_size(output_file, target_size)
                if new_size:
                    padding_note = (
                        f"已追加填充: {pad_amount} B (~{pad_amount/1024:.2f} KB)"
                    )
                    final_size_bytes = new_size
            except OSError as pad_err:
                padding_note = f"填充失败: {pad_err}"
                self.log(f"填充失败: {pad_err}")
        elif final_size_bytes > target_size:
            padding_note = "注意: 最终大小超过目标,无法填充。"

        final_size_mb = final_size_bytes / 1024 / 1024
        diff_percent = abs(final_size_bytes - target_size) / target_size * 100
        message_lines = [
            "压缩完成!",
            f"原始大小: {original_size} B (~{original_size/1024/1024:.2f} MB)",
            f"目标大小: {target_size} B (~{target_size_mb:.2f} MB)",
            f"最终大小: {final_size_bytes} B (~{final_size_mb:.2f} MB)",
            f"误差: {diff_percent:.2f}%",
        ]
        if padding_note:
            message_lines.append(padding_note)
        message_lines.append(f"输出文件: {output_file}")
        self.log(
            f"最终大小 {final_size_bytes} B (~{final_size_mb:.2f} MB),误差 {diff_percent:.2f}%"
        )
        self.log("压缩完成,准备显示结果。")
        QMessageBox.information(self, "完成", "\n".join(message_lines))

    def log(self, message):
        """Append *message* to the log pane, scroll to the end, and echo it to stdout."""
        text = str(message)
        self.log_output.appendPlainText(text)
        scrollbar = self.log_output.verticalScrollBar()
        if scrollbar is not None:
            scrollbar.setValue(scrollbar.maximum())
        print(text)

    def pad_pdf_to_size(self, file_path, target_size):
        """Append zero bytes to *file_path* until it is *target_size* bytes long.

        Returns the resulting file size. The file is left untouched (and its
        current size returned) when *target_size* is not positive or the
        file is already at least that large.
        """
        current_size = os.path.getsize(file_path)
        if target_size <= 0 or current_size >= target_size:
            return current_size
        remaining = target_size - current_size
        # Write in bounded chunks so a large gap does not require building
        # the whole padding in memory at once.
        chunk = b"\x00" * min(remaining, 1024 * 1024)
        with open(file_path, "ab") as fp:
            while remaining > 0:
                step = min(remaining, len(chunk))
                fp.write(chunk[:step])
                remaining -= step
        return target_size

    def recompress_images(self, pdf_path, quality=80.0, scale_factor=1.0):
        """Re-encode the images of *pdf_path* as JPEG into a new temporary PDF.

        Args:
            pdf_path: PDF to read; it is not modified.
            quality: JPEG quality; truncated to an integer before use.
            scale_factor: Linear scale applied to each image when below 1.0.

        Returns:
            Path of the newly written temporary PDF. The caller owns the
            file and is responsible for deleting it.

        Raises:
            Exception: whatever opening or saving the PDF raises, after it
                has been logged. A failure on a single image is logged and
                that image is left as it was.

        NOTE(review): only the keys set below are written to the replacement
        stream, so other entries of the original image dictionary (for
        example /SMask) are not carried over.
        """
        tmp_path = self._new_temp_pdf()
        quality_int = int(quality)
        compressed_count = 0
        # Maps the (object id, generation) of an image that was already
        # handled to its replacement stream. Replacement streams are
        # registered under their own id too, so an image reachable from
        # several pages, or through a shared resource dictionary, is
        # encoded exactly once.
        replaced = {}
        try:
            with pikepdf.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages):
                    if '/Resources' not in page:
                        continue
                    resources = page.Resources
                    if '/XObject' not in resources:
                        continue
                    xobjects = resources.XObject
                    for img_name in list(xobjects.keys()):
                        try:
                            obj = xobjects[img_name]
                            # Form XObjects and other non-images are skipped.
                            if obj.get('/Subtype') != '/Image':
                                continue
                            key = obj.objgen
                            if key != (0, 0) and key in replaced:
                                xobjects[img_name] = replaced[key]
                                continue

                            pdfimage = pikepdf.PdfImage(obj)
                            # Tiny images are left alone.
                            if pdfimage.width < 20 or pdfimage.height < 20:
                                continue
                            pil_img = pdfimage.as_pil_image()

                            if scale_factor < 1.0:
                                new_w = max(20, int(pil_img.width * scale_factor))
                                new_h = max(20, int(pil_img.height * scale_factor))
                                pil_img = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS)

                            # JPEG stores only grayscale or RGB here: alpha is
                            # flattened onto white, every other mode becomes RGB.
                            if pil_img.mode in ('RGBA', 'LA'):
                                bg = Image.new('RGB', pil_img.size, (255, 255, 255))
                                bg.paste(pil_img, mask=pil_img.split()[-1])
                                pil_img = bg
                            elif pil_img.mode not in ('RGB', 'L'):
                                pil_img = pil_img.convert('RGB')

                            img_buffer = io.BytesIO()
                            if pil_img.mode == 'L':
                                pil_img.save(img_buffer, format='JPEG',
                                             quality=quality_int, optimize=True)
                            else:
                                # Coarser chroma subsampling at lower quality.
                                subsampling = 2 if quality_int < 50 else (1 if quality_int < 75 else 0)
                                pil_img.save(img_buffer, format='JPEG',
                                             quality=quality_int,
                                             optimize=True,
                                             progressive=True,
                                             subsampling=subsampling)

                            new_image = pikepdf.Stream(pdf, img_buffer.getvalue())
                            new_image.Type = pikepdf.Name.XObject
                            new_image.Subtype = pikepdf.Name.Image
                            new_image.Width = pil_img.width
                            new_image.Height = pil_img.height
                            new_image.ColorSpace = (
                                pikepdf.Name.DeviceRGB if pil_img.mode == 'RGB'
                                else pikepdf.Name.DeviceGray
                            )
                            new_image.BitsPerComponent = 8
                            new_image.Filter = pikepdf.Name.DCTDecode
                            xobjects[img_name] = new_image
                            compressed_count += 1

                            if key != (0, 0):
                                replaced[key] = new_image
                            new_key = new_image.objgen
                            if new_key != (0, 0):
                                replaced[new_key] = new_image
                        except Exception as e:
                            self.log(f"页面{page_num+1}图像{img_name}压缩失败: {str(e)}")
                            continue
                self.log(f"压缩 {compressed_count} 个图像")
                pdf.save(tmp_path,
                         linearize=True,
                         compress_streams=True,
                         stream_decode_level=pikepdf.StreamDecodeLevel.generalized,
                         object_stream_mode=pikepdf.ObjectStreamMode.generate)
        except Exception as e:
            # Do not leave the half-written temporary file behind.
            self._discard(tmp_path)
            self.log(f"压缩PDF时出错: {str(e)}")
            raise
        return tmp_path


if __name__ == "__main__":
    # Start the Qt application, show the compressor window, and exit the
    # process with the event loop's return code.
    qt_app = QApplication(sys.argv)
    main_window = PDFCompressor()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
