VideoAgentTrek-ScreenFilter Python接口调用实战:从环境搭建到批量处理

张开发
2026/4/7 5:20:40 15 分钟阅读

分享文章

VideoAgentTrek-ScreenFilter Python接口调用实战:从环境搭建到批量处理
VideoAgentTrek-ScreenFilter Python接口调用实战从环境搭建到批量处理你是不是也遇到过这样的烦恼手头有一堆视频素材但里面混杂着各种录屏、演示界面真正有用的内容被干扰手动一帧帧去筛选简直让人崩溃。最近我在处理一批教学视频时就碰到了这个问题直到我尝试了VideoAgentTrek-ScreenFilter。简单来说它能自动识别视频中的屏幕内容比如PPT、代码编辑器、软件界面然后帮你把这些部分过滤掉或者单独提取出来。听起来是不是很实用但光有模型服务还不够关键是怎么把它用起来。今天我就来分享一下如何在星图GPU平台上部署好VideoAgentTrek-ScreenFilter之后通过Python来调用它从最简单的单视频处理到复杂的批量任务管理一步步带你打通整个流程。如果你熟悉Python想把AI视频处理能力集成到自己的项目里那这篇内容应该能帮到你。1. 动手之前环境与依赖准备在开始写代码之前我们得先把“战场”布置好。这里假设你已经按照星图平台的指引成功部署了VideoAgentTrek-ScreenFilter的镜像服务并且拿到了访问地址比如一个IP和端口。我们的所有操作都将基于这个服务端点。1.1 安装必要的Python包我们主要会用到两个库requests用于HTTP通信concurrent.futures用于处理并发任务。通常它们都是内置或极易安装的。打开你的终端或命令行创建一个新的项目目录然后确保安装了requestspip install requests如果需要对视频进行一些本地预处理比如检查格式、时长你可能还需要opencv-python或moviepy但就核心的API调用而言requests足够了。# 可选用于本地视频信息读取 pip install opencv-python pip install moviepy1.2 准备你的第一个测试视频找一个短小的视频文件比如10-30秒的MP4文件放在项目目录里。最好这个视频里包含一些明显的屏幕内容比如电脑桌面、软件窗口这样处理效果一目了然。我们给它起个简单的名字比如test_video.mp4。2. 核心第一步发起一个处理请求万事俱备现在我们来写第一个脚本目标是把本地的视频文件发送给ScreenFilter服务并拿到处理结果。2.1 理解API的基本格式VideoAgentTrek-ScreenFilter通常提供一个HTTP API接口。最核心的端点Endpoint可能是一个/process或/inference之类的URL。我们需要以POST请求的方式将视频文件和数据发送过去。请求里一般需要包含视频文件作为表单数据multipart/form-data的一部分上传。处理参数以JSON格式指定比如告诉模型是要“过滤屏幕”还是“提取屏幕”或者设置置信度阈值。服务处理完成后会返回一个JSON响应。这个响应里可能包含task_id本次处理任务的唯一ID用于后续查询。status任务状态如processing,success,failed。result_url或result_path处理后的视频文件下载地址如果服务直接返回文件流则可能内嵌在响应中。2.2 编写单视频处理函数下面是一个最基础的Python函数它完成了上述过程import requests import json import time def process_single_video(api_base_url, video_file_path, modefilter, threshold0.5): 发送单个视频文件进行处理。 参数: api_base_url (str): 模型服务的基地址例如 http://127.0.0.1:8080 video_file_path (str): 本地视频文件的路径。 mode (str): 处理模式如 filter (过滤屏幕), extract (提取屏幕)。 threshold (float): 屏幕内容检测的置信度阈值范围通常为0-1。 返回: dict: API的JSON响应内容。 # 1. 构造完整的API端点 process_url f{api_base_url.rstrip(/)}/process # 2. 准备请求数据 # 文件部分 with open(video_file_path, rb) as f: files {video: (video_file_path, f, video/mp4)} # 参数部分 data { mode: mode, threshold: str(threshold) } # 3. 发送POST请求 print(f正在发送视频处理请求: {video_file_path}) try: response requests.post(process_url, filesfiles, datadata, timeout60) response.raise_for_status() # 如果状态码不是200会抛出HTTPError异常 result response.json() print(请求成功响应内容) print(json.dumps(result, indent2, ensure_asciiFalse)) return result except requests.exceptions.RequestException as e: print(f请求失败: {e}) if hasattr(e, response) and e.response is not None: print(f错误响应: {e.response.text}) return None # 使用示例 if __name__ __main__: API_BASE http://你的服务IP:端口 # 请替换为你的实际地址 VIDEO_PATH ./test_video.mp4 # 调用函数尝试过滤掉屏幕内容 api_result process_single_video(API_BASE, VIDEO_PATH, modefilter, threshold0.7) # 假设返回了任务ID我们可以用它来查询或下载结果 if api_result and task_id in api_result: task_id api_result[task_id] print(f任务已提交任务ID: {task_id})代码说明我们使用requests.post方法通过files参数上传视频通过data参数传递其他配置。response.raise_for_status()是一个好习惯它能帮我们快速发现HTTP层面的错误如404 500。返回的结果被解析为JSON字典方便我们提取信息。运行这个脚本如果一切顺利你应该能在控制台看到服务返回的JSON数据里面包含了你的任务ID。3. 进阶操作任务状态查询与结果获取在很多实际场景中视频处理不是瞬间完成的尤其是长视频。服务更常见的模式是“异步处理”你提交任务立刻得到一个task_id然后你需要用这个ID去轮询任务状态等处理完成后再获取结果。3.1 轮询任务状态我们来写一个轮询函数它每隔一段时间就去问问服务“我那个任务搞定了没”def poll_task_status(api_base_url, task_id, poll_interval2, max_polls30): 轮询查询任务状态直到完成或超时。 参数: api_base_url (str): 模型服务的基地址。 task_id (str): 需要查询的任务ID。 poll_interval (int): 每次查询的间隔时间秒。 max_polls (int): 最大轮询次数防止无限等待。 返回: dict: 任务最终状态的响应内容如果超时或失败则返回None。 status_url f{api_base_url.rstrip(/)}/task/status params {task_id: task_id} for i in range(max_polls): try: resp requests.get(status_url, paramsparams, timeout10) resp.raise_for_status() status_info resp.json() current_status status_info.get(status) print(f轮询 {i1}/{max_polls} - 任务状态: {current_status}) if current_status success: print(任务处理成功) return status_info elif current_status failed: print(f任务处理失败。详情: {status_info.get(message, 无)}) return status_info elif current_status processing: # 还在处理中继续等待 time.sleep(poll_interval) else: # 未知状态 print(f遇到未知状态: {current_status}) time.sleep(poll_interval) except requests.exceptions.RequestException as e: print(f第{i1}次轮询请求异常: {e}) time.sleep(poll_interval) print(f轮询超过{max_polls}次任务仍未完成已超时。) return None # 结合第一个函数使用 if __name__ __main__: API_BASE http://你的服务IP:端口 VIDEO_PATH ./test_video.mp4 # 1. 提交任务 submit_result process_single_video(API_BASE, VIDEO_PATH) if not submit_result: exit() task_id submit_result.get(task_id) if not task_id: print(响应中未找到任务ID。) exit() # 2. 轮询状态 final_status poll_task_status(API_BASE, task_id)3.2 下载处理结果当轮询到状态为success时响应里通常会包含结果文件的URL或路径。我们需要写一个函数来下载它。def download_result(api_base_url, task_id, save_path./output_video.mp4): 根据任务ID下载处理后的视频结果。 参数: api_base_url (str): 模型服务的基地址。 task_id (str): 已完成的任务ID。 save_path (str): 结果视频保存到本地的路径。 返回: bool: 下载是否成功。 download_url f{api_base_url.rstrip(/)}/task/result params {task_id: task_id} try: print(f正在下载任务 {task_id} 的结果...) response requests.get(download_url, paramsparams, streamTrue, timeout60) response.raise_for_status() # 检查响应头确认是视频文件 content_type response.headers.get(content-type, ) if video not in content_type and application/octet-stream not in content_type: print(f警告响应内容类型不是视频: {content_type}) # 可能是错误信息打印出来看看 print(response.text[:200]) return False # 以二进制流的方式写入文件 with open(save_path, wb) as f: for chunk in response.iter_content(chunk_size8192): f.write(chunk) print(f结果视频已成功下载到: {save_path}) return True except requests.exceptions.RequestException as e: print(f下载结果失败: {e}) return False # 整合到主流程中 if __name__ __main__: # ... 之前的提交和轮询代码 ... # 3. 如果任务成功下载结果 if final_status and final_status.get(status) success: # 假设结果文件链接在 result_url 字段中或者我们使用任务ID下载 # 这里演示使用任务ID下载的通用方法 download_success download_result(API_BASE, task_id, save_pathf./filtered_{task_id}.mp4) if download_success: print(恭喜整个处理流程已完成。)4. 生产力提升实现批量视频处理处理单个视频只是开始我们真正需要的是批量处理能力。我们可以利用Python的并发特性同时提交多个任务并高效地管理它们。4.1 使用线程池进行并发提交下面的代码演示了如何扫描一个文件夹下的所有视频文件并利用线程池并发提交处理请求。import os from concurrent.futures import ThreadPoolExecutor, as_completed def batch_process_videos(api_base_url, video_dir, output_dir./batch_output, max_workers3): 批量处理一个目录下的所有视频文件。 参数: api_base_url (str): 模型服务的基地址。 video_dir (str): 存放待处理视频的目录路径。 output_dir (str): 结果保存目录。 max_workers (int): 并发线程数根据你的机器和服务负载调整。 # 支持的视频格式 video_extensions (.mp4, .avi, .mov, .mkv, .flv) video_files [] for f in os.listdir(video_dir): if f.lower().endswith(video_extensions): video_files.append(os.path.join(video_dir, f)) if not video_files: print(f在目录 {video_dir} 中未找到视频文件。) return print(f找到 {len(video_files)} 个待处理视频。) os.makedirs(output_dir, exist_okTrue) # 用于存储任务ID和源文件路径的映射 task_map {} # 使用线程池提交任务 with ThreadPoolExecutor(max_workersmax_workers) as executor: future_to_file {} for vf in video_files: # 为每个视频提交一个处理任务 future executor.submit(process_single_video, api_base_url, vf) future_to_file[future] vf # 收集提交结果 for future in as_completed(future_to_file): video_file future_to_file[future] try: result future.result(timeout60) # 设置单个任务提交超时 if result and task_id in result: task_id result[task_id] task_map[task_id] video_file print(f视频 {os.path.basename(video_file)} 提交成功任务ID: {task_id}) else: print(f视频 {os.path.basename(video_file)} 提交失败或无任务ID。) except Exception as exc: print(f视频 {os.path.basename(video_file)} 提交时产生异常: {exc}) print(f\n所有视频提交完成。共 {len(task_map)} 个任务进入处理队列。) return task_map # 使用示例 if __name__ __main__: API_BASE http://你的服务IP:端口 INPUT_DIR ./videos_to_process OUTPUT_DIR ./processed_videos task_info batch_process_videos(API_BASE, INPUT_DIR, OUTPUT_DIR) # task_info 是一个字典格式为 {task_id: video_file_path} print(task_info)4.2 批量轮询与结果收集提交之后我们需要一个更智能的轮询器来批量检查所有任务的状态并下载已完成的任务。def batch_poll_and_download(api_base_url, task_map, output_dir, poll_interval5): 批量轮询任务状态并自动下载已完成的任务。 参数: api_base_url (str): 模型服务的基地址。 task_map (dict): 任务ID到源文件路径的映射。 output_dir (str): 结果保存目录。 poll_interval (int): 批量轮询的间隔时间秒。 pending_tasks list(task_map.keys()) # 待处理的任务ID列表 completed_tasks [] while pending_tasks: print(f\n 开始新一轮轮询剩余 {len(pending_tasks)} 个任务 ) current_round_tasks pending_tasks.copy() for task_id in current_round_tasks: status_url f{api_base_url.rstrip(/)}/task/status try: resp requests.get(status_url, params{task_id: task_id}, timeout10) if resp.status_code 200: status_info resp.json() task_status status_info.get(status) if task_status success: print(f任务 {task_id} 已完成。) # 下载结果 source_path task_map[task_id] base_name os.path.basename(source_path) name_without_ext os.path.splitext(base_name)[0] save_name f{name_without_ext}_filtered_{task_id[:8]}.mp4 save_path os.path.join(output_dir, save_name) if download_result(api_base_url, task_id, save_path): completed_tasks.append(task_id) pending_tasks.remove(task_id) else: print(f任务 {task_id} 结果下载失败下次继续尝试。) elif task_status failed: print(f任务 {task_id} 处理失败。) pending_tasks.remove(task_id) # 从待处理列表中移除失败任务 elif task_status processing: # 仍在处理留在列表中 pass else: print(f任务 {task_id} 状态未知: {task_status}) else: print(f查询任务 {task_id} 状态失败HTTP {resp.status_code}) except requests.exceptions.RequestException as e: print(f轮询任务 {task_id} 时发生请求异常: {e}) # 如果还有任务未完成等待一段时间再继续 if pending_tasks: print(f等待 {poll_interval} 秒后继续轮询...) time.sleep(poll_interval) print(f\n所有任务处理完毕成功完成 {len(completed_tasks)} 个任务。) # 整合批量流程 if __name__ __main__: API_BASE http://你的服务IP:端口 INPUT_DIR ./videos_to_process OUTPUT_DIR ./processed_videos # 1. 批量提交 task_info batch_process_videos(API_BASE, INPUT_DIR, OUTPUT_DIR) if task_info: # 2. 批量轮询并下载 batch_poll_and_download(API_BASE, task_info, OUTPUT_DIR)5. 总结与实用建议走完这一套流程你应该已经能够熟练地通过Python来驱动VideoAgentTrek-ScreenFilter服务了。从单个视频的试水到构建一个健壮的、支持并发和错误处理的批量处理流水线核心思路就是围绕HTTP API进行封装。在实际集成到你的项目时还有几个小建议错误处理要细致网络超时、服务重启、视频格式不支持、磁盘空间不足等情况都可能发生。上面的示例代码提供了基本的异常捕获但在生产环境中可能需要更完善的重试机制和日志记录。参数可以更丰富除了mode和thresholdScreenFilter服务可能还支持输出分辨率、编解码器、特定区域过滤等高级参数。仔细阅读服务的API文档能让处理结果更符合你的预期。资源管理并发数max_workers不是越大越好需要根据你部署服务的机器性能特别是GPU内存和网络带宽来调整避免把服务“打挂”。结果管理对于大量视频的长期处理建议将任务ID、状态、源文件路径、结果路径存入数据库如SQLite或MySQL方便追踪和管理。整体用下来通过Python来调用这类AI服务思路其实很清晰封装请求、处理响应、管理状态。剩下的就是根据实际业务需求像搭积木一样把这些模块组合起来。希望这些代码片段能成为你项目中的一个有力起点。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

更多文章