"""Corpus-level management of movies, their extracted data and diagrams."""
import sys
from pathlib import Path
from subprocess import PIPE, run

from cv2 import imread, imwrite
import pandas as pd
import numpy as np

from .movie import Movie
from .views import UnivariateSequence, MultivariateSequence
from .visuals import UnivariatePlot, MultivariatePlot

# Default extraction plan: strategy (view) -> contrast -> list of methods.
_summary = {'seqmean': {'monochromatic': ['luminance'],
                        'saturation': ['saturation', 'colourfulness'],
                        'hue': ['hsV', 'labHcl']},
            'distribution': {'monochromatic': ['luminance'],
                             'saturation': ['saturation', 'colourfulness'],
                             'hue': ['hsV', 'labHcl']}}


class Corpus(object):
    """Main class for working with a corpus of movies and movie data

    Attributes:
        basedir (Path) -- The root directory in which all movie folders
            together with their data reside.
        summary (dict) -- The extraction plan (strategy -> contrast ->
            methods). Falls back to the module default if none is given.
        status (dict) -- Index of the resources found on disk for each
            movie (see _report).
        movies (dict) -- One Movie instance per movie slug.
    """

    def __init__(self, path='./', summary=False):
        """Indexes the corpus found in path

        Keyword Arguments:
            path {str} -- Root directory of the corpus (default: {'./'})
            summary {dict} -- Extraction plan with the structure
                strategy -> contrast -> [methods]. Any falsy value selects
                the module default. (default: {False})
        """
        self.basedir = Path(path)
        self.summary = summary if summary else _summary
        # The index reflects the disk at this moment; extract() rebuilds it
        # after every extraction run.
        self.status = self._report()
        self.movies = self._instantiate_movies()

    @staticmethod
    def _resource_name(movie, strategy, contrast, method, suffix=''):
        """Builds the file name shared by data pickles and diagrams"""
        return '_'.join((movie, strategy, contrast, method, '4fps')) + suffix

    @staticmethod
    def _index_resources(files):
        """Indexes resource files by strategy, contrast and method

        The three keys are taken from the last underscore separated parts of
        the file name (..._strategy_contrast_method_4fps.ext). Files whose
        name has fewer than four parts cannot be resource files and are
        skipped.

        Arguments:
            files {iterable of pathlib.Path} -- The files to index.

        Returns:
            dict -- strategy -> contrast -> method -> True
        """
        index = {}
        for resource in files:
            parts = resource.name.split('_')
            if len(parts) < 4:
                continue
            strategy, contrast, method = parts[-4:-1]
            index.setdefault(strategy, {}).setdefault(
                contrast, {})[method] = True
        return index

    @staticmethod
    def _iter_tasks(branch):
        """Yields (strategy, contrast, method) for each leaf of a task tree"""
        for strategy, contrasts in branch.items():
            for contrast, methods in contrasts.items():
                for method in methods:
                    yield strategy, contrast, method

    def _report(self):
        """Indexes the available resources for each movie in the corpus.

        Sub directories of the corpus whose name starts with a dot or an
        underscore are ignored.

        Returns:
            dict -- a dictionary in which each key corresponds with one movie
            title holding another 4 keys with information about the
            availability of the 'video' file (path or None), the number of
            extracted 'frames' (or None), the 'data' pickles and the
            'visuals' (diagram *.png files in the movie folder).
        """
        status = {}
        corpus_dir = Path(self.basedir).resolve()
        movie_dirs = [d for d in corpus_dir.iterdir()
                      if d.is_dir() and not d.name.startswith(('.', '_'))]

        for movie_dir in movie_dirs:
            movie = movie_dir.name
            video = movie_dir / 'movie' / (movie + '.mkv')
            frames = len(list((movie_dir / 'frames' / '240p30').glob('*')))
            status[movie] = {
                'video': video if video.is_file() else None,
                'frames': frames if frames > 0 else None,
                'data': self._index_resources(
                    (movie_dir / 'data').glob('*.pkl')),
                'visuals': self._index_resources(movie_dir.glob('*.png')),
            }
        return status

    def _instantiate_movies(self):
        """Creates a dictionary with one Movie instance for each movie

        Returns:
            dict() -- A dictionary in which the movie slugs are the keys and
            the values are corresponding instances of the Movie class.
        """
        movies = {}
        for slug in self.status:
            # NOTE(review): pathlib drops the trailing slash here, whereas
            # _extract_data/_extract_visuals pass the same folder WITH a
            # trailing slash -- confirm which form Movie expects.
            folder = str(self.basedir / slug / 'frames' / '240p30')
            movies[slug] = Movie(slug + '_', folder, fps=4)
        return movies

    def extract(self):
        """Extracts missing images, data and diagrams for the corpus

        The method looks for missing frame images, contrast data and
        diagrams in the object's status dictionary and extracts them from
        the movie file and the frame images. It uses the summary dictionary
        in order to decide which contrast values should be extracted. The
        status dictionary is rebuilt afterwards so that it reflects the new
        files.

        Returns:
            dict -- The tasks that were identified before the extraction
            started (see _extraction_tasks).
        """
        tasks = self._extraction_tasks()

        for movie, missing in tasks.items():
            # Frames first: data is computed from them.
            if 'frames' in missing:
                self._extract_frames(movie)

            # Data second: the diagrams are drawn from the data pickles.
            if 'data' in missing:
                for strategy, ctrst, meth in self._iter_tasks(
                        missing['data']):
                    print('{:15s} (data, {:18s}): {} missing with methods: {}'
                          .format(movie, strategy, ctrst, meth))
                    self._extract_data(movie, strategy, ctrst, meth)
            else:
                print('{0} (data): Nothing to do'.format(movie))

            if 'visuals' in missing:
                for strategy, ctrst, meth in self._iter_tasks(
                        missing['visuals']):
                    print('{:15s} (visuals, {:15s}): {} missing with '
                          'methods: {}'.format(movie, strategy, ctrst, meth))
                    self._extract_visuals(movie, strategy, ctrst, meth)
            else:
                print('{0} (visuals): Nothing to do'.format(movie))

        # TODO Implement extractors
        # Re-index the disk so that status (and hence tableau) sees the
        # files created above.
        self.status = self._report()
        return tasks

    def _extraction_tasks(self):
        """Identifies which images and data for which movies are missing

        Returns:
            dict -- movie slug -> dictionary with up to three keys:
            'frames' (present if no frames exist), 'data' and 'visuals'
            (each a tree strategy -> contrast -> [missing methods]).
        """

        def task_parser(component):
            """Identifies tasks by comparing the status and the summary

            Walks through every strategy, contrast and method of the
            object's summary and collects those methods for which no
            resource of the given component ('data' or 'visuals') is
            registered in the status dictionary.
            """
            missing = {}
            for movie, resources in self.status.items():
                available = resources[component]
                for view, contrasts in self.summary.items():
                    for contrast, methods in contrasts.items():
                        done = available.get(view, {}).get(contrast, {})
                        # A fresh list, so the summary is never aliased.
                        todo = [m for m in methods if m not in done]
                        if todo:
                            branch = missing.setdefault(
                                movie, {component: {}})[component]
                            branch.setdefault(view, {})[contrast] = todo
            return missing

        # Movies without frames
        tasks = {movie: {'frames': 0}
                 for movie, resources in self.status.items()
                 if resources['frames'] is None}

        # Movies with missing data, then movies with missing visuals
        for component in ('data', 'visuals'):
            for movie, branch in task_parser(component).items():
                tasks.setdefault(movie, {}).update(branch)

        return tasks

    # TODO The _extract methods actually belong to the Movie class
    def _extract_frames(self, movie='in_the_loop', fps=4, scale=240):
        """Extract frames from movie files

        The implementation uses ffmpeg in order to extract frames from a
        movie file in a Matroska container. Errors reported by ffmpeg are
        printed, not raised.

        Keyword Arguments:
            movie {str} -- the filename of the movie file without the file
                extension. (default: {'in_the_loop'})
            fps {int} -- how many frames per second are extracted from the
                movie file. Note that the rest of this class assumes 4
                frames per second. (default: {4})
            scale {int} -- the resulting size of the frames measured in
                vertical height (default: {240})
        """
        # Create frame images directory
        frames_dir = self.basedir / movie / 'frames' / (str(scale) + 'p30')
        frames_dir.mkdir(parents=True, exist_ok=True)

        source = self.basedir / movie / 'movie' / (movie + '.mkv')
        # %05d is the numbering pattern of the output files.
        target = frames_dir / (movie + '_%05d.png')
        # fps=1 extracts one frame per second, fps=2 two frames per second,
        # fps=1/60 one frame every 60 seconds.
        video_filter = 'scale=iw*sar:ih,scale=-1:{0},fps={1}'.format(
            scale, fps)
        # An argument list (no shell) keeps paths with spaces or quotes safe.
        cmd = ['ffmpeg', '-i', str(source), '-vf', video_filter, str(target)]

        print('Extracting frames from {0}'.format(movie))

        # Spawn ffmpeg process
        try:
            response = run(cmd, stdout=PIPE, stderr=PIPE,
                           encoding='utf-8', errors='replace')
        except OSError as e:
            # e.g. ffmpeg is not installed or not on the PATH
            print('Could not start ffmpeg: {}'.format(e))
            return

        # Evaluate the result
        if response.returncode != 0:
            print(response.stderr)
        else:
            print('Finished!')

    def _extract_data(self, movie, strategy, contrast, method):
        """Computes one contrast data set and pickles it

        The result is written to
        <basedir>/<movie>/data/<movie>_<strategy>_<contrast>_<method>_4fps.pkl
        which is where _report and _extract_visuals look for it. Errors are
        printed, not raised, so that the remaining tasks can still run.

        Arguments:
            movie {str} -- The movie slug.
            strategy {str} -- Either 'seqmean' or 'distribution'.
            contrast {str} -- The contrast passed on to the view.
            method {str} -- The method passed on to the view.
        """
        try:
            m = Movie(movie + '_',
                      str(self.basedir / movie / 'frames' / '240p30') + '/')

            if strategy == 'seqmean':
                view = UnivariateSequence(m._frames)
                view.seqmean(ctrst=contrast, method=method, frm_stp=4)
            elif strategy == 'distribution':
                view = MultivariateSequence(m._frames)
                view.populate(ctrst=contrast, method=method, frm_stp=4)
            else:
                raise ValueError('unknown strategy: {}'.format(strategy))

            data_dir = self.basedir / movie / 'data'
            data_dir.mkdir(parents=True, exist_ok=True)
            file_name = self._resource_name(
                movie, strategy, contrast, method, '.pkl')
            pd.DataFrame(view[:]).to_pickle(str(data_dir / file_name))
        except Exception as e:
            print('{} (data, {}): {} ({}) raised an error:'.format(
                movie, strategy, contrast, method))
            print('Error: {}'.format(repr(e)))

    def _extract_visuals(self, movie, strategy, contrast, method):
        """Draws the diagram for one pickled contrast data set

        Reads the pickle written by _extract_data and saves the diagram as
        <basedir>/<movie>/<movie>_<strategy>_<contrast>_<method>_4fps.png.
        Errors (including a missing pickle) are printed, not raised, so that
        the remaining tasks can still run.

        Arguments:
            movie {str} -- The movie slug.
            strategy {str} -- Either 'seqmean' or 'distribution'.
            contrast {str} -- The contrast of the data set.
            method {str} -- The method of the data set.
        """
        try:
            m = Movie(movie + '_',
                      str(self.basedir / movie / 'frames' / '240p30') + '/')
            title = self._resource_name(movie, strategy, contrast, method)
            # NOTE: unpickling executes code; only use corpora whose data
            # folder was produced by _extract_data.
            data = pd.read_pickle(
                str(self.basedir / movie / 'data' / (title + '.pkl')))

            if strategy == 'seqmean':
                view = UnivariateSequence(
                    m._frames, input_array=data.to_numpy().flatten())
                view._contrast = contrast
                view._method = method
                view.feature = 'mean'
                viz = UnivariatePlot(view)
            elif strategy == 'distribution':
                view = MultivariateSequence(
                    m._frames, input_array=data.to_numpy())
                view._bins = 16
                view._threshold = 6000
                view._contrast = contrast
                view._method = method
                viz = MultivariatePlot(view)
            else:
                raise ValueError('unknown strategy: {}'.format(strategy))

            viz.plot(view)
            viz.saveplt(title=title,
                        fname=self.basedir / movie / (title + '.png'))
        except Exception as e:
            print('{} (visual, {}): {} ({}) raised an error:'.format(
                movie, strategy, contrast, method))
            print('Error: {}'.format(repr(e)))

    def _compose_tableau(self, components, label):
        """Builds one tableau image or returns None if there are no diagrams"""
        if not components:
            print('{0} (tableau): No diagrams available'.format(label))
            return None
        layout = self.layout_tableau(len(components))
        return self._fit_components(components, layout)

    def tableau(self, mode='contrast',
                select=('distribution', 'monochromatic', 'luminance'),
                write=True):
        """Creates a tableau of available diagrams for a given contrast

        Diagrams for the tableau will not be created in case some of the
        movies in the corpus lack the required diagram. Consequently, the
        tableau will only show diagrams from movies in the corpus that exist
        already.

        Keyword Arguments:
            mode {str} -- Defines the type of components that are drawn
                together in the tableau. 'contrast' selects diagrams from
                all the movies in the corpus which represent the contrast
                defined in the select argument. 'movie' creates one tableau
                with all diagrams for each movie defined in the select
                argument. (default: {'contrast'})
            select {[str]} -- Describes the components selected for the
                tableau. If the mode is 'contrast' the argument requires a
                sequence of 3 strings, referring to the moment, contrast
                and method ([moment, contrast, method]). If mode 'movie' is
                given the select argument requires a sequence of movie
                slugs; a single slug may be given as a plain string.
                (default: {('distribution', 'monochromatic', 'luminance')})
            write {bool} -- Decide, if the tableau image should be written
                to disk in the corpus folder or not. (default: {True})

        Returns:
            numpy.ndarray -- A numpy ndarray with the data type uint8
            showing the tableau as an image. In 'movie' mode the tableau of
            the last movie that has diagrams is returned. None is returned
            if no diagram was available at all.

        Raises:
            ValueError -- If mode is neither 'contrast' nor 'movie'.
            KeyError -- If a selected movie is not part of the corpus.

        Todo:
            FIXME Instead of an if/else statement, create tableau class in
            the visuals module and subclass it for movies and contrasts.
            Especially the movie table should also be provided as a method
            for the Movie class.
        """
        if mode == 'contrast':
            components = self._filter_diagrams(mode=mode, select=select)
            name = '_'.join(select)
            tableau = self._compose_tableau(components, name)
            if write and tableau is not None:
                imwrite(str(self.basedir / (name + '.png')), tableau)
            return tableau

        if mode == 'movie':
            if isinstance(select, str):
                select = [select]
            tableau = None
            for movie in select:
                components = self._filter_diagrams(mode=mode, select=movie)
                current = self._compose_tableau(components, movie)
                if current is None:
                    continue
                tableau = current
                if write:
                    folder = self.basedir / movie / 'visuals' / 'tableau'
                    folder.mkdir(parents=True, exist_ok=True)
                    imwrite(str(folder / (movie + '_tableau.png')), tableau)
            return tableau

        raise ValueError(
            "mode must be 'contrast' or 'movie', got {!r}".format(mode))

    def _filter_diagrams(self, mode, select):
        """Filters which diagrams in a corpus belong to a specific contrast

        The filter looks for diagrams represented in *.png files only. It
        relies on the status dictionary, not on the disk.

        Keyword Arguments:
            mode {str} -- Where to look for diagrams (see tableau)
            select {[str] or str} -- For 'contrast' a sequence of three
                strings (moment, contrast, method); for 'movie' one movie
                slug.

        Returns:
            [pathlib.Path] -- A list of file paths to the diagrams that
            match the selected contrast visualization. The list is empty for
            any other mode.

        Todo:
            FEATURE Parametrize the file-format instead of looking at png
            files only.
        """
        diagrams = []

        if mode == 'contrast':
            shape, ctrst, meth = select
            for movie, resources in self.status.items():
                methods = resources['visuals'].get(shape, {}).get(ctrst, {})
                if meth in methods:
                    diagrams.append(self.basedir / movie / self._resource_name(
                        movie, shape, ctrst, meth, '.png'))
        elif mode == 'movie':
            for moment, contrasts in self.status[select]['visuals'].items():
                for ctrst, methods in contrasts.items():
                    for meth in methods:
                        diagrams.append(
                            self.basedir / select / self._resource_name(
                                select, moment, ctrst, meth, '.png'))

        return diagrams

    @staticmethod
    def layout_tableau(n, ratio=12):
        """Calculates the shape of a tableau for a given number of diagrams

        One column is used for every full group of ratio diagrams (at least
        one column). The number of rows is the smallest one that makes all
        diagrams fit into these columns.

        Arguments:
            n {int} -- The number of diagrams that should fit into the
                tableau

        Keyword Arguments:
            ratio {int} -- The number of diagrams required for each
                additional column (default: {12})

        Returns:
            (int, int, int) -- A tuple with three values describing the
            number of rows, the number of columns and the number of
            placeholder images needed to fill up the whole tableau.
        """
        cols = max(n // ratio, 1)
        rows = -(-n // cols)  # ceiling division
        return (rows, cols, rows * cols - n)

    @staticmethod
    def _fit_components(components, layout):
        """Builds a tableau out of diagrams in a given layout

        First, the method creates a list of diagrams for one row. Then this
        list is stacked to create one row image. This is repeated for each
        row so that the result is a list of row images. Again this is
        stacked to one image which is the tableau image. Places without a
        diagram are filled with white images.

        Arguments:
            components {[pathlib.Path]} -- A list of file paths to the
                diagram image files.
            layout {(int, int, int)} -- A tuple describing the number of
                rows and columns of the tableau (the third value is not
                used here).

        Returns:
            numpy.ndarray -- A numpy ndarray with the data type uint8
            showing the tableau as an image.

        Raises:
            ValueError -- If a diagram cannot be read, if the diagrams do
                not share one size or if the layout has no rows.
        """
        rows, cols = layout[0], layout[1]
        # Placeholders take the size of the most recently read diagram so
        # that they can be stacked with it; this is the fallback size.
        placeholder_shape = (1200, 16000, 3)
        tableau = []

        for row in range(rows):
            cells = []
            for col in range(cols):
                n = row * cols + col
                if n < len(components):
                    img = imread(str(components[n]))
                    # imread signals failure by returning None
                    if img is None:
                        raise ValueError(
                            'could not read diagram image: {}'.format(
                                components[n]))
                    placeholder_shape = img.shape
                else:
                    # White dummy image to fill up the remaining space
                    img = np.full(placeholder_shape, 255, dtype=np.uint8)
                cells.append(img)
            tableau.append(np.hstack(cells))

        if not tableau:
            raise ValueError('the tableau layout contains no rows')
        return np.vstack(tableau)