src.ffmpeg_quality_metrics
1import importlib.metadata 2 3from .ffmpeg_quality_metrics import ( 4 FfmpegQualityMetrics, 5 FfmpegQualityMetricsError, 6 GlobalStats, 7 GlobalStatsData, 8 MetricData, 9 MetricName, 10 SingleMetricData, 11 VmafOptions, 12) 13 14__version__ = importlib.metadata.version("ffmpeg-quality-metrics") 15__all__ = [ 16 "FfmpegQualityMetrics", 17 "FfmpegQualityMetricsError", 18 "VmafOptions", 19 "MetricName", 20 "SingleMetricData", 21 "GlobalStatsData", 22 "GlobalStats", 23 "MetricData", 24 "__version__", 25]
84class FfmpegQualityMetrics: 85 """ 86 A class to calculate quality metrics with FFmpeg 87 """ 88 89 ALLOWED_SCALERS = [ 90 "fast_bilinear", 91 "bilinear", 92 "bicubic", 93 "experimental", 94 "neighbor", 95 "area", 96 "bicublin", 97 "gauss", 98 "sinc", 99 "lanczos", 100 "spline", 101 ] 102 DEFAULT_SCALER = "bicubic" 103 DEFAULT_THREADS = 0 104 105 DEFAULT_VMAF_THREADS = 0 # used to be os.cpu_count(), now auto 106 DEFAULT_VMAF_SUBSAMPLE = 1 # sample every frame 107 DEFAULT_VMAF_MODEL_DIRECTORY = os.path.join( 108 os.path.dirname(__file__), "vmaf_models" 109 ) 110 DEFAULT_VMAF_OPTIONS: VmafOptions = { 111 "model_path": None, 112 "model_params": [], 113 "n_threads": DEFAULT_VMAF_THREADS, 114 "n_subsample": DEFAULT_VMAF_SUBSAMPLE, 115 "features": [], 116 } 117 POSSIBLE_FILTERS: List[FilterName] = [ 118 "libvmaf", 119 "psnr", 120 "ssim", 121 "vif", 122 "msad", 123 ] 124 METRIC_TO_FILTER_MAP: Dict[MetricName, FilterName] = { 125 "vmaf": "libvmaf", 126 "psnr": "psnr", 127 "ssim": "ssim", 128 "vif": "vif", 129 "msad": "msad", 130 } 131 132 def __init__( 133 self, 134 ref: str, 135 dist: str, 136 scaling_algorithm: str = DEFAULT_SCALER, 137 framerate: Union[float, None] = None, 138 dist_delay: float = 0, 139 dry_run: Union[bool, None] = False, 140 verbose: Union[bool, None] = False, 141 threads: int = DEFAULT_THREADS, 142 progress: Union[bool, None] = False, 143 keep_tmp_files: Union[bool, None] = False, 144 tmp_dir: Union[str, None] = None, 145 ): 146 """Instantiate a new FfmpegQualityMetrics 147 148 Args: 149 ref (str): reference file 150 dist (str): distorted file 151 scaling_algorithm (str, optional): A scaling algorithm. Must be one of the following: ["fast_bilinear", "bilinear", "bicubic", "experimental", "neighbor", "area", "bicublin", "gauss", "sinc", "lanczos", "spline"]. Defaults to "bicubic" 152 framerate (float, optional): Force a frame rate. Defaults to None. 153 dist_delay (float): Delay the distorted file against the reference by this amount of seconds. Defaults to 0. 154 dry_run (bool, optional): Don't run anything, just print commands. Defaults to False. 155 verbose (bool, optional): Show more output. Defaults to False. 156 threads (int, optional): Number of ffmpeg threads. Defaults to 0 (auto). 157 progress (bool, optional): Show a progress bar. Defaults to False. 158 keep_tmp_files (bool, optional): Keep temporary files for debugging purposes. Defaults to False. 159 tmp_dir (str, optional): Directory to store temporary files. Will use system default if not specified. Defaults to None. 160 161 Raises: 162 FfmpegQualityMetricsError: A generic error 163 """ 164 self.ref = str(ref) 165 self.dist = str(dist) 166 self.scaling_algorithm = str(scaling_algorithm) 167 self.framerate = float(framerate) if framerate is not None else None 168 self.dist_delay = float(dist_delay) 169 self.dry_run = bool(dry_run) 170 self.verbose = bool(verbose) 171 self.threads = int(threads) 172 self.progress = bool(progress) 173 self.keep_tmp_files = bool(keep_tmp_files) 174 self.tmp_dir = str(tmp_dir) if tmp_dir is not None else tempfile.gettempdir() 175 176 if not os.path.isfile(self.ref): 177 raise FfmpegQualityMetricsError(f"Reference file not found: {self.ref}") 178 if not os.path.isfile(self.dist): 179 raise FfmpegQualityMetricsError(f"Distorted file not found: {self.dist}") 180 181 if self.ref == self.dist: 182 logger.warning( 183 "Reference and distorted files are the same! This may lead to unexpected results or numerical issues." 184 ) 185 186 if ref.endswith(".yuv") or dist.endswith(".yuv"): 187 raise FfmpegQualityMetricsError( 188 "YUV files are not supported, please convert to a format that ffmpeg can read natively, such as Y4M or FFV1." 189 ) 190 191 self.data: MetricData = { 192 "vmaf": [], 193 "psnr": [], 194 "ssim": [], 195 "vif": [], 196 "msad": [], 197 } 198 199 self.available_filters: List[str] = [] 200 201 self.global_stats: GlobalStats = {} 202 203 if not os.path.isdir(self.tmp_dir): 204 logger.debug(f"Creating temporary directory: {self.tmp_dir}") 205 os.makedirs(self.tmp_dir) 206 self.temp_files: Dict[FilterName, str] = {} 207 208 for filter_name in self.POSSIBLE_FILTERS: 209 suffix = "txt" if filter_name != "libvmaf" else "json" 210 211 self.temp_files[cast(FilterName, filter_name)] = os.path.join( 212 self.tmp_dir, 213 f"ffmpeg_quality_metrics_{filter_name}_{os.path.basename(self.ref)}_{os.path.basename(self.dist)}.{suffix}", 214 ) 215 logger.debug( 216 f"Writing temporary {filter_name.upper()} information to: {self.temp_files[cast(FilterName, filter_name)]}" 217 ) 218 219 if scaling_algorithm not in self.ALLOWED_SCALERS: 220 raise FfmpegQualityMetricsError( 221 f"Allowed scaling algorithms: {self.ALLOWED_SCALERS}" 222 ) 223 224 self._check_available_filters() 225 226 def _check_available_filters(self): 227 """ 228 Check which filters are available 229 """ 230 cmd = ["ffmpeg", "-filters"] 231 stdout, _ = run_command(cmd) 232 filter_list = [] 233 for line in stdout.split("\n"): 234 line = line.strip() 235 if line == "": 236 continue 237 cols = line.split(" ") 238 if len(cols) > 1: 239 filter_name = cols[1] 240 filter_list.append(filter_name) 241 242 for key in FfmpegQualityMetrics.POSSIBLE_FILTERS: 243 if key in filter_list: 244 self.available_filters.append(key) 245 246 logger.debug(f"Available filters: {self.available_filters}") 247 248 @staticmethod 249 def get_framerate(input_file: str) -> float: 250 """Parse the FPS from the input file. 251 252 Args: 253 input_file (str): Input file path 254 255 Raises: 256 FfmpegQualityMetricsError: A generic error 257 258 Returns: 259 float: The FPS parsed 260 """ 261 cmd = ["ffmpeg", "-nostdin", "-y", "-i", input_file] 262 263 output = run_command(cmd, allow_error=True) 264 pattern = re.compile(r"(\d+(\.\d+)?) fps") 265 try: 266 if pattern_ret := pattern.search(str(output)): 267 match = pattern_ret.groups()[0] 268 return float(match) 269 except Exception: 270 pass 271 272 raise FfmpegQualityMetricsError(f"could not parse FPS from file {input_file}!") 273 274 def _get_framerates(self) -> Tuple[float, float]: 275 """ 276 Get the framerates of the reference and distorted files. 277 278 Returns: 279 Tuple[float, float]: The framerates of the reference and distorted files 280 """ 281 ref_framerate = FfmpegQualityMetrics.get_framerate(self.ref) 282 dist_framerate = FfmpegQualityMetrics.get_framerate(self.dist) 283 284 if ref_framerate != dist_framerate: 285 logger.warning( 286 f"ref, dist framerates differ: {ref_framerate}, {dist_framerate}. " 287 "This may result in inaccurate quality metrics. Force an input framerate via the -r option." 288 ) 289 290 return ref_framerate, dist_framerate 291 292 def _get_filter_opts(self, filter_name: FilterName) -> str: 293 """ 294 Returns: 295 str: Specific ffmpeg filter options for a chosen metric filter. 296 """ 297 if filter_name in ["ssim", "psnr"]: 298 return f"{filter_name}='{win_path_check(self.temp_files[filter_name])}'" 299 elif filter_name == "libvmaf": 300 return f"libvmaf='{self._get_libvmaf_filter_opts()}'" 301 elif filter_name == "vif": 302 return "vif,metadata=mode=print" 303 elif filter_name == "msad": 304 return "msad,metadata=mode=print" 305 else: 306 raise FfmpegQualityMetricsError(f"Unknown filter {filter_name}!") 307 308 def calculate( 309 self, 310 metrics: List[MetricName] = ["ssim", "psnr"], 311 vmaf_options: Union[VmafOptions, None] = None, 312 ) -> Dict[MetricName, SingleMetricData]: 313 """Calculate one or more metrics. 314 315 Args: 316 metrics (list, optional): A list of metrics to calculate. 317 Possible values are ["ssim", "psnr", "vmaf"]. 318 Defaults to ["ssim", "psnr"]. 319 vmaf_options (dict, optional): VMAF-specific options. Uses defaults if not specified. 320 321 Raises: 322 FfmpegQualityMetricsError: In case of an error 323 e: A generic error 324 325 Returns: 326 dict: A dictionary of per-frame info, with the key being the metric name and the value being a dict of frame numbers ('n') and metric values. 327 """ 328 if not metrics: 329 raise FfmpegQualityMetricsError("No metrics specified!") 330 331 # check available metrics 332 for metric_name in metrics: 333 filter_name = self.METRIC_TO_FILTER_MAP.get(metric_name, None) 334 if filter_name not in self.POSSIBLE_FILTERS: 335 raise FfmpegQualityMetricsError(f"No such metric '{metric_name}'") 336 if filter_name not in self.available_filters: 337 raise FfmpegQualityMetricsError( 338 f"Your ffmpeg version does not have the filter '{filter_name}'" 339 ) 340 341 # set VMAF options specifically 342 if "vmaf" in metrics: 343 self._check_libvmaf_availability() 344 self.vmaf_options = self.DEFAULT_VMAF_OPTIONS 345 # override with user-supplied options 346 if vmaf_options: 347 for key, value in vmaf_options.items(): 348 if value is not None: 349 self.vmaf_options[key] = value # type: ignore 350 self._set_vmaf_model_path(self.vmaf_options["model_path"]) 351 352 # ffmpeg 7.1 or higher: scale2ref filter is deprecated 353 # input 0: ref, input 1: dist --> swapped for scale filter 354 filter_chains = [ 355 f"[1][0]scale=rw:rh:flags={self.scaling_algorithm}[dist]", 356 "[dist]settb=AVTB,setpts=PTS-STARTPTS[distpts]", 357 "[0]settb=AVTB,setpts=PTS-STARTPTS[refpts]", 358 ] 359 360 # generate split filters depending on the number of models 361 n_splits = len(metrics) 362 if n_splits > 1: 363 for source in ["dist", "ref"]: 364 suffixes = "".join([f"[{source}{n}]" for n in range(1, n_splits + 1)]) 365 filter_chains.extend( 366 [ 367 f"[{source}pts]split={n_splits}{suffixes}", 368 ] 369 ) 370 371 # special case, only one metric: 372 if n_splits == 1: 373 metric_name = metrics[0] 374 filter_chains.extend( 375 [ 376 f"[distpts][refpts]{self._get_filter_opts(self.METRIC_TO_FILTER_MAP[metric_name])}" 377 ] 378 ) 379 # all other cases: 380 else: 381 for n, metric_name in zip(range(1, n_splits + 1), metrics): 382 filter_chains.extend( 383 [ 384 f"[dist{n}][ref{n}]{self._get_filter_opts(self.METRIC_TO_FILTER_MAP[metric_name])}" 385 ] 386 ) 387 388 try: 389 output = self._run_ffmpeg_command(filter_chains, desc=", ".join(metrics)) 390 self._read_temp_files(metrics) 391 if output: 392 self._read_ffmpeg_output(output, metrics) 393 else: 394 raise FfmpegQualityMetricsError("ffmpeg output is empty!") 395 except Exception as e: 396 raise e 397 finally: 398 self._cleanup_temp_files() 399 400 # return only those data entries containing values 401 return cast( 402 Dict[MetricName, SingleMetricData], 403 {k: v for k, v in self.data.items() if v}, 404 ) 405 406 def _get_libvmaf_filter_opts(self) -> str: 407 """ 408 Returns: 409 410 str: A string to use for VMAF in ffmpeg filter chain 411 """ 412 # we only have one model, and its path parameter is not optional 413 all_model_params: Dict[str, str] = { 414 "path": win_vmaf_model_path_check(self.vmaf_model_path) 415 } 416 417 # add further model parameters 418 for model_param in self.vmaf_options["model_params"]: 419 key, value = model_param.split("=") 420 all_model_params[key] = value 421 422 all_model_params_str = "\\:".join( 423 f"{k}={v}" for k, v in all_model_params.items() 424 ) 425 426 vmaf_opts: Dict[str, str] = { 427 "model": all_model_params_str, 428 "log_path": win_path_check(self.temp_files["libvmaf"]), 429 "log_fmt": "json", 430 "n_threads": str(self.vmaf_options["n_threads"]), 431 "n_subsample": str(self.vmaf_options["n_subsample"]), 432 } 433 434 if self.vmaf_options["features"]: 435 features = [] 436 for feature in self.vmaf_options["features"]: 437 if not feature.startswith("name"): 438 feature = f"name={feature}" 439 features.append(feature.replace(":", "\\:")) 440 vmaf_opts["feature"] = "|".join(features) 441 442 vmaf_opts_string = ":".join( 443 f"{k}={v}" for k, v in vmaf_opts.items() if v is not None 444 ) 445 446 return vmaf_opts_string 447 448 def _check_libvmaf_availability(self) -> None: 449 if "libvmaf" not in self.available_filters: 450 raise FfmpegQualityMetricsError( 451 "Your ffmpeg build does not have support for VMAF. Make sure you download or build a version compiled with --enable-libvmaf!" 452 ) 453 454 def _set_vmaf_model_path(self, model_path: Union[str, None] = None) -> None: 455 """ 456 Logic to set the model path depending on the default or the user-supplied string 457 """ 458 if model_path is None: 459 self.vmaf_model_path = FfmpegQualityMetrics.get_default_vmaf_model_path() 460 else: 461 self.vmaf_model_path = str(model_path) 462 463 supplied_models = FfmpegQualityMetrics.get_supplied_vmaf_models() 464 465 if not os.path.isfile(self.vmaf_model_path): 466 # check if this is one of the supplied ones? e.g. user passed only a filename 467 if self.vmaf_model_path in supplied_models: 468 self.vmaf_model_path = os.path.join( 469 FfmpegQualityMetrics.DEFAULT_VMAF_MODEL_DIRECTORY, 470 self.vmaf_model_path, 471 ) 472 else: 473 raise FfmpegQualityMetricsError( 474 f"Could not find model at {self.vmaf_model_path}. Please set --model-path to a valid VMAF .json model file." 475 ) 476 477 def _read_vmaf_temp_file(self) -> None: 478 """ 479 Read the VMAF temp file and append the data to the data dict. 480 """ 481 with open(self.temp_files["libvmaf"], "r") as in_vmaf: 482 vmaf_log = json.load(in_vmaf) 483 logger.debug(f"VMAF log: {json.dumps(vmaf_log, indent=4)}") 484 for frame_data in vmaf_log["frames"]: 485 # append frame number, increase +1 486 frame_data["metrics"]["n"] = int(frame_data["frameNum"]) + 1 487 self.data["vmaf"].append(frame_data["metrics"]) 488 489 def _read_ffmpeg_output(self, ffmpeg_output: str, metrics=[]) -> None: 490 """ 491 Read the metric values from ffmpeg's stderr, for those that don't output 492 to a file. 493 """ 494 if self.dry_run: 495 return 496 if "vif" in metrics: 497 self._parse_ffmpeg_metadata_output(ffmpeg_output, "vif") 498 if "msad" in metrics: 499 self._parse_ffmpeg_metadata_output(ffmpeg_output, "msad") 500 501 def _parse_ffmpeg_metadata_output( 502 self, ffmpeg_output: str, metric_name: Literal["vif", "msad"] 503 ) -> None: 504 """ 505 Parse the filter output written to ffmpeg's metadata output 506 507 Args: 508 ffmpeg_output (str): The output of ffmpeg's stderr 509 metric_name (Literal["vif", "msad"]): The name of the metric to parse 510 """ 511 # Example for VIF: 512 # 513 # [Parsed_metadata_4 @ 0x7f995cd08640] frame:1 pts:1 pts_time:0.0401x 514 # [Parsed_metadata_4 @ 0x7f995cd08640] lavfi.vif.scale.0=0.263582 515 # [Parsed_metadata_4 @ 0x7f995cd08640] lavfi.vif.scale.1=0.560129 516 # [Parsed_metadata_4 @ 0x7f995cd08640] lavfi.vif.scale.2=0.626596 517 # [Parsed_metadata_4 @ 0x7f995cd08640] lavfi.vif.scale.3=0.682183 518 # 519 # Example for MSAD: 520 # 521 # [Parsed_metadata_6 @ 0x10ad04ea0] lavfi.msad.msad.Y=0.029998 522 # [Parsed_metadata_6 @ 0x10ad04ea0] lavfi.msad.msad.U=0.019501 523 # [Parsed_metadata_6 @ 0x10ad04ea0] lavfi.msad.msad.V=0.026455 524 # [Parsed_metadata_6 @ 0x10ad04ea0] lavfi.msad.msad_avg=0.025318 525 526 lines = [line.strip() for line in ffmpeg_output.split("\n")] 527 current_frame = None 528 frame_data: Dict[str, float] = {} 529 530 for line in lines: 531 if not line.startswith("[Parsed_metadata"): 532 continue 533 534 fields = line.split(" ") 535 536 # a new frame appears 537 if fields[3].startswith("frame"): 538 # if we have data already 539 if frame_data: 540 self.data[metric_name].append(frame_data) 541 542 # get the frame number and reset the frame data 543 current_frame = int(fields[3].split(":")[1]) 544 frame_data = {"n": current_frame} 545 continue 546 547 # no frame was set, or no VIF info present 548 if current_frame is None or not fields[3].startswith( 549 f"lavfi.{metric_name}" 550 ): 551 continue 552 553 # we have a frame 554 key, value = fields[3].split("=") 555 key = key.replace(f"lavfi.{metric_name}.", "").replace(".", "_").lower() 556 frame_data[key] = round(float(value), 3) 557 558 # append final frame data 559 if frame_data: 560 self.data[metric_name].append(frame_data) 561 562 def _read_temp_files(self, metrics=[]): 563 """ 564 Read the data from multiple temp files 565 """ 566 if self.dry_run: 567 return 568 if "vmaf" in metrics: 569 self._read_vmaf_temp_file() 570 if "ssim" in metrics: 571 self._read_ssim_temp_file() 572 if "psnr" in metrics: 573 self._read_psnr_temp_file() 574 575 def _run_ffmpeg_command( 576 self, filter_chains: List[str] = [], desc: str = "" 577 ) -> Union[str, None]: 578 """ 579 Run the ffmpeg command to get the quality metrics. 580 The filter chains must be specified manually. 581 'desc' can be a human readable description for the progress bar. 582 583 Returns: 584 Union[str, None]: The output of ffmpeg's stderr 585 """ 586 if not self.framerate: 587 ref_framerate, dist_framerate = self._get_framerates() 588 else: 589 ref_framerate = self.framerate 590 dist_framerate = self.framerate 591 592 cmd = [ 593 "ffmpeg", 594 "-nostdin", 595 "-nostats", 596 "-y", 597 "-threads", 598 str(self.threads), 599 "-r", 600 str(ref_framerate), 601 "-i", 602 self.ref, 603 "-itsoffset", 604 str(self.dist_delay), 605 "-r", 606 str(dist_framerate), 607 "-i", 608 self.dist, 609 "-filter_complex", 610 ";".join(filter_chains), 611 "-an", 612 "-f", 613 "null", 614 NUL, 615 ] 616 617 if self.progress: 618 logger.debug(quoted_cmd(cmd)) 619 with FfmpegProgress(cmd, self.dry_run) as ff: 620 with tqdm(total=100, position=1, desc=desc) as pbar: 621 for progress in ff.run_command_with_progress(): 622 pbar.update(progress - pbar.n) 623 return ff.stderr 624 else: 625 _, stderr = run_command(cmd, dry_run=self.dry_run) 626 return stderr 627 628 def _cleanup_temp_files(self) -> None: 629 """ 630 Remove the temporary files 631 """ 632 for temp_file in self.temp_files.values(): 633 if os.path.isfile(temp_file): 634 if self.keep_tmp_files: 635 logger.debug(f"Keeping temp file {temp_file}") 636 else: 637 os.remove(temp_file) 638 639 def _read_psnr_temp_file(self) -> None: 640 """ 641 Parse the PSNR generated logfile 642 """ 643 with open(self.temp_files["psnr"], "r") as in_psnr: 644 # n:1 mse_avg:529.52 mse_y:887.00 mse_u:233.33 mse_v:468.25 psnr_avg:20.89 psnr_y:18.65 psnr_u:24.45 psnr_v:21.43 645 lines = in_psnr.readlines() 646 for line in lines: 647 line = line.strip() 648 fields = line.split(" ") 649 frame_data = {} 650 for field in fields: 651 k, v = field.split(":") 652 frame_data[k] = round(float(v), 3) if k != "n" else int(v) 653 self.data["psnr"].append(frame_data) 654 655 def _read_ssim_temp_file(self) -> None: 656 """ 657 Parse the SSIM generated logfile 658 """ 659 with open(self.temp_files["ssim"], "r") as in_ssim: 660 # n:1 Y:0.937213 U:0.961733 V:0.945788 All:0.948245 (12.860441)\n 661 lines = in_ssim.readlines() 662 for line in lines: 663 line = line.strip().split(" (")[0] # remove excess 664 fields = line.split(" ") 665 frame_data = {} 666 for field in fields: 667 k, v = field.split(":") 668 if k != "n": 669 # make psnr and ssim keys the same 670 k = "ssim_" + k.lower() 671 k = k.replace("all", "avg") 672 frame_data[k] = round(float(v), 3) if k != "n" else int(v) 673 self.data["ssim"].append(frame_data) 674 675 @staticmethod 676 def get_brewed_vmaf_model_path() -> Union[str, None]: 677 """ 678 Hack to get path for VMAF model from Homebrew or Linuxbrew. 679 This works for libvmaf 2.x 680 681 Returns: 682 str or None: the path or None if not found 683 """ 684 stdout, _ = run_command(["brew", "--prefix", "libvmaf"]) 685 cellar_path = stdout.strip() 686 687 model_path = os.path.join(cellar_path, "share", "libvmaf", "model") 688 689 if not os.path.isdir(model_path): 690 logger.warning( 691 f"{model_path} does not exist. Are you sure you have installed the most recent version of libvmaf with Homebrew?" 692 ) 693 return None 694 695 return model_path 696 697 @staticmethod 698 def get_default_vmaf_model_path() -> str: 699 """ 700 Return the default model path depending on whether the user is running Homebrew 701 or has a static build. 702 703 Returns: 704 str: the path 705 """ 706 if has_brew() and ffmpeg_is_from_brew(): 707 # If the user installed ffmpeg using homebrew 708 model_path = FfmpegQualityMetrics.get_brewed_vmaf_model_path() 709 if model_path is not None: 710 return os.path.join( 711 model_path, 712 "vmaf_v0.6.1.json", 713 ) 714 715 share_path = os.path.join("/usr", "local", "share", "model") 716 if os.path.isdir(share_path): 717 return os.path.join(share_path, "vmaf_v0.6.1.json") 718 else: 719 # return the bundled file as a fallback 720 return os.path.join( 721 FfmpegQualityMetrics.DEFAULT_VMAF_MODEL_DIRECTORY, "vmaf_v0.6.1.json" 722 ) 723 724 @staticmethod 725 def get_supplied_vmaf_models() -> List[str]: 726 """ 727 Return a list of VMAF models supplied with the software. 728 729 Returns: 730 List[str]: A list of VMAF model names 731 """ 732 return [ 733 f 734 for f in os.listdir(FfmpegQualityMetrics.DEFAULT_VMAF_MODEL_DIRECTORY) 735 if f.endswith(".json") 736 ] 737 738 def get_global_stats(self) -> GlobalStats: 739 """ 740 Return a dictionary for each calculated metric, with different statstics 741 742 Returns: 743 dict: A dictionary with stats, each key being a metric name and each value being a dictionary with the stats for every submetric. The stats are: 'average', 'median', 'stdev', 'min', 'max'. 744 """ 745 for metric_name in self.data: 746 logger.debug(f"Aggregating stats for {metric_name}") 747 metric_data = cast(SingleMetricData, self.data[metric_name]) 748 if len(metric_data) == 0: 749 continue 750 submetric_keys = [k for k in metric_data[0].keys() if k != "n"] 751 752 stats: Dict[str, GlobalStatsData] = {} 753 for submetric_key in submetric_keys: 754 values = [float(frame[submetric_key]) for frame in metric_data] 755 # Filter out non-finite values (inf, -inf, nan) for robust statistics 756 finite = [v for v in values if math.isfinite(v)] 757 # Fallback to [0.0] if all values are non-finite to prevent crashes 758 all_values = finite if finite else [0.0] 759 760 stats[submetric_key] = { 761 "average": round(float(mean(all_values)), 3), 762 "median": round(float(median(all_values)), 3), 763 "stdev": round( 764 float(pstdev(finite)) if len(finite) > 1 else 0.0, 3 765 ), 766 "min": round(min(all_values), 3), 767 "max": round(max(all_values), 3), 768 } 769 self.global_stats[metric_name] = stats 770 771 return self.global_stats 772 773 def get_results_csv(self) -> str: 774 """ 775 Return a CSV string with the data 776 777 Returns: 778 str: The CSV string 779 """ 780 # Check if we have any data 781 has_data = any(metric_data for metric_data in self.data.values()) 782 if not has_data: 783 raise FfmpegQualityMetricsError("No data calculated!") 784 785 # Collect all frames and merge data by frame number 786 frames_data: Dict[int, Dict[str, Union[str, float, int]]] = {} 787 788 # Process each metric's data 789 for metric_data in self.data.values(): 790 if not metric_data: 791 continue 792 793 for frame_info in cast(SingleMetricData, metric_data): 794 frame_num = int(frame_info["n"]) 795 if frame_num not in frames_data: 796 frames_data[frame_num] = {"n": frame_num} 797 798 # Add all metric properties for this frame 799 for key, value in frame_info.items(): 800 if key != "n": # Skip frame number as it's already added 801 frames_data[frame_num][key] = value 802 803 if not frames_data: 804 raise FfmpegQualityMetricsError("No frame data found!") 805 806 # Sort frames by frame number 807 sorted_frames = sorted(frames_data.keys()) 808 809 # Collect all unique column names (excluding 'n' which we'll put first) 810 all_columns: set[str] = set() 811 for frame_data in frames_data.values(): 812 all_columns.update(frame_data.keys()) 813 all_columns.discard("n") 814 815 # Create column order: n first, then sorted metric columns, then input files 816 columns = ["n"] + sorted(all_columns) + ["input_file_dist", "input_file_ref"] 817 818 # Generate CSV using StringIO and csv module 819 output = StringIO() 820 writer = csv.writer(output) 821 822 # Write header 823 writer.writerow(columns) 824 825 # Write data rows 826 for frame_num in sorted_frames: 827 frame_data = frames_data[frame_num] 828 row = [] 829 for col in columns: 830 if col == "input_file_dist": 831 row.append(self.dist) 832 elif col == "input_file_ref": 833 row.append(self.ref) 834 else: 835 # Use the frame data value or empty string if not present 836 row.append(str(frame_data.get(col, ""))) 837 writer.writerow(row) 838 839 return output.getvalue() 840 841 def get_results_json(self) -> str: 842 """ 843 Return the results as JSON string 844 845 Returns: 846 str: The JSON string 847 """ 848 ret: Dict = {} 849 for key in self.data: 850 metric_data = cast(SingleMetricData, self.data[key]) 851 if len(metric_data) == 0: 852 continue 853 ret[key] = metric_data 854 ret["global"] = self.get_global_stats() 855 ret["input_file_dist"] = self.dist 856 ret["input_file_ref"] = self.ref 857 858 return json.dumps(ret, indent=4)
A class to calculate quality metrics with FFmpeg
132 def __init__( 133 self, 134 ref: str, 135 dist: str, 136 scaling_algorithm: str = DEFAULT_SCALER, 137 framerate: Union[float, None] = None, 138 dist_delay: float = 0, 139 dry_run: Union[bool, None] = False, 140 verbose: Union[bool, None] = False, 141 threads: int = DEFAULT_THREADS, 142 progress: Union[bool, None] = False, 143 keep_tmp_files: Union[bool, None] = False, 144 tmp_dir: Union[str, None] = None, 145 ): 146 """Instantiate a new FfmpegQualityMetrics 147 148 Args: 149 ref (str): reference file 150 dist (str): distorted file 151 scaling_algorithm (str, optional): A scaling algorithm. Must be one of the following: ["fast_bilinear", "bilinear", "bicubic", "experimental", "neighbor", "area", "bicublin", "gauss", "sinc", "lanczos", "spline"]. Defaults to "bicubic" 152 framerate (float, optional): Force a frame rate. Defaults to None. 153 dist_delay (float): Delay the distorted file against the reference by this amount of seconds. Defaults to 0. 154 dry_run (bool, optional): Don't run anything, just print commands. Defaults to False. 155 verbose (bool, optional): Show more output. Defaults to False. 156 threads (int, optional): Number of ffmpeg threads. Defaults to 0 (auto). 157 progress (bool, optional): Show a progress bar. Defaults to False. 158 keep_tmp_files (bool, optional): Keep temporary files for debugging purposes. Defaults to False. 159 tmp_dir (str, optional): Directory to store temporary files. Will use system default if not specified. Defaults to None. 160 161 Raises: 162 FfmpegQualityMetricsError: A generic error 163 """ 164 self.ref = str(ref) 165 self.dist = str(dist) 166 self.scaling_algorithm = str(scaling_algorithm) 167 self.framerate = float(framerate) if framerate is not None else None 168 self.dist_delay = float(dist_delay) 169 self.dry_run = bool(dry_run) 170 self.verbose = bool(verbose) 171 self.threads = int(threads) 172 self.progress = bool(progress) 173 self.keep_tmp_files = bool(keep_tmp_files) 174 self.tmp_dir = str(tmp_dir) if tmp_dir is not None else tempfile.gettempdir() 175 176 if not os.path.isfile(self.ref): 177 raise FfmpegQualityMetricsError(f"Reference file not found: {self.ref}") 178 if not os.path.isfile(self.dist): 179 raise FfmpegQualityMetricsError(f"Distorted file not found: {self.dist}") 180 181 if self.ref == self.dist: 182 logger.warning( 183 "Reference and distorted files are the same! This may lead to unexpected results or numerical issues." 184 ) 185 186 if ref.endswith(".yuv") or dist.endswith(".yuv"): 187 raise FfmpegQualityMetricsError( 188 "YUV files are not supported, please convert to a format that ffmpeg can read natively, such as Y4M or FFV1." 189 ) 190 191 self.data: MetricData = { 192 "vmaf": [], 193 "psnr": [], 194 "ssim": [], 195 "vif": [], 196 "msad": [], 197 } 198 199 self.available_filters: List[str] = [] 200 201 self.global_stats: GlobalStats = {} 202 203 if not os.path.isdir(self.tmp_dir): 204 logger.debug(f"Creating temporary directory: {self.tmp_dir}") 205 os.makedirs(self.tmp_dir) 206 self.temp_files: Dict[FilterName, str] = {} 207 208 for filter_name in self.POSSIBLE_FILTERS: 209 suffix = "txt" if filter_name != "libvmaf" else "json" 210 211 self.temp_files[cast(FilterName, filter_name)] = os.path.join( 212 self.tmp_dir, 213 f"ffmpeg_quality_metrics_{filter_name}_{os.path.basename(self.ref)}_{os.path.basename(self.dist)}.{suffix}", 214 ) 215 logger.debug( 216 f"Writing temporary {filter_name.upper()} information to: {self.temp_files[cast(FilterName, filter_name)]}" 217 ) 218 219 if scaling_algorithm not in self.ALLOWED_SCALERS: 220 raise FfmpegQualityMetricsError( 221 f"Allowed scaling algorithms: {self.ALLOWED_SCALERS}" 222 ) 223 224 self._check_available_filters()
Instantiate a new FfmpegQualityMetrics
Arguments:
- ref (str): reference file
- dist (str): distorted file
- scaling_algorithm (str, optional): A scaling algorithm. Must be one of the following: ["fast_bilinear", "bilinear", "bicubic", "experimental", "neighbor", "area", "bicublin", "gauss", "sinc", "lanczos", "spline"]. Defaults to "bicubic"
- framerate (float, optional): Force a frame rate. Defaults to None.
- dist_delay (float): Delay the distorted file against the reference by this amount of seconds. Defaults to 0.
- dry_run (bool, optional): Don't run anything, just print commands. Defaults to False.
- verbose (bool, optional): Show more output. Defaults to False.
- threads (int, optional): Number of ffmpeg threads. Defaults to 0 (auto).
- progress (bool, optional): Show a progress bar. Defaults to False.
- keep_tmp_files (bool, optional): Keep temporary files for debugging purposes. Defaults to False.
- tmp_dir (str, optional): Directory to store temporary files. Will use system default if not specified. Defaults to None.
Raises:
- FfmpegQualityMetricsError: A generic error
248 @staticmethod 249 def get_framerate(input_file: str) -> float: 250 """Parse the FPS from the input file. 251 252 Args: 253 input_file (str): Input file path 254 255 Raises: 256 FfmpegQualityMetricsError: A generic error 257 258 Returns: 259 float: The FPS parsed 260 """ 261 cmd = ["ffmpeg", "-nostdin", "-y", "-i", input_file] 262 263 output = run_command(cmd, allow_error=True) 264 pattern = re.compile(r"(\d+(\.\d+)?) fps") 265 try: 266 if pattern_ret := pattern.search(str(output)): 267 match = pattern_ret.groups()[0] 268 return float(match) 269 except Exception: 270 pass 271 272 raise FfmpegQualityMetricsError(f"could not parse FPS from file {input_file}!")
Parse the FPS from the input file.
Arguments:
- input_file (str): Input file path
Raises:
- FfmpegQualityMetricsError: A generic error
Returns:
float: The FPS parsed
308 def calculate( 309 self, 310 metrics: List[MetricName] = ["ssim", "psnr"], 311 vmaf_options: Union[VmafOptions, None] = None, 312 ) -> Dict[MetricName, SingleMetricData]: 313 """Calculate one or more metrics. 314 315 Args: 316 metrics (list, optional): A list of metrics to calculate. 317 Possible values are ["ssim", "psnr", "vmaf"]. 318 Defaults to ["ssim", "psnr"]. 319 vmaf_options (dict, optional): VMAF-specific options. Uses defaults if not specified. 320 321 Raises: 322 FfmpegQualityMetricsError: In case of an error 323 e: A generic error 324 325 Returns: 326 dict: A dictionary of per-frame info, with the key being the metric name and the value being a dict of frame numbers ('n') and metric values. 327 """ 328 if not metrics: 329 raise FfmpegQualityMetricsError("No metrics specified!") 330 331 # check available metrics 332 for metric_name in metrics: 333 filter_name = self.METRIC_TO_FILTER_MAP.get(metric_name, None) 334 if filter_name not in self.POSSIBLE_FILTERS: 335 raise FfmpegQualityMetricsError(f"No such metric '{metric_name}'") 336 if filter_name not in self.available_filters: 337 raise FfmpegQualityMetricsError( 338 f"Your ffmpeg version does not have the filter '{filter_name}'" 339 ) 340 341 # set VMAF options specifically 342 if "vmaf" in metrics: 343 self._check_libvmaf_availability() 344 self.vmaf_options = self.DEFAULT_VMAF_OPTIONS 345 # override with user-supplied options 346 if vmaf_options: 347 for key, value in vmaf_options.items(): 348 if value is not None: 349 self.vmaf_options[key] = value # type: ignore 350 self._set_vmaf_model_path(self.vmaf_options["model_path"]) 351 352 # ffmpeg 7.1 or higher: scale2ref filter is deprecated 353 # input 0: ref, input 1: dist --> swapped for scale filter 354 filter_chains = [ 355 f"[1][0]scale=rw:rh:flags={self.scaling_algorithm}[dist]", 356 "[dist]settb=AVTB,setpts=PTS-STARTPTS[distpts]", 357 "[0]settb=AVTB,setpts=PTS-STARTPTS[refpts]", 358 ] 359 360 # generate split filters depending on the number of models 361 n_splits = len(metrics) 362 if n_splits > 1: 363 for source in ["dist", "ref"]: 364 suffixes = "".join([f"[{source}{n}]" for n in range(1, n_splits + 1)]) 365 filter_chains.extend( 366 [ 367 f"[{source}pts]split={n_splits}{suffixes}", 368 ] 369 ) 370 371 # special case, only one metric: 372 if n_splits == 1: 373 metric_name = metrics[0] 374 filter_chains.extend( 375 [ 376 f"[distpts][refpts]{self._get_filter_opts(self.METRIC_TO_FILTER_MAP[metric_name])}" 377 ] 378 ) 379 # all other cases: 380 else: 381 for n, metric_name in zip(range(1, n_splits + 1), metrics): 382 filter_chains.extend( 383 [ 384 f"[dist{n}][ref{n}]{self._get_filter_opts(self.METRIC_TO_FILTER_MAP[metric_name])}" 385 ] 386 ) 387 388 try: 389 output = self._run_ffmpeg_command(filter_chains, desc=", ".join(metrics)) 390 self._read_temp_files(metrics) 391 if output: 392 self._read_ffmpeg_output(output, metrics) 393 else: 394 raise FfmpegQualityMetricsError("ffmpeg output is empty!") 395 except Exception as e: 396 raise e 397 finally: 398 self._cleanup_temp_files() 399 400 # return only those data entries containing values 401 return cast( 402 Dict[MetricName, SingleMetricData], 403 {k: v for k, v in self.data.items() if v}, 404 )
Calculate one or more metrics.
Arguments:
- metrics (list, optional): A list of metrics to calculate. Possible values are ["ssim", "psnr", "vmaf"]. Defaults to ["ssim", "psnr"].
- vmaf_options (dict, optional): VMAF-specific options. Uses defaults if not specified.
Raises:
- FfmpegQualityMetricsError: In case of an error
- e: A generic error
Returns:
dict: A dictionary of per-frame info, with the key being the metric name and the value being a dict of frame numbers ('n') and metric values.
675 @staticmethod 676 def get_brewed_vmaf_model_path() -> Union[str, None]: 677 """ 678 Hack to get path for VMAF model from Homebrew or Linuxbrew. 679 This works for libvmaf 2.x 680 681 Returns: 682 str or None: the path or None if not found 683 """ 684 stdout, _ = run_command(["brew", "--prefix", "libvmaf"]) 685 cellar_path = stdout.strip() 686 687 model_path = os.path.join(cellar_path, "share", "libvmaf", "model") 688 689 if not os.path.isdir(model_path): 690 logger.warning( 691 f"{model_path} does not exist. Are you sure you have installed the most recent version of libvmaf with Homebrew?" 692 ) 693 return None 694 695 return model_path
Hack to get path for VMAF model from Homebrew or Linuxbrew. This works for libvmaf 2.x
Returns:
str or None: the path or None if not found
697 @staticmethod 698 def get_default_vmaf_model_path() -> str: 699 """ 700 Return the default model path depending on whether the user is running Homebrew 701 or has a static build. 702 703 Returns: 704 str: the path 705 """ 706 if has_brew() and ffmpeg_is_from_brew(): 707 # If the user installed ffmpeg using homebrew 708 model_path = FfmpegQualityMetrics.get_brewed_vmaf_model_path() 709 if model_path is not None: 710 return os.path.join( 711 model_path, 712 "vmaf_v0.6.1.json", 713 ) 714 715 share_path = os.path.join("/usr", "local", "share", "model") 716 if os.path.isdir(share_path): 717 return os.path.join(share_path, "vmaf_v0.6.1.json") 718 else: 719 # return the bundled file as a fallback 720 return os.path.join( 721 FfmpegQualityMetrics.DEFAULT_VMAF_MODEL_DIRECTORY, "vmaf_v0.6.1.json" 722 )
Return the default model path depending on whether the user is running Homebrew or has a static build.
Returns:
str: the path
724 @staticmethod 725 def get_supplied_vmaf_models() -> List[str]: 726 """ 727 Return a list of VMAF models supplied with the software. 728 729 Returns: 730 List[str]: A list of VMAF model names 731 """ 732 return [ 733 f 734 for f in os.listdir(FfmpegQualityMetrics.DEFAULT_VMAF_MODEL_DIRECTORY) 735 if f.endswith(".json") 736 ]
Return a list of VMAF models supplied with the software.
Returns:
List[str]: A list of VMAF model names
738 def get_global_stats(self) -> GlobalStats: 739 """ 740 Return a dictionary for each calculated metric, with different statstics 741 742 Returns: 743 dict: A dictionary with stats, each key being a metric name and each value being a dictionary with the stats for every submetric. The stats are: 'average', 'median', 'stdev', 'min', 'max'. 744 """ 745 for metric_name in self.data: 746 logger.debug(f"Aggregating stats for {metric_name}") 747 metric_data = cast(SingleMetricData, self.data[metric_name]) 748 if len(metric_data) == 0: 749 continue 750 submetric_keys = [k for k in metric_data[0].keys() if k != "n"] 751 752 stats: Dict[str, GlobalStatsData] = {} 753 for submetric_key in submetric_keys: 754 values = [float(frame[submetric_key]) for frame in metric_data] 755 # Filter out non-finite values (inf, -inf, nan) for robust statistics 756 finite = [v for v in values if math.isfinite(v)] 757 # Fallback to [0.0] if all values are non-finite to prevent crashes 758 all_values = finite if finite else [0.0] 759 760 stats[submetric_key] = { 761 "average": round(float(mean(all_values)), 3), 762 "median": round(float(median(all_values)), 3), 763 "stdev": round( 764 float(pstdev(finite)) if len(finite) > 1 else 0.0, 3 765 ), 766 "min": round(min(all_values), 3), 767 "max": round(max(all_values), 3), 768 } 769 self.global_stats[metric_name] = stats 770 771 return self.global_stats
Return a dictionary for each calculated metric, with different statstics
Returns:
dict: A dictionary with stats, each key being a metric name and each value being a dictionary with the stats for every submetric. The stats are: 'average', 'median', 'stdev', 'min', 'max'.
773 def get_results_csv(self) -> str: 774 """ 775 Return a CSV string with the data 776 777 Returns: 778 str: The CSV string 779 """ 780 # Check if we have any data 781 has_data = any(metric_data for metric_data in self.data.values()) 782 if not has_data: 783 raise FfmpegQualityMetricsError("No data calculated!") 784 785 # Collect all frames and merge data by frame number 786 frames_data: Dict[int, Dict[str, Union[str, float, int]]] = {} 787 788 # Process each metric's data 789 for metric_data in self.data.values(): 790 if not metric_data: 791 continue 792 793 for frame_info in cast(SingleMetricData, metric_data): 794 frame_num = int(frame_info["n"]) 795 if frame_num not in frames_data: 796 frames_data[frame_num] = {"n": frame_num} 797 798 # Add all metric properties for this frame 799 for key, value in frame_info.items(): 800 if key != "n": # Skip frame number as it's already added 801 frames_data[frame_num][key] = value 802 803 if not frames_data: 804 raise FfmpegQualityMetricsError("No frame data found!") 805 806 # Sort frames by frame number 807 sorted_frames = sorted(frames_data.keys()) 808 809 # Collect all unique column names (excluding 'n' which we'll put first) 810 all_columns: set[str] = set() 811 for frame_data in frames_data.values(): 812 all_columns.update(frame_data.keys()) 813 all_columns.discard("n") 814 815 # Create column order: n first, then sorted metric columns, then input files 816 columns = ["n"] + sorted(all_columns) + ["input_file_dist", "input_file_ref"] 817 818 # Generate CSV using StringIO and csv module 819 output = StringIO() 820 writer = csv.writer(output) 821 822 # Write header 823 writer.writerow(columns) 824 825 # Write data rows 826 for frame_num in sorted_frames: 827 frame_data = frames_data[frame_num] 828 row = [] 829 for col in columns: 830 if col == "input_file_dist": 831 row.append(self.dist) 832 elif col == "input_file_ref": 833 row.append(self.ref) 834 else: 835 # Use the frame data value or empty string if not present 836 row.append(str(frame_data.get(col, ""))) 837 writer.writerow(row) 838 839 return output.getvalue()
Return a CSV string with the data
Returns:
str: The CSV string
841 def get_results_json(self) -> str: 842 """ 843 Return the results as JSON string 844 845 Returns: 846 str: The JSON string 847 """ 848 ret: Dict = {} 849 for key in self.data: 850 metric_data = cast(SingleMetricData, self.data[key]) 851 if len(metric_data) == 0: 852 continue 853 ret[key] = metric_data 854 ret["global"] = self.get_global_stats() 855 ret["input_file_dist"] = self.dist 856 ret["input_file_ref"] = self.ref 857 858 return json.dumps(ret, indent=4)
Return the results as JSON string
Returns:
str: The JSON string
Common base class for all non-exit exceptions.
36class VmafOptions(TypedDict): 37 """ 38 VMAF-specific options. 39 """ 40 41 model_path: Union[str, None] 42 """Use a specific VMAF model file. If none is chosen, picks a default model.""" 43 model_params: List[str] 44 """A list of params to pass to the VMAF model, specified as key=value.""" 45 n_threads: Union[int, None] 46 """Number of threads to use. Defaults to 0 (auto).""" 47 n_subsample: Union[int, None] 48 """Subsampling interval. Defaults to 1.""" 49 features: List[str] 50 """ 51 List of features to enable in addition to the default features. 52 Each entry must be a string beginning with name=feature_name, and additional parameters can be specified as 53 key=value, separated by colons. 54 """
VMAF-specific options.