# -*- coding: utf-8 -*-
# Required libraries: paddleocr, paddlepaddle (or paddlepaddle-gpu), Pillow, opencv-python, numpy
# Install using: pip install paddlepaddle paddleocr Pillow opencv-python numpy
import logging
import os
import sys
import time

import cv2
import numpy as np
from paddleocr import PaddleOCR
from PIL import Image

# Suppress excessive PaddleOCR logging. logging.disable(logging.INFO) drops
# every record at INFO level and below for the whole process; WARNING and
# ERROR records still get through. Pass logging.WARNING instead to silence
# warnings as well.
logging.disable(logging.INFO)

# --- Configuration ---
# Directory to scan. Its immediate subfolders are processed one by one; when
# it has no subfolders, PNG files directly inside it are processed instead.
# The OCR_BASE_IMAGE_DIR environment variable, when set to a non-empty value,
# overrides the built-in default so the script can run on another machine
# without being edited.
BASE_IMAGE_DIR = (
    os.environ.get('OCR_BASE_IMAGE_DIR')
    or '/Users/zpc01/workspace/zzlh/compliance/assets/images/井筒'
)

# Language configuration: 'ch' supports Chinese and English by default.
# Other options include 'en', 'korean', 'japan', 'french', 'german', etc.
LANG = 'ch'

# Use angle classification to help with rotated text.
USE_ANGLE_CLS = True

# Maximum height, in pixels, of the image handed to the OCR engine in one
# call. Taller images are cut into horizontal strips of at most this height.
MAX_CHUNK_HEIGHT = 3000

# PaddleOCR attempts to use the GPU automatically if paddlepaddle-gpu is
# installed and CUDA is available. To force CPU usage, pass use_gpu=False
# where the PaddleOCR engine is constructed.

# --- Helpers ---

def list_png_files(directory):
    """Return the sorted names of the PNG files directly inside *directory*.

    A name qualifies when it ends with ``.png`` (case-insensitive) and refers
    to a regular file. Names are sorted in plain string (dictionary) order.

    Raises:
        OSError: if *directory* cannot be listed.
    """
    return sorted(
        name for name in os.listdir(directory)
        if name.lower().endswith('.png')
        and os.path.isfile(os.path.join(directory, name))
    )


def discover_targets(base_dir):
    """Work out which directories have to be scanned for PNG files.

    Returns:
        A list of ``(path, display_name)`` tuples. If *base_dir* has
        subfolders, there is one entry per immediate subfolder, sorted by
        name so the processing order is reproducible; PNG files lying
        directly in *base_dir* are not processed in that case. If there are
        no subfolders, the list holds *base_dir* itself, provided it directly
        contains at least one PNG file.

    The process exits with status 1 when *base_dir* cannot be read, and with
    status 0 when it contains neither subfolders nor PNG files.
    """
    try:
        subfolders = sorted(
            entry for entry in os.listdir(base_dir)
            if os.path.isdir(os.path.join(base_dir, entry))
        )
    except OSError as e:
        print(f"错误:无法访问目录 '{base_dir}' 来查找子文件夹。请检查权限。 {e}")
        sys.exit(1)

    if subfolders:
        print(f"找到 {len(subfolders)} 个子文件夹,将逐一处理...")
        return [(os.path.join(base_dir, name), name) for name in subfolders]

    # No subfolders: fall back to PNG files stored directly in base_dir.
    print(f"在 '{base_dir}' 下没有找到任何子文件夹。正在检查该目录下是否直接包含 PNG 文件...")
    try:
        png_files = list_png_files(base_dir)
    except OSError as e:
        print(f"错误:无法访问或读取 '{base_dir}' 的内容以查找PNG文件。 {e}")
        sys.exit(1)

    if not png_files:
        print(f"在 '{base_dir}' 下既没有找到子文件夹,也没有直接找到 PNG 文件。脚本将退出。")
        sys.exit(0)

    print(f"在 '{base_dir}' 目录下直接找到 {len(png_files)} 个 PNG 文件。将进行处理。")
    # basename() is empty when base_dir is the filesystem root ("/").
    display_name = os.path.basename(base_dir.rstrip(os.sep)) or "根目录中的图片"
    return [(base_dir, display_name)]


def chunk_bounds(img_height, max_chunk_height):
    """Split ``[0, img_height)`` into consecutive ``(top, bottom)`` row ranges.

    Every range is *max_chunk_height* rows tall except possibly the last one,
    which ends at *img_height*.

    Raises:
        ValueError: if *max_chunk_height* is not positive.
    """
    if max_chunk_height <= 0:
        raise ValueError(
            f"max_chunk_height must be positive, got {max_chunk_height}"
        )
    return [
        (top, min(top + max_chunk_height, img_height))
        for top in range(0, img_height, max_chunk_height)
    ]


def extract_texts(ocr_result):
    """Collect the recognized strings from one ``ocr_engine.ocr()`` result.

    The result is expected to be a list whose first element holds the lines
    of the single image passed in, each line shaped like
    ``[box_coords, (text, confidence)]``. An empty result, or one whose first
    element is empty or ``None``, yields an empty list. Lines that do not
    have that shape are skipped.
    """
    if not ocr_result or not ocr_result[0]:
        return []
    texts = []
    for line_info in ocr_result[0]:
        if not line_info or len(line_info) != 2:
            continue
        text_and_score = line_info[1]
        # Accept the (text, confidence) pair as either a tuple or a list.
        if isinstance(text_and_score, (tuple, list)) and len(text_and_score) >= 1:
            texts.append(text_and_score[0])
    return texts


def to_bgr_array(pil_image):
    """Convert a PIL image to the BGR NumPy array that is passed to PaddleOCR."""
    if pil_image.mode != 'RGB':
        pil_image = pil_image.convert('RGB')
    return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)


def recognize_image(engine, image_path, max_chunk_height, use_angle_cls):
    """Run OCR on one image file and return its text segments in order.

    Images taller than *max_chunk_height* pixels are cut into horizontal
    strips that are recognized one after another, top to bottom; the texts of
    all strips are concatenated.

    NOTE(review): the strips do not overlap, so a text line lying across a
    strip boundary is presumably cut in two — confirm whether an overlap is
    needed for the images being processed.
    """
    # The context manager closes the underlying file once the pixels are read.
    with Image.open(image_path) as pil_image:
        img_width, img_height = pil_image.size

        if img_height <= max_chunk_height:
            whole = to_bgr_array(pil_image)
            return extract_texts(engine.ocr(whole, cls=use_angle_cls))

        print(f" 图片高度 {img_height}px 超过限制 {max_chunk_height}px,将进行分块处理...")
        bounds = chunk_bounds(img_height, max_chunk_height)
        texts = []
        for number, (top, bottom) in enumerate(bounds, start=1):
            strip = to_bgr_array(pil_image.crop((0, top, img_width, bottom)))
            print(f" 正在识别分块 {number}/{len(bounds)} (像素 {top}-{bottom})...")
            texts.extend(extract_texts(engine.ocr(strip, cls=use_angle_cls)))
        print(" 所有分块识别完毕。")
        return texts


def process_target(engine, scan_path, display_name, max_chunk_height, use_angle_cls):
    """OCR every PNG in *scan_path* and write one ``.txt`` file per image.

    The text file is written next to the image, with the same base name, one
    recognized segment per line. No file is written for an image in which no
    text was recognized. A failure on one image is reported and does not stop
    the remaining images from being processed.
    """
    print(f"\n--- 正在处理目标: {display_name} (路径: {scan_path}) ---")

    try:
        png_names = list_png_files(scan_path)
    except OSError as e:
        print(f" 错误:无法访问 '{display_name}' (路径: {scan_path}) 或读取其内容。跳过此目标。 {e}")
        return

    if not png_names:
        print(f" 在 '{display_name}' (路径: {scan_path}) 中未找到 PNG 图片。")
        return

    total = len(png_names)
    print(f" 找到 {total} 个 PNG 文件,将按字典序处理...")

    for position, filename in enumerate(png_names, start=1):
        image_path = os.path.join(scan_path, filename)
        print(f" 正在识别图片 (顺序: {position}/{total}): {filename} ...")

        # Deliberately broad: one unreadable or odd image must not abort the batch.
        try:
            started = time.perf_counter()
            texts = recognize_image(engine, image_path, max_chunk_height, use_angle_cls)
            elapsed = time.perf_counter() - started

            if not texts:
                print(" 未识别到文本。")
                continue

            print(f" 识别到 {len(texts)} 段文本,总耗时: {elapsed:.2f} 秒")

            # --- Write the .txt file for the current image ---
            output_txt_path = os.path.join(
                scan_path, os.path.splitext(filename)[0] + ".txt"
            )
            try:
                with open(output_txt_path, 'w', encoding='utf-8') as f_img:
                    f_img.writelines(f"{segment}\n" for segment in texts)
                print(f" 已将识别结果写入文件: {output_txt_path}")
            except OSError as e:
                print(f" 错误:无法写入输出文件 '{output_txt_path}'. {e}")
        except Exception as e:
            print(f" 处理图片 '{filename}' 时发生错误: {e}")


def main():
    """Initialize PaddleOCR, find the target directories and OCR their PNG files."""
    # --- Initialize PaddleOCR Engine ---
    # Done only once: construction downloads (on first run) and loads the models.
    print(f"正在初始化 PaddleOCR (语言: {LANG})... 这可能需要一些时间,特别是首次运行时。")
    try:
        # To control GPU usage explicitly, add a use_gpu=... argument here.
        ocr_engine = PaddleOCR(use_angle_cls=USE_ANGLE_CLS, lang=LANG)
    except Exception as e:
        print(f"初始化 PaddleOCR 时出错: {e}")
        print("请确保已正确安装 paddleocr 和 paddlepaddle (或 paddlepaddle-gpu)。")
        print("GPU 用户请确保 CUDA/cuDNN 环境配置正确。")
        sys.exit(1)  # Nothing can be done without an engine.
    print("PaddleOCR 初始化成功。")

    # --- Determine processing targets ---
    print(f"开始处理目录: {BASE_IMAGE_DIR}")
    if not os.path.isdir(BASE_IMAGE_DIR):
        print(f"错误:基础目录 '{BASE_IMAGE_DIR}' 不存在或不是一个有效的目录。")
        sys.exit(1)

    targets = discover_targets(BASE_IMAGE_DIR)
    if not targets:
        # Defensive only: discover_targets() exits by itself when it finds nothing.
        print("没有找到可处理的目录或PNG文件。脚本将退出。")
        sys.exit(0)

    # --- Process each identified path ---
    for scan_path, display_name in targets:
        process_target(
            ocr_engine, scan_path, display_name, MAX_CHUNK_HEIGHT, USE_ANGLE_CLS
        )

    print("\n--- 所有目标处理完毕 ---")


if __name__ == "__main__":
    main()