Source code for birdfsd_yolov5.label_studio_helpers.watchdog

#!/usr/bin/env python
# coding: utf-8

import argparse
import copy
import imghdr
import shutil
import time
import uuid
from glob import glob
from pathlib import Path

from PIL import Image, UnidentifiedImageError
from dotenv import load_dotenv
from loguru import logger

from birdfsd_yolov5.label_studio_helpers.add_and_sync_new_project import (
    add_new_project, add_and_sync_data_storage)
from birdfsd_yolov5.model_utils.handlers import catch_keyboard_interrupt


class _MissingArgument(Exception):
    pass


[docs]class WatchDog: """Creates new projects when a change is detected in the source folder. This class is used to create a new folder for the images to be stored in. The folder name is based on the current time and date. The folder is created in the root data folder. The folder name is returned. """ def __init__(self, root_data_folder: str, images_per_folder: int = 1000, debug: bool = False): """ Args: root_data_folder (str): The Root folder where the images are stored. images_per_folder (int): Number of Images per folder. debug (bool): Debug mode. Returns: str: The name of the folder that was created. """ self.root_data_folder = root_data_folder self.images_per_folder = images_per_folder self.debug = debug @staticmethod def _create_dummy_data() -> None: dummy_projects = ['project-1000', 'project-1001', 'MOVE_ME'] for project in dummy_projects: num_dummies = 1000 proj_folder = Path(f'dummy/{project}') if project == dummy_projects[-2]: num_dummies = 200 elif project == 'MOVE_ME': num_dummies = 3600 proj_folder = Path(project) proj_folder.mkdir(exist_ok=True, parents=True) for _ in range(num_dummies): fname = str(uuid.uuid4()) Path(proj_folder / Path(f'fake_{str(fname).zfill(4)}.jpg')).touch() logger.debug('Created dummy data') return
[docs] def validate_image_file(self, file: str) -> str: """Validates an image file. Args: file (str): The path to the image file. Returns: str: The path to the image file. Raises: UnidentifiedImageError: If the image file is corrupted. """ if self.debug: return file try: if imghdr.what(file) and Image.open(file): return file except UnidentifiedImageError: time.sleep(1) if imghdr.what(file) and Image.open(file): return file else: logger.warning(f'`{file}` is corrupted!') shutil.move( file, f'{Path(self.root_data_folder).parent}/data_corrupted')
def _generate_next_folder_name(self) -> str: project_folders = sorted(glob(f'{self.root_data_folder}/project-*')) num = str(int(project_folders[-1].split('project-')[-1]) + 1).zfill(4) Path(f'{self.root_data_folder}/project-{num}').mkdir() return f'{self.root_data_folder}/project-{num}' def _refresh_src(self) -> tuple: folders = glob(f'{self.root_data_folder}/*') project_folders = glob(f'{self.root_data_folder}/project-*') new_folders = list(set(folders).difference(project_folders)) if new_folders: logger.debug(f'New folder(s) detected: {new_folders}') new_files = [] for new_folder in new_folders: cur_files = [ x for x in glob(f'{new_folder}/**/*', recursive=True) if not Path(x).is_dir() ] if cur_files: new_files.append(cur_files) new_files = sum(new_files, []) return folders, project_folders, new_folders, new_files def _arrange_new_data_files(self) -> None: Path(f'{Path(self.root_data_folder).parent}/data_corrupted').mkdir( exist_ok=True) folders, project_folders, new_folders, new_files = self._refresh_src() not_filled_folders = [] project_folders = sorted(glob(f'{self.root_data_folder}/project-*')) for folder in project_folders: folder_size = len(glob(f'{folder}/*')) if self.images_per_folder > folder_size: not_filled_folders.append((folder, folder_size)) logger.debug(f'Not filled: {folder}, size: {folder_size}') if not_filled_folders: for folder, folder_size in not_filled_folders: for file in new_files: if self.validate_image_file(file): shutil.move(file, folder) if len(glob(f'{folder}/*')) == 1000: break folders, project_folders, new_folders, new_files = self._refresh_src() chunks = [ new_files[i:i + self.images_per_folder] for i in range(0, len(new_files), self.images_per_folder) ] for chunk in chunks: dst = self._generate_next_folder_name() folder_name = Path(dst).name for file in chunk: if self.validate_image_file(file): shutil.move(file, dst) if not self.debug: new_project = add_new_project(folder_name) _ = add_and_sync_data_storage(new_project['id'], new_project['title']) for empty_folder in new_folders: contains_any_file = [ x for x in glob(f'{empty_folder}/**/*', recursive=True) if not Path(x).is_dir() ] if not contains_any_file: shutil.rmtree(empty_folder)
[docs] def watch(self): """Monitor changes in the source and organize the data accordingly. Returns: None """ catch_keyboard_interrupt() if self.debug: self.root_data_folder = 'dummy' self._create_dummy_data() else: Path(f'{self.root_data_folder}/project-0001').mkdir(exist_ok=True) logger.debug('Started watchdog...') global_state = glob(f'{self.root_data_folder}/**/*', recursive=True) while True: local_state = glob(f'{self.root_data_folder}/**/*', recursive=True) if global_state != local_state: logger.debug('Detected change!') global_state = copy.deepcopy(local_state) self._arrange_new_data_files() time.sleep(60)
if __name__ == '__main__': load_dotenv() parser = argparse.ArgumentParser() parser.add_argument('--root-data-folder', help='Path to the folder where all the data is kept', type=str) parser.add_argument('--debug', help='Debug mode', action='store_true') parser.add_argument('--images-per-folder', help='Number of images per folder', type=int, default=1000) args = parser.parse_args() if not args.root_data_folder and not args.debug: raise _MissingArgument( '`--root_data_folder` is required when not in debug mode!') watch_dog = WatchDog(root_data_folder=args.root_data_folder, images_per_folder=args.images_per_folder, debug=args.debug) watch_dog.watch()