Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion runtime/ops/mapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ def _import_operators():
from . import remove_duplicate_sentences
from . import knowledge_relation_slice
from . import pii_ner_detection

# ===== Video operators (PR1-PR5) =====
from . import _video_common
from . import video_format_convert
from . import video_sensitive_detect
from . import video_sensitive_crop
from . import video_mot_track
from . import video_subject_crop
from . import video_classify_qwenvl
from . import video_summary_qwenvl
from . import video_event_tag_qwenvl
from . import video_keyframe_extract
from . import video_deborder_crop
from . import video_audio_extract
from . import video_speech_asr
from . import video_subtitle_ocr
from . import video_text_ocr

_import_operators()
1 change: 1 addition & 0 deletions runtime/ops/mapper/_video_common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
117 changes: 117 additions & 0 deletions runtime/ops/mapper/_video_common/ffmpeg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
import os
import subprocess

def run_cmd(cmd, logger=None):
if logger:
logger.info("FFmpeg cmd: " + " ".join(cmd))
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if p.returncode != 0:
msg = f"FFmpeg failed (code={p.returncode}).\nSTDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}"
raise RuntimeError(msg)
return p.stdout, p.stderr

def convert_to_mp4_h264(
in_path: str,
out_path: str,
crf: int = 23,
preset: str = "veryfast",
audio: bool = True,
fps: int = None,
scale: str = None, # e.g. "1280:720" or None
logger=None,
):
"""
最通用的“交付格式”:mp4(H.264) + yuv420p
- crf 越小质量越高,体积越大(18~28常用)
- preset 越慢压缩越好但越耗时(veryfast/fast/medium)
"""
os.makedirs(os.path.dirname(out_path), exist_ok=True)

cmd = ["ffmpeg", "-y", "-i", in_path]

# 视频参数
cmd += ["-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", preset, "-crf", str(crf)]

# 可选 fps / scale
if fps is not None:
cmd += ["-r", str(int(fps))]
if scale is not None:
cmd += ["-vf", f"scale={scale}"]

# 音频
if audio:
cmd += ["-c:a", "aac", "-b:a", "128k"]
else:
cmd += ["-an"]

cmd += [out_path]
return run_cmd(cmd, logger=logger)

def transcode_any(
in_path: str,
out_path: str,
vcodec: str = "libx264",
acodec: str = "aac",
pix_fmt: str = "yuv420p",
crf: int = 23,
preset: str = "veryfast",
vbitrate: str = None, # e.g. "2M"
abitrate: str = "128k",
fps: int = None,
scale: str = None, # e.g. "1280:720"
extra_args: list = None,
logger=None,
):
"""
通用转码:支持任意容器/编码器组合
- vcodec/acodec 支持 'copy'(封装重打包或直接流拷贝)
- out_path 后缀决定容器格式:.mp4/.mkv/.mov/.avi/.wmv...
"""
os.makedirs(os.path.dirname(out_path), exist_ok=True)
cmd = ["ffmpeg", "-y", "-i", in_path]

# video
cmd += ["-c:v", vcodec]
if vcodec != "copy":
cmd += ["-pix_fmt", pix_fmt]
if crf is not None:
cmd += ["-crf", str(crf)]
if preset:
cmd += ["-preset", preset]
if vbitrate:
cmd += ["-b:v", str(vbitrate)]

# fps/scale
if fps is not None:
cmd += ["-r", str(int(fps))]
if scale is not None:
cmd += ["-vf", f"scale={scale}"]

# audio
cmd += ["-c:a", acodec]
if acodec != "copy":
if abitrate:
cmd += ["-b:a", str(abitrate)]

if extra_args:
cmd += list(extra_args)

cmd += [out_path]
return run_cmd(cmd, logger=logger)



def cut_segment(in_path: str, out_path: str, start: float, end: float, logger=None):
os.makedirs(os.path.dirname(out_path), exist_ok=True)
cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end), "-i", in_path, "-c", "copy", out_path]
return run_cmd(cmd, logger=logger)

def concat_segments(segment_paths, out_path: str, logger=None):
os.makedirs(os.path.dirname(out_path), exist_ok=True)
list_file = out_path + ".txt"
with open(list_file, "w", encoding="utf-8") as f:
for p in segment_paths:
f.write(f"file '{os.path.abspath(p)}'\n")
cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", out_path]
return run_cmd(cmd, logger=logger)
13 changes: 13 additions & 0 deletions runtime/ops/mapper/_video_common/io_video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
import cv2

def get_video_info(video_path: str):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Cannot open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
return fps, width, height, frames
23 changes: 23 additions & 0 deletions runtime/ops/mapper/_video_common/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
import logging
import os

def get_logger(name: str, log_dir: str = None):
logger = logging.getLogger(name)
if logger.handlers:
return logger

logger.setLevel(logging.INFO)
fmt = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s")

sh = logging.StreamHandler()
sh.setFormatter(fmt)
logger.addHandler(sh)

if log_dir:
os.makedirs(log_dir, exist_ok=True)
fh = logging.FileHandler(os.path.join(log_dir, "run.log"), encoding="utf-8")
fh.setFormatter(fmt)
logger.addHandler(fh)

return logger
28 changes: 28 additions & 0 deletions runtime/ops/mapper/_video_common/model_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os

def get_model_root(params=None) -> str:
"""
模型根目录优先级:
1) params['model_root']
2) 环境变量 DATAMATE_MODEL_ROOT
3) 默认 /mnt/models
"""
params = params or {}
return params.get("model_root") or os.environ.get("DATAMATE_MODEL_ROOT") or "/mnt/models"


def resolve_model_path(params, param_key: str, default_rel: str) -> str:
"""
解析模型路径:
- 如果 params[param_key] 是绝对路径:直接用
- 如果是相对路径:拼到 model_root
- 如果没传:用 model_root + default_rel
"""
params = params or {}
root = get_model_root(params)

v = params.get(param_key)
if v:
return v if os.path.isabs(v) else os.path.join(root, v)

return os.path.join(root, default_rel)
18 changes: 18 additions & 0 deletions runtime/ops/mapper/_video_common/paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
import os
import time
import uuid

def ensure_dir(p: str):
os.makedirs(p, exist_ok=True)
return p

def make_run_dir(export_path: str, op_name: str):
"""
统一输出目录:{export_path}/{op_name}/{timestamp_uuid}/
"""
ts = time.strftime("%Y%m%d_%H%M%S")
run_id = f"{ts}_{uuid.uuid4().hex[:8]}"
out_dir = os.path.join(export_path, op_name, run_id)
ensure_dir(out_dir)
return out_dir
42 changes: 42 additions & 0 deletions runtime/ops/mapper/_video_common/qwen_http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
import os
import json
import cv2
import requests

def qwenvl_infer_by_image_path(
image_path: str,
task: str,
service_url: str = "http://127.0.0.1:18080",
max_new_tokens: int = 64,
language: str = "zh",
style: str = "normal",
timeout: int = 180,
):
"""
对齐你当前服务端 qwen_vl_server.py 的接口:
POST {service_url}/infer
JSON: {image_path, task, max_new_tokens, language, style}

返回:服务端 jsonify 的 dict
"""
sess = requests.Session()
sess.trust_env = False # 避免系统代理拦 localhost

payload = {
"image_path": image_path,
"task": task,
"max_new_tokens": int(max_new_tokens),
"language": language,
"style": style,
}
r = sess.post(service_url.rstrip("/") + "/infer", json=payload, timeout=timeout)
r.raise_for_status()
return r.json()

def save_frame_to_jpg(frame_bgr, out_path: str):
os.makedirs(os.path.dirname(out_path), exist_ok=True)
ok = cv2.imwrite(out_path, frame_bgr)
if not ok:
raise RuntimeError(f"failed to write jpg: {out_path}")
return out_path
9 changes: 9 additions & 0 deletions runtime/ops/mapper/_video_common/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
def init_tracks_schema(video_path, fps, width, height):
return {
"video": video_path,
"fps": float(fps),
"width": int(width),
"height": int(height),
"frames": [] # {"frame_id": i, "objects":[{"track_id":..,"bbox":[..],...}]}
}
6 changes: 6 additions & 0 deletions runtime/ops/mapper/video_audio_extract/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from datamate.core.base_op import OPERATORS

OPERATORS.register_module(
module_name="VideoAudioExtract",
module_path="ops.mapper.video_audio_extract.process",
)
16 changes: 16 additions & 0 deletions runtime/ops/mapper/video_audio_extract/metadata.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: '视频抽取音频'
name_en: 'Video Audio Extract'
description: '从视频中抽取音频,默认输出 wav(16k/mono);也可输出 aac,并生成音频信息 audio_info.json。'
description_en: 'Extract audio from video, default wav (16k/mono); can output aac; also generates audio_info.json.'
language: 'python'
vendor: 'huawei'
raw_id: 'VideoAudioExtract'
version: '1.0.0'
types:
- 'cleaning'
modal: 'video'
effect:
before: ''
after: ''
inputs: 'video'
outputs: 'audio'
81 changes: 81 additions & 0 deletions runtime/ops/mapper/video_audio_extract/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
import os
import json
import shutil
import subprocess

from .._video_common.paths import make_run_dir, ensure_dir
from .._video_common.log import get_logger


class VideoAudioExtract:
"""从视频提取音频(wav 16k mono)

params:
- ffmpeg_path: str, optional
- sample_rate: int, default 16000
- channels: int, default 1
- out_format: wav|aac, default wav

outputs:
- artifacts/audio.wav (or audio.aac)
- artifacts/audio_info.json
"""

@staticmethod
def execute(sample, params):
video_path = sample["filePath"]
export_path = sample.get("export_path", "./outputs")

op_name = "video_audio_extract"
out_dir = make_run_dir(export_path, op_name)
log_dir = ensure_dir(os.path.join(out_dir, "logs"))
art_dir = ensure_dir(os.path.join(out_dir, "artifacts"))

logger = get_logger(op_name, log_dir)
logger.info(f"video={video_path}")
logger.info(f"out_dir={out_dir}")

ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg")
if not ffmpeg_path:
raise RuntimeError("ffmpeg not found. Please install ffmpeg or pass params.ffmpeg_path")

sr = int(params.get("sample_rate", 16000))
ch = int(params.get("channels", 1))
out_format = (params.get("out_format", "wav") or "wav").lower()

if out_format == "aac":
audio_path = os.path.join(art_dir, "audio.aac")
cmd = [
ffmpeg_path, "-hide_banner", "-y",
"-i", video_path,
"-vn",
"-ac", str(ch),
"-ar", str(sr),
"-c:a", "aac",
audio_path
]
else:
audio_path = os.path.join(art_dir, "audio.wav")
cmd = [
ffmpeg_path, "-hide_banner", "-y",
"-i", video_path,
"-vn",
"-ac", str(ch),
"-ar", str(sr),
"-c:a", "pcm_s16le",
audio_path
]

logger.info("FFmpeg cmd: " + " ".join(cmd))
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if p.returncode != 0:
raise RuntimeError(f"FFmpeg failed (code={p.returncode}).\nSTDERR:\n{p.stderr}")

info = {"audio_path": audio_path, "sample_rate": sr, "channels": ch, "format": out_format}
info_path = os.path.join(art_dir, "audio_info.json")
with open(info_path, "w", encoding="utf-8") as f:
json.dump(info, f, ensure_ascii=False, indent=2)

logger.info(f"Done. audio={audio_path}")
return {"out_dir": out_dir, "audio_path": audio_path, "audio_info": info_path}
Loading