Codice sorgente per utilities.services

import shutil
from glob import glob
import os

from disturbs.ligthness import contrast_correction
from disturbs.noise import *
from disturbs.focus import *
from disturbs.geometric import *

from utilities.metrics import *
import random
import re

from utilities.noise_data import NoiseData

""" 
Questa classe implementa tutte le funzionalità per caricare immagini e label, aggiungere il disturbo alle immagini e 
andare a creare un dataset compatibile con yolo. Il caricamento delle immagini sfrutta OpenCV2 mentre la creazione 
dei disturbi viene eseguita con le funzioni sviluppate. 
"""


[documenti] class Services: image_list = [] metrics = {"metrics_image": [], "metrics_image_d": []} output_path = "" input_path = "" perc_tot = 1.0 def __init__(self, image_list: list, output_path: str, perc_tot: float, seed=True): """ Permette la creazione di un oggetto della classe Services. Args: image_list: una lista che contiene tutti i path alle immagini output_path: un path in cui verrà creato il nuovo dataset contenente tutte le immagini con i disturbi. perc_tot: la percentuale di immagini che dovranno contenere disturbi. Deve essere un numero compreso nell'intervallo [0, 1]. Se il numero specificato è compreso nell'intervallo [0, 100] verrà convertito nell'intervallo [0, 1]. Raises: ValueError: se il parametro perc_tot non è un numero compreso tra 0 e 1. Returns: """ self.image_list = image_list self.output_path = output_path if 0.0 <= perc_tot <= 1.0: self.perc_tot = perc_tot elif 0.0 <= perc_tot <= 100: self.perc_tot = perc_tot / 100 else: raise ValueError("perc_tot deve essere un numero compreso tra 0.0 e 1.0") if seed: random.seed(4242) # Per riprodurre risultati self.__init_input_path() # self.__copy_yolo_dir() def __read_file(self, input_path: str) -> cv2.typing.MatLike: """ Questo metodo permette di caricare in memoria una singola immagine. Args: input_path: il path di input dell'immagine da caricare. Raises: FileNotFoundError: se l'input_path non esiste oppure non contiene immagini. ValueError: se l'immagine trovata nel path non è valida o non leggibile. Returns: l'immagine caricata in memoria sottoforma di matrice. """ if not os.path.exists(input_path): raise FileNotFoundError(f"Path '{input_path}' non esistente.") # image_extensions = ("*.jpg", "*.jpeg", "*.png", "*.bmp") image_files = [] # for ext in image_extensions: image_files.extend(glob(input_path)) # os.path.join(input_path, ext) if not image_files: raise FileNotFoundError("Nessuna immagine trovata nella cartella.") image_path = image_files[0] image = cv2.imread(image_path) if image is None: raise ValueError(f"Immagine non valida o non leggibile: {image_path}") return image def __read_label(self, label_path: str) -> list[str]: """ Questo metodo permette di caricare la label di un'immagine. Args: label_path: il path di input della label da caricare. Returns: una lista contenente la/le label caricate in memoria. """ try: with open(label_path, 'r') as f: labels = [line.strip() for line in f if line.strip()] except FileNotFoundError: print(f"Error: File not found at {label_path}") exit() return labels def __save_image(self, image: cv2.typing.MatLike, label: Union[str, list], filename_image: str, filename_label: Union[str, list]) -> None: """ Questo metodo permette di salvare un immagine e la relativa/e label nelle directory specificate. Args: image: l'immagine da salvare. label: la/le label relativa/e all'immagine da salvare. filename_image: il nome del file in cui verrà salvata l'immagine. filename_label: il nome del/dei file in cui verrò salvata la label. Raises: IOError: se si presenta un errore durante il salvataggio dell'immagine. Returns: """ if not os.path.exists(self.output_path): os.makedirs(self.output_path) IMAGE_DIR = self.output_path + "images/" LABEL_DIR = self.output_path + "labels/" if not os.path.exists(IMAGE_DIR): os.makedirs(IMAGE_DIR) if not os.path.exists(LABEL_DIR): os.makedirs(LABEL_DIR) out_img_path = os.path.join(IMAGE_DIR, filename_image) success = cv2.imwrite(out_img_path, image) if not success: raise IOError(f"Errore durante il salvataggio dell'immagine in: {out_img_path}") out_label_path = os.path.join(LABEL_DIR, filename_label) with open(out_label_path, "w") as file: if (isinstance(label, str)): file.write(label) elif (isinstance(label, list)): for l in label: file.write(l + "\n") # return out_img_path, out_label_path def __parse_path(self, image_path): """ Questo metodo permette di ricostruire il path della label partendo da quello dell'immagine. Si suppone che le immagini siano salvate in ./dataset/test/images/nome_immagine.jpg e che le labels siano salvate in un path del tipo: ./dataset/test/labels/nome_immagine.txt Args: image_path: il path dell'immagine da cui otterremo il path della label. Returns: il path della label nel formato descritto in precedenza. """ # Replace 'images/' with 'labels/' and '.jpg' (or other extensions) with '.txt' label_path = re.sub(r'/images/(.*)\.(jpg|jpeg|png)$', r'/labels/\1.txt', image_path) return label_path def __distorsions(self, noise_data_list: list[NoiseData], image: cv2.typing.MatLike, label: Union[list[str], str], prev=False) -> (cv2.typing.MatLike, Union[list[str], str]): """ Questo metodo permette di applicare le distorsioni definite nel modulo library. Non tutti i parametri dei metodi presentati in quel modulo saranno modificabili a causa di scelte implementative meglio descritte nella relazione. Args: noise_data_list: una lista che contiene tutte le distorsioni da applicare. image: l'immagine a cui applicare le distorsioni. label: la label dell'immagine, nella rotazione dovrà essere modificata. Returns: l'immagine e la label con i disturbi applicati. """ noise_data_list = sorted(noise_data_list, key=lambda n: n.priority) for noise in noise_data_list: if noise.typology == "lightness": meanpoint = (noise.values[0] + noise.values[1]) // 2 min_v = random.randint(noise.values[0], meanpoint) max_v = random.randint(meanpoint + 1, noise.values[1]) if prev: min_v = noise.values[0] max_v = noise.values[1] image = contrast_correction(image, out_min=min_v, out_max=max_v) # Blur if noise.typology == "focus_blur": sigma = random.uniform(0, noise.values[0]) while sigma <= 0e-12: sigma = random.uniform(0, noise.values[0]) if prev: sigma = noise.values[0] image = apply_focus_blur(image, sigma=sigma) if noise.typology == "motion_blur": ksize = random.randint(0, noise.values[0]) if prev: ksize = noise.values[0] print("Motion Blur ks: " + str(ksize)) while float(ksize) <= 0e-12: odds = [x for x in range(1, noise.values[0]) if x % 2 != 0] ksize = random.choice(odds) # ksize = random.randint(1, noise.values[0]) angle = random.uniform(-360, 360) if ksize % 2 == 0: ksize += 1 image = apply_motion_blur(image, ksize=ksize, angle=angle) # Noise if noise.typology == "gaussian": # print("input: ", noise.values[0]) std_rand = random.uniform(0, noise.values[0]) if prev: std_rand = noise.values[0] # print(std_rand) while std_rand <= 0e-12: std_rand = random.uniform(0, noise.values[0]) image = add_gaussian_noise(image, std_dev=std_rand) if noise.typology == "salt_pepper": amount = random.uniform(0.0, noise.values[0]) if prev: amount = noise.values[0] while amount <= 0e-12: amount = random.uniform(0, noise.values[0]) image = add_salt_and_pepper_noise(image, amount=amount) # Geometry if noise.typology == "geometric": angle = random.uniform(-noise.values[0], noise.values[0]) if prev: angle = noise.values[0] image, label = rotate_image_and_label(image, label, angle=angle) return image, label
[documenti] def preview(self, noise_data_list: list[NoiseData]) -> (cv2.typing.MatLike, Union[list[str], str], cv2.typing.MatLike, Union[list[str], str]): """ Questo metodo permette di applicare i disturbi ad un'immagine caricata in memoria senza andare a salvarla. Args: noise_data_list: una lista che contiene tutti i rumori da applicare all'immagine. Returns: l'immagine con i disturbi e la label con i disturbi. """ image_path = np.random.choice(self.image_list, size=1)[0] # print(image_path) image = self.__read_file(image_path) label = self.__read_label(self.__parse_path(image_path)) nimage, nlabel = self.__distorsions(noise_data_list, image, label, True) metrics_prev = self.log_metric(image, nimage, save = False) return image, nimage, label, nlabel, metrics_prev
[documenti] def apply_distortion(self, noise_data_list: list[NoiseData]) -> None: """ Questo metodo permette di applicare le distorsioni ad una determinata percentuale di immagini presenti nel dataset. Inoltre verranno anche salvate all'interno dell'output path specificato nel costruttore. Args: noise_data_list: una lista contenente tutte le distorsioni da applicare all'immagine. Returns: """ if len(self.image_list) == 0: return if len(noise_data_list) == 0 and not (isinstance(noise_data_list, NoiseData)): return n = int(len(self.image_list) * self.perc_tot) # TODO: Parallelizzare selected_images = np.random.choice(self.image_list, size=n, replace=False) for image_path in selected_images: image = self.__read_file(image_path) label = self.__read_label(self.__parse_path(image_path)) if isinstance(label, str): if label == None or label == '': continue if isinstance(label, list): if len(label) == 0: continue original_image = image.copy() image, label = self.__distorsions(noise_data_list, image, label) _ = self.log_metric(original_image, image, True) self.__save_image(image, label, os.path.basename(image_path), os.path.basename(self.__parse_path(image_path))) selected_set = set(selected_images) oimages = [img for img in self.image_list if img not in selected_set] for image_path in oimages: image = self.__read_file(image_path) label = self.__read_label(self.__parse_path(image_path)) if isinstance(label, str): if label == None or label == '': continue if isinstance(label, list): if len(label) == 0: continue self.__save_image( image, label, os.path.basename(image_path), os.path.basename(self.__parse_path(image_path)) )
[documenti] def log_metric(self, original_image, distorted_image, save=True): """ Questo metodo calcola le metriche dell'immagine prima e dopo l'applicazione delle distorsioni Args: save: permette di salvare le metriche in un dizionario. original_image: l'immagine senza disturbi. distorted_image: la stessa immagine precedente con i disturbi applicati. Returns: un dizionario che contiene le metriche dell'immagine originale e l'immagine a cui sono stati applicati i disturbi. """ original_metrics = self.__get_image_metrics(original_image) distorted_metrics = self.__get_image_metrics(distorted_image) if save: # print("saving metrics") self.metrics["metrics_image"].append(original_metrics) self.metrics["metrics_image_d"].append(distorted_metrics) return {"metrics_image": [original_metrics], "metrics_image_d": [distorted_metrics]}
def __get_image_metrics(self, image): """ Calcola le metriche di distorsione data un immagine Args: image: l'immagine in input. Returns: un dizionario che contiene le metriche che riguardano l'immagine con il rumore. """ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) cont = contrast(gray) blur = focus(gray) return { "mean_contrast_std": cont, "blurriness_laplacian_var": blur } def __copy_yolo_dir(self) -> None: """ Questo metodo permette di copiare una directory yolo presente nel sistema all'interno dell'output path specificato nel costruttore. Returns: """ if not os.path.exists(self.output_path): os.makedirs(self.output_path) distorted_dir = os.path.join(self.output_path, 'train/') for dir_name in ['train']: dir_path = os.path.join(self.output_path, dir_name) if not os.path.exists(dir_path): os.makedirs(dir_path) data_yml_path = os.path.join(self.input_path, 'data.yaml') if os.path.exists(data_yml_path): shutil.copy(data_yml_path, self.output_path) else: print(f"Warning: 'data.yml' non trovato in '{self.input_path}'.") for dir_name in ['test', 'valid']: src_dir = os.path.join(self.input_path, dir_name) dst_dir = os.path.join(self.output_path, dir_name) if os.path.exists(src_dir): shutil.copytree(src_dir, dst_dir) else: print(f"Warning: '{dir_name}' directory non trovata in '{self.input_path}'.") self.output_path = distorted_dir
[documenti] def make_directory(self) -> None: """ Questo metodo permette di invocare il metodo privato __copy_yolo_dir(). Returns: """ self.__copy_yolo_dir()
def __init_input_path(self) -> None: #TODO """ Questo metodo permette di inizializzare l'input path sfruttando la prima immagine presente all'interno di image_list. Returns: """ if len(self.image_list) == 0: raise IndexError("image_list non contiene alcun path perché è vuota.") url = self.image_list[0] parts = url.split('/') self.input_path = '/'.join(parts[:len(parts) - 3]) + '/'