import shutil
from glob import glob
import os
from disturbs.ligthness import contrast_correction
from disturbs.noise import *
from disturbs.focus import *
from disturbs.geometric import *
from utilities.metrics import *
import random
import re
from utilities.noise_data import NoiseData
"""
Questa classe implementa tutte le funzionalità per caricare immagini e label, aggiungere il disturbo alle immagini e
andare a creare un dataset compatibile con yolo. Il caricamento delle immagini sfrutta OpenCV2 mentre la creazione
dei disturbi viene eseguita con le funzioni sviluppate.
"""
[documenti]
class Services:
image_list = []
metrics = {"metrics_image": [], "metrics_image_d": []}
output_path = ""
input_path = ""
perc_tot = 1.0
def __init__(self, image_list: list, output_path: str, perc_tot: float, seed=True):
"""
Permette la creazione di un oggetto della classe Services.
Args:
image_list: una lista che contiene tutti i path alle immagini
output_path: un path in cui verrà creato il nuovo dataset contenente tutte le immagini con i disturbi.
perc_tot: la percentuale di immagini che dovranno contenere disturbi. Deve essere un numero compreso nell'intervallo [0, 1]. Se il numero specificato è compreso nell'intervallo [0, 100] verrà convertito nell'intervallo [0, 1].
Raises:
ValueError: se il parametro perc_tot non è un numero compreso tra 0 e 1.
Returns:
"""
self.image_list = image_list
self.output_path = output_path
if 0.0 <= perc_tot <= 1.0:
self.perc_tot = perc_tot
elif 0.0 <= perc_tot <= 100:
self.perc_tot = perc_tot / 100
else:
raise ValueError("perc_tot deve essere un numero compreso tra 0.0 e 1.0")
if seed:
random.seed(4242) # Per riprodurre risultati
self.__init_input_path()
# self.__copy_yolo_dir()
def __read_file(self, input_path: str) -> cv2.typing.MatLike:
"""
Questo metodo permette di caricare in memoria una singola immagine.
Args:
input_path: il path di input dell'immagine da caricare.
Raises:
FileNotFoundError: se l'input_path non esiste oppure non contiene immagini.
ValueError: se l'immagine trovata nel path non è valida o non leggibile.
Returns:
l'immagine caricata in memoria sottoforma di matrice.
"""
if not os.path.exists(input_path):
raise FileNotFoundError(f"Path '{input_path}' non esistente.")
# image_extensions = ("*.jpg", "*.jpeg", "*.png", "*.bmp")
image_files = []
# for ext in image_extensions:
image_files.extend(glob(input_path)) # os.path.join(input_path, ext)
if not image_files:
raise FileNotFoundError("Nessuna immagine trovata nella cartella.")
image_path = image_files[0]
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Immagine non valida o non leggibile: {image_path}")
return image
def __read_label(self, label_path: str) -> list[str]:
"""
Questo metodo permette di caricare la label di un'immagine.
Args:
label_path: il path di input della label da caricare.
Returns:
una lista contenente la/le label caricate in memoria.
"""
try:
with open(label_path, 'r') as f:
labels = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"Error: File not found at {label_path}")
exit()
return labels
def __save_image(self, image: cv2.typing.MatLike, label: Union[str, list], filename_image: str,
filename_label: Union[str, list]) -> None:
"""
Questo metodo permette di salvare un immagine e la relativa/e label nelle directory specificate.
Args:
image: l'immagine da salvare.
label: la/le label relativa/e all'immagine da salvare.
filename_image: il nome del file in cui verrà salvata l'immagine.
filename_label: il nome del/dei file in cui verrò salvata la label.
Raises:
IOError: se si presenta un errore durante il salvataggio dell'immagine.
Returns:
"""
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
IMAGE_DIR = self.output_path + "images/"
LABEL_DIR = self.output_path + "labels/"
if not os.path.exists(IMAGE_DIR):
os.makedirs(IMAGE_DIR)
if not os.path.exists(LABEL_DIR):
os.makedirs(LABEL_DIR)
out_img_path = os.path.join(IMAGE_DIR, filename_image)
success = cv2.imwrite(out_img_path, image)
if not success:
raise IOError(f"Errore durante il salvataggio dell'immagine in: {out_img_path}")
out_label_path = os.path.join(LABEL_DIR, filename_label)
with open(out_label_path, "w") as file:
if (isinstance(label, str)):
file.write(label)
elif (isinstance(label, list)):
for l in label:
file.write(l + "\n")
# return out_img_path, out_label_path
def __parse_path(self, image_path):
"""
Questo metodo permette di ricostruire il path della label partendo da quello dell'immagine. Si suppone che le
immagini siano salvate in ./dataset/test/images/nome_immagine.jpg e che le labels siano salvate in un path del
tipo: ./dataset/test/labels/nome_immagine.txt
Args:
image_path: il path dell'immagine da cui otterremo il path della label.
Returns:
il path della label nel formato descritto in precedenza.
"""
# Replace 'images/' with 'labels/' and '.jpg' (or other extensions) with '.txt'
label_path = re.sub(r'/images/(.*)\.(jpg|jpeg|png)$', r'/labels/\1.txt', image_path)
return label_path
def __distorsions(self, noise_data_list: list[NoiseData], image: cv2.typing.MatLike,
label: Union[list[str], str], prev=False) -> (cv2.typing.MatLike, Union[list[str], str]):
"""
Questo metodo permette di applicare le distorsioni definite nel modulo library. Non tutti i parametri dei metodi
presentati in quel modulo saranno modificabili a causa di scelte implementative meglio descritte nella relazione.
Args:
noise_data_list: una lista che contiene tutte le distorsioni da applicare.
image: l'immagine a cui applicare le distorsioni.
label: la label dell'immagine, nella rotazione dovrà essere modificata.
Returns:
l'immagine e la label con i disturbi applicati.
"""
noise_data_list = sorted(noise_data_list, key=lambda n: n.priority)
for noise in noise_data_list:
if noise.typology == "lightness":
meanpoint = (noise.values[0] + noise.values[1]) // 2
min_v = random.randint(noise.values[0], meanpoint)
max_v = random.randint(meanpoint + 1, noise.values[1])
if prev:
min_v = noise.values[0]
max_v = noise.values[1]
image = contrast_correction(image, out_min=min_v, out_max=max_v)
# Blur
if noise.typology == "focus_blur":
sigma = random.uniform(0, noise.values[0])
while sigma <= 0e-12:
sigma = random.uniform(0, noise.values[0])
if prev:
sigma = noise.values[0]
image = apply_focus_blur(image, sigma=sigma)
if noise.typology == "motion_blur":
ksize = random.randint(0, noise.values[0])
if prev:
ksize = noise.values[0]
print("Motion Blur ks: " + str(ksize))
while float(ksize) <= 0e-12:
odds = [x for x in range(1, noise.values[0]) if x % 2 != 0]
ksize = random.choice(odds)
# ksize = random.randint(1, noise.values[0])
angle = random.uniform(-360, 360)
if ksize % 2 == 0:
ksize += 1
image = apply_motion_blur(image, ksize=ksize, angle=angle)
# Noise
if noise.typology == "gaussian":
# print("input: ", noise.values[0])
std_rand = random.uniform(0, noise.values[0])
if prev:
std_rand = noise.values[0]
# print(std_rand)
while std_rand <= 0e-12:
std_rand = random.uniform(0, noise.values[0])
image = add_gaussian_noise(image, std_dev=std_rand)
if noise.typology == "salt_pepper":
amount = random.uniform(0.0, noise.values[0])
if prev:
amount = noise.values[0]
while amount <= 0e-12:
amount = random.uniform(0, noise.values[0])
image = add_salt_and_pepper_noise(image, amount=amount)
# Geometry
if noise.typology == "geometric":
angle = random.uniform(-noise.values[0], noise.values[0])
if prev:
angle = noise.values[0]
image, label = rotate_image_and_label(image, label, angle=angle)
return image, label
[documenti]
def preview(self, noise_data_list: list[NoiseData]) -> (cv2.typing.MatLike, Union[list[str], str],
cv2.typing.MatLike, Union[list[str], str]):
"""
Questo metodo permette di applicare i disturbi ad un'immagine caricata in memoria senza andare a salvarla.
Args:
noise_data_list: una lista che contiene tutti i rumori da applicare all'immagine.
Returns:
l'immagine con i disturbi e la label con i disturbi.
"""
image_path = np.random.choice(self.image_list, size=1)[0]
# print(image_path)
image = self.__read_file(image_path)
label = self.__read_label(self.__parse_path(image_path))
nimage, nlabel = self.__distorsions(noise_data_list, image, label, True)
metrics_prev = self.log_metric(image, nimage, save = False)
return image, nimage, label, nlabel, metrics_prev
[documenti]
def apply_distortion(self, noise_data_list: list[NoiseData]) -> None:
"""
Questo metodo permette di applicare le distorsioni ad una determinata percentuale di immagini presenti nel
dataset. Inoltre verranno anche salvate all'interno dell'output path specificato nel costruttore.
Args:
noise_data_list: una lista contenente tutte le distorsioni da applicare all'immagine.
Returns:
"""
if len(self.image_list) == 0:
return
if len(noise_data_list) == 0 and not (isinstance(noise_data_list, NoiseData)):
return
n = int(len(self.image_list) * self.perc_tot)
# TODO: Parallelizzare
selected_images = np.random.choice(self.image_list, size=n, replace=False)
for image_path in selected_images:
image = self.__read_file(image_path)
label = self.__read_label(self.__parse_path(image_path))
if isinstance(label, str):
if label == None or label == '':
continue
if isinstance(label, list):
if len(label) == 0:
continue
original_image = image.copy()
image, label = self.__distorsions(noise_data_list, image, label)
_ = self.log_metric(original_image, image, True)
self.__save_image(image, label, os.path.basename(image_path),
os.path.basename(self.__parse_path(image_path)))
selected_set = set(selected_images)
oimages = [img for img in self.image_list if img not in selected_set]
for image_path in oimages:
image = self.__read_file(image_path)
label = self.__read_label(self.__parse_path(image_path))
if isinstance(label, str):
if label == None or label == '':
continue
if isinstance(label, list):
if len(label) == 0:
continue
self.__save_image(
image,
label,
os.path.basename(image_path),
os.path.basename(self.__parse_path(image_path))
)
[documenti]
def log_metric(self, original_image, distorted_image, save=True):
"""
Questo metodo calcola le metriche dell'immagine prima e dopo l'applicazione delle distorsioni
Args:
save: permette di salvare le metriche in un dizionario.
original_image: l'immagine senza disturbi.
distorted_image: la stessa immagine precedente con i disturbi applicati.
Returns:
un dizionario che contiene le metriche dell'immagine originale e l'immagine a cui sono stati applicati i disturbi.
"""
original_metrics = self.__get_image_metrics(original_image)
distorted_metrics = self.__get_image_metrics(distorted_image)
if save:
# print("saving metrics")
self.metrics["metrics_image"].append(original_metrics)
self.metrics["metrics_image_d"].append(distorted_metrics)
return {"metrics_image": [original_metrics], "metrics_image_d": [distorted_metrics]}
def __get_image_metrics(self, image):
"""
Calcola le metriche di distorsione data un immagine
Args:
image: l'immagine in input.
Returns:
un dizionario che contiene le metriche che riguardano l'immagine con il rumore.
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
cont = contrast(gray)
blur = focus(gray)
return {
"mean_contrast_std": cont,
"blurriness_laplacian_var": blur
}
def __copy_yolo_dir(self) -> None:
"""
Questo metodo permette di copiare una directory yolo presente nel sistema all'interno dell'output path
specificato nel costruttore.
Returns:
"""
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
distorted_dir = os.path.join(self.output_path, 'train/')
for dir_name in ['train']:
dir_path = os.path.join(self.output_path, dir_name)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
data_yml_path = os.path.join(self.input_path, 'data.yaml')
if os.path.exists(data_yml_path):
shutil.copy(data_yml_path, self.output_path)
else:
print(f"Warning: 'data.yml' non trovato in '{self.input_path}'.")
for dir_name in ['test', 'valid']:
src_dir = os.path.join(self.input_path, dir_name)
dst_dir = os.path.join(self.output_path, dir_name)
if os.path.exists(src_dir):
shutil.copytree(src_dir, dst_dir)
else:
print(f"Warning: '{dir_name}' directory non trovata in '{self.input_path}'.")
self.output_path = distorted_dir
[documenti]
def make_directory(self) -> None:
"""
Questo metodo permette di invocare il metodo privato __copy_yolo_dir().
Returns:
"""
self.__copy_yolo_dir()
def __init_input_path(self) -> None:
#TODO
"""
Questo metodo permette di inizializzare l'input path sfruttando la prima immagine presente all'interno di image_list.
Returns:
"""
if len(self.image_list) == 0:
raise IndexError("image_list non contiene alcun path perché è vuota.")
url = self.image_list[0]
parts = url.split('/')
self.input_path = '/'.join(parts[:len(parts) - 3]) + '/'