# Source code for movieparse.main

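"""Parse movie titles and collect their metadata from TMDB.

Provides the Movieparse class, which extracts title and release year from a
list of strings or from the names of movie folders, looks up the matching
TMDB ids and stores the downloaded metadata as CSV files.

Typical usage example:

    parser = Movieparse(tmdb_api_key="...")
    parser.parse_movielist(["1999 The Matrix"])
    parser.write()
"""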

import asyncio
import os
import re
from pathlib import Path
from typing import Dict
from typing import List

import aiohttp
import numpy as np
import pandas as pd
from tqdm import tqdm


[docs]class Movieparse: """Movieparse object used for storing configuration and metadata. Attributes: output_dir: Output directory where files get written to. tmdb_api_key: TMDB API Key. Falls back to environment variable TMDB_API_KEY. parsing_style: Define parsing style to use. -1 for estimating parsing style. strict: Always use title and release year for looking up metadata, no fallback to title only. language: ISO-639-1 shortcode for getting locale information. """ mapping = pd.DataFrame() cast = ( collect ) = crew = details = genres = prod_comp = prod_count = spoken_langs = pd.DataFrame() cached_mapping = pd.DataFrame() default_codes = { "DEFAULT": 0, "NO_RESULT": -1, "NO_EXTRACT": -2, "BAD_RESPONSE": -3, }
[docs] @staticmethod def get_parsing_patterns() -> dict[int, re.Pattern[str]]: """Lists all valid patterns for extracting title and (optionally release year) from input. Returns: A dict mapping integer keys to their regex pattern. """ return { 0: re.compile(r"^(?P<disk_year>\d{4})\s{1}(?P<disk_title>.+)$"), 1: re.compile(r"^(?P<disk_year>\d{4})\s-\s(?P<disk_title>.+)$"), 2: re.compile(r"^(?P<disk_title>.+)\s(?P<disk_year>\d{4})$"), }
def __init__( self, output_dir: Path | None = None, tmdb_api_key: str | None = None, parsing_style: int = -1, strict: bool = False, language: str = "en_US", ): """Initilizes movieparser.""" self._STRICT = strict self._LANGUAGE = language if output_dir is None: output_dir = Path.cwd() elif output_dir.is_dir() is False: raise Exception("please supply an OUTPUT_DIR that is a directory!") self._OUTPUT_DIR = output_dir if tmdb_api_key is None and os.getenv("TMDB_API_KEY") is not None: self._TMDB_API_KEY = os.getenv("TMDB_API_KEY") elif tmdb_api_key is not None: self._TMDB_API_KEY = tmdb_api_key else: raise Exception("please supply a TMDB_API_KEY!") if parsing_style not in range( -1, max(Movieparse.get_parsing_patterns().keys()) ): raise Exception("please supply a valid PARSING_STYLE!") else: self._PARSING_STYLE = parsing_style self._read_existing() def _metadata(self) -> dict[str, pd.DataFrame]: """Provides a dictionary for compactly allocating metadata. Returns: Dictionary with filenames as keys and internal dataframes as values. """ return { "cast": self.cast, "collection": self.collect, "crew": self.crew, "genres": self.genres, "production_companies": self.prod_comp, "production_countries": self.prod_count, "spoken_languages": self.spoken_langs, "details": self.details, } def _read_existing(self) -> None: """Uses _table_iter() to read existing metadata and append to internal dataframes.""" df_list = [] for fname, df in self._metadata().items(): tmp_path = self._OUTPUT_DIR / f"{fname}.csv" if tmp_path.exists(): df = pd.read_csv(tmp_path) df = self._assign_types(df) else: df = pd.DataFrame() df_list.append(df) ( self.cast, self.collect, self.crew, self.genres, self.prod_comp, self.prod_count, self.spoken_langs, self.details, ) = df_list tmp_path = self._OUTPUT_DIR / "mapping.csv" if tmp_path.exists(): self.cached_mapping = pd.read_csv(tmp_path) self.cached_mapping = self._assign_types(self.cached_mapping)
[docs] def parse_movielist(self, movielist: List[str]) -> None: """Parse movie metadata from movielist. Args: movielist: List of titles (and optionally release years). """ if not movielist: raise Exception("movielist can't be empty!") self.mapping = pd.DataFrame( { "input": movielist, "canonical_input": movielist, } ) self._generic_parse()
[docs] def parse_root_movie_dir(self, root_movie_dir: Path) -> None: """Parse movie metadata from folders inside root_movie_dir. Args: root_movie_dir: directory where movie subfolders lie. """ if root_movie_dir.is_dir() is False: raise Exception("root_movie_dir has to be a directory!") names = [] for folder in root_movie_dir.iterdir(): if folder.is_dir(): names.append(folder) self.mapping = pd.DataFrame( { "input": names, "canonical_input": [x.name for x in names], } ) self._generic_parse()
def _generic_parse(self) -> None: self.mapping["tmdb_id"] = self.default_codes["NO_EXTRACT"] self.mapping["tmdb_id_man"] = self.default_codes["DEFAULT"] if self._PARSING_STYLE == -1: self._guess_parsing_style() self._update_mapping() self._get_ids() self._update_metadata_lookup_ids() asyncio.run(self._get_metadata()) def _guess_parsing_style(self) -> None: """Iterates over canonical input, matching the _PARSING_STYLE according to most matches. Raises: Expection if two or more styles have the same amount of matches or if no styles match. """ tmp = self.mapping[["canonical_input"]].copy() max_matches = 0 conflict = False for style, pattern in Movieparse.get_parsing_patterns().items(): matches = ( tmp["canonical_input"] .str.extract(pattern, expand=True) .notnull() .sum() .sum() ) if matches > max_matches: self._PARSING_STYLE = style max_matches = matches conflict = False elif matches == max_matches: conflict = True if max_matches == 0 and self._PARSING_STYLE == -1 or conflict: raise Exception( "couldn't estimate a parsing style, please supply one for yourself!" ) accuracy = f"{(max_matches / (len(tmp.index) * 2) * 100):.2f}" print(f"best parsing style: {self._PARSING_STYLE} with {accuracy}% accuracy") def _update_mapping(self) -> None: """Concatenates cached mapping and newly generated mapping, keeping the cached mappings entries if duplicates occur. For dupes only the column canonical_input is considered. If the user previously entered values in tmdb_id_man, these will be kept. 
""" self.mapping = pd.concat( [self.cached_mapping, self.mapping], axis=0, ignore_index=True ).drop_duplicates(subset="canonical_input", keep="first") def _get_ids(self) -> None: asyncio.run(self._get_ids_async(exact=True)) if not self._STRICT: asyncio.run(self._get_ids_async(exact=False)) self.mapping = self._assign_types(self.mapping) self.mapping.to_csv( (self._OUTPUT_DIR / "mapping.csv"), date_format="%Y-%m-%d", index=False ) async def _get_ids_async(self, exact: bool) -> None: """Asynchronously lookup tmdb_ids from canonical_input. Args: exact: whether to create tasks using title and year only. Returns: dataframe with a potentially incomplete index and column tmdb_id. Uses _PARSING_STYLE to extract title and year from canonical input. If input doesn't match it's dropped from canon_ext. This missing index is used later for stitching the results together. Depending on the exact-argument a list of tasks is created and then run asynchronously. """ pattern = Movieparse.get_parsing_patterns()[self._PARSING_STYLE] needed_lookups = self.mapping[ self.mapping["tmdb_id"].isin([v for v in self.default_codes.values()]) ].copy() canon_ext = ( needed_lookups["canonical_input"] .str.extract(pattern, expand=True) .dropna(how="any", axis=0) ) session = aiohttp.ClientSession() if exact: tasks = [ session.get( f"https://api.themoviedb.org/3/search/movie/?api_key={self._TMDB_API_KEY}&query={x}&year={y}&include_adult=true", ssl=False, ) for x, y in zip(canon_ext["disk_title"], canon_ext["disk_year"]) ] else: tasks = [ session.get( f"https://api.themoviedb.org/3/search/movie/?api_key={self._TMDB_API_KEY}&query={x}&include_adult=true", ssl=False, ) for x in canon_ext["disk_title"] ] results = [] responses = [ await f for f in tqdm( asyncio.as_completed(tasks), desc="{:<35}".format(f"getting ids from TMDB, exact: {exact}"), total=len(tasks), ) ] for response in responses: try: resp = await response.json() results.append(resp["results"][0]["id"]) except IndexError: 
results.append(self.default_codes["NO_RESULT"]) except KeyError: results.append(self.default_codes["BAD_RESPONSE"]) await session.close() tmp = pd.DataFrame({"tmdb_id": results}, index=canon_ext.index).reindex( self.mapping.index ) self.mapping["tmdb_id"] = np.where( pd.notnull(tmp["tmdb_id"]), tmp["tmdb_id"], self.mapping["tmdb_id"] ) def _update_metadata_lookup_ids(self) -> None: """Creates a set of ids for looking up metadata and removes movieparse default_codes.""" self.metadata_lookup_ids = set(self.mapping["tmdb_id"]) | set( self.mapping["tmdb_id_man"] ) self.metadata_lookup_ids -= {x for x in self.default_codes.values()} def _dissect_metadata_response(self, response: Dict[str, object]) -> None: results = [] tmdb_id = response.pop("id") for c, df in self._metadata().items(): tmp = pd.DataFrame() if c in ["cast", "crew"]: tmp = pd.json_normalize(response["credits"].pop(c)).add_prefix(f"{c}.") # type: ignore [attr-defined] elif c == "collection": collect = response.pop("belongs_to_collection") if collect is not None: tmp = pd.json_normalize(collect).add_prefix(f"{c}.") elif c == "details": tmp = pd.json_normalize(response) else: tmp = pd.json_normalize(response.pop(c)).add_prefix(f"{c}.") tmp["tmdb_id"] = tmdb_id if tmp.empty is False: first_column = tmp.pop("tmdb_id") tmp.insert(0, "tmdb_id", first_column) df = pd.concat([df, tmp], axis=0, ignore_index=True) results.append(df) ( self.cast, self.collect, self.crew, self.genres, self.prod_comp, self.prod_count, self.spoken_langs, self.details, ) = results async def _get_metadata(self) -> None: session = aiohttp.ClientSession() tasks = [ session.get( f"https://api.themoviedb.org/3/movie/{tmdb_id}?api_key={self._TMDB_API_KEY}&language={self._LANGUAGE}&append_to_response=credits", ssl=False, ) for tmdb_id in self.metadata_lookup_ids ] responses = [ await f for f in tqdm( asyncio.as_completed(tasks), desc="{:<35}".format("getting metadata from TMDB"), total=len(tasks), ) ] for response in tqdm(responses, 
desc="{:<35}".format("organizing responses")): if response.status == 200: self._dissect_metadata_response(await response.json()) await session.close()
[docs] def write(self) -> None: """Writes all non-empty metadata dataframes as CSV files to output_dir.""" for fname, df in self._metadata().items(): tmp_path = self._OUTPUT_DIR / f"{fname}.csv" if df.empty is False: df.to_csv( tmp_path, date_format="%Y-%m-%d", index=False, float_format="%.3f" )
def _assign_types(self, df: pd.DataFrame) -> pd.DataFrame: """Casts df columns to specified types. tmdb_id is shared among all metadata files but only listed for mapping.csv. Args: df: dataframe to be casted Returns: df with casted columns. """ types = { # mapping.csv "tmdb_id": "int32", "tmdb_id_man": "int32", "input": object, # can be both a string and a path! "canonical_input": str, # cast.csv "cast.adult": bool, "cast.gender": "int8", "cast.id": int, "cast.known_for_department": "category", "cast.name": str, "cast.original_name": str, "cast.popularity": float, "cast.profile_path": str, "cast.cast_id": "int8", "cast.character": str, "cast.credit_id": str, "cast.order": "int8", # collections.csv "collection.id": int, "collection.name": str, "collection.poster_path": str, "collection.backdrop_path": str, # crew.csv "crew.adult": bool, "crew.gender": "int8", "crew.id": int, "crew.known_for_department": "category", "crew.name": str, "crew.original_name": str, "crew.popularity": float, "crew.profile_path": str, "crew.credit_id": str, "crew.department": "category", "crew.job": str, # genres.csv "genres.id": "int8", "genres.name": str, # production_companies.csv "production_companies.id": "int32", "production_companies.logo_path": str, "production_companies.name": "category", "production_companies.origin_country": "category", "production_countries.iso_3166_1": "category", "production_countries.name": str, # spoken_languages.csv "spoken_languages.english_name": "category", "spoken_languages.iso_3166_1": "category", "spoken_languages.name": str, # details.csv "adult": bool, "backdrop_path": str, "budget": int, "homepage": str, "imdb_id": str, "original_language": "category", "original_title": str, "overview": str, "popularity": float, "poster_path": str, "release_date": "datetime64[ns]", "revenue": int, "runtime": "int16", "status": "category", "tagline": str, "title": str, "video": bool, "vote_average": float, "vote_count": "int16", } for k, v in types.items(): if k in 
df.columns: df[k] = df[k].astype(v) return df