# -*- coding: utf-8 -*- """ Copyright 2019 Petr Masopust, Aprar s.r.o. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Adopted code from https://github.com/rainofmine/Face_Attention_Network """ import torch import numpy as np import random import csv import os from torch.utils.data import Dataset from torch.utils.data.sampler import Sampler from PIL import Image, ImageEnhance, ImageFilter from torchvision import transforms as T class CSVDataset(Dataset): """CSV dataset.""" def __init__(self, train_file, class_list, transform=None): """ Args: train_file (string): CSV file with training annotations class_list (string): CSV file with class list transform (optional): Transformation function """ self.train_file = train_file self.class_list = class_list self.transform = transform # parse the provided class file try: with open(self.class_list, 'r', newline='') as file: self.classes = CSVDataset.load_classes(csv.reader(file, delimiter=' ')) except ValueError as e: raise (ValueError('invalid CSV class file: {}: {}'.format(self.class_list, e)), None) self.labels = {} for key, value in self.classes.items(): self.labels[value] = key # csv with img_path, x1, y1, x2, y2, class_name try: with open(self.train_file, 'r', newline='') as file: self.image_data = CSVDataset._read_annotations(csv.reader(file, delimiter=' '), self.classes) except ValueError as e: raise (ValueError('invalid CSV annotations file: {}: {}'.format(self.train_file, e)), None) self.image_names = list(self.image_data.keys()) @staticmethod def _parse(value, function, fmt): """ Parse a string into a value, and format a nice ValueError if it fails. Returns `function(value)`. Any `ValueError` raised is catched and a new `ValueError` is raised with message `fmt.format(e)`, where `e` is the caught `ValueError`. """ try: return function(value) except ValueError as e: raise (ValueError(fmt.format(e)), None) @staticmethod def load_classes(csv_reader): result = {} for line, row in enumerate(csv_reader): line += 1 try: class_name, class_id = row except ValueError: raise (ValueError("line {}: format should be 'class_name,class_id'".format(line)), None) class_id = CSVDataset._parse(class_id, int, 'line {}: malformed class ID: {{}}'.format(line)) if class_name in result: raise ValueError('line {}: duplicate class name: \'{}\''.format(line, class_name)) result[class_name] = class_id return result def __len__(self): return len(self.image_names) def __getitem__(self, idx): img = self.load_image(idx) annot = self.load_annotations(idx) sample = {'img': img, 'annot': annot, 'scale': 1} if self.transform: sample = self.transform(sample) return sample def load_image(self, image_index): img = Image.open(filepath) img = img.convert(mode="RGB") return img def load_annotations(self, image_index): # get ground truth annotations annotation_list = self.image_data[self.image_names[image_index]] annotations = np.zeros((0, 5)) # some images appear to miss annotations (like image with id 257034) if len(annotation_list) == 0: return annotations # parse annotations for idx, a in enumerate(annotation_list): # some annotations have basically no width / height, skip them x1 = a['x1'] x2 = a['x2'] y1 = a['y1'] y2 = a['y2'] if (x2 - x1) < 1 or (y2 - y1) < 1: continue annotation = np.zeros((1, 5)) annotation[0, 0] = x1 annotation[0, 1] = y1 annotation[0, 2] = x2 annotation[0, 3] = y2 annotation[0, 4] = self.name_to_label(a['class']) annotations = np.append(annotations, annotation, axis=0) return annotations @staticmethod def _read_annotations(csv_reader, classes): result = {} for line, row in enumerate(csv_reader): line += 1 try: img_file, x1, y1, x2, y2, class_name = row[:6] except ValueError: raise (ValueError( "line {}: format should be 'img_file,x1,y1,x2,y2,class_name' or 'img_file,,,,,'".format(line)), None) if img_file not in result: result[img_file] = [] # If a row contains only an image path, it's an image without annotations. if (x1, y1, x2, y2, class_name) == ('.', '.', '.', '.', '.'): continue x1 = CSVDataset._parse(x1, int, 'line {}: malformed x1: {{}}'.format(line)) y1 = CSVDataset._parse(y1, int, 'line {}: malformed y1: {{}}'.format(line)) x2 = CSVDataset._parse(x2, int, 'line {}: malformed x2: {{}}'.format(line)) y2 = CSVDataset._parse(y2, int, 'line {}: malformed y2: {{}}'.format(line)) if class_name != 'ignore': # Check that the bounding box is valid. if x2 <= x1: raise ValueError('line {}: x2 ({}) must be higher than x1 ({})'.format(line, x2, x1)) if y2 <= y1: raise ValueError('line {}: y2 ({}) must be higher than y1 ({})'.format(line, y2, y1)) # check if the current class name is correctly present if class_name not in classes: raise ValueError( 'line {}: unknown class name: \'{}\' (classes: {})'.format(line, class_name, classes)) result[img_file].append({'x1': x1, 'x2': x2, 'y1': y1, 'y2': y2, 'class': class_name}) return result def name_to_label(self, name): return self.classes[name] def label_to_name(self, label): return self.labels[label] def num_classes(self): return max(self.classes.values()) + 1 def image_aspect_ratio(self, image_index): image = Image.open(self.image_names[image_index]) return float(image.width) / float(image.height) class WIDERDataset(Dataset): """Wider dataset.""" def __init__(self, train_file, img_prefix='', transform=None): """ Args: train_file (string): Wider txt file with training annotations img_prefix (string, optional): Prefix for images location transform (optional): Transformation function """ self.train_file = train_file self.transform = transform self.img_prefix = img_prefix # WIDER dataset has only faces. Enhance for additional face properties (see below). self.classes = {'face': 0} self.labels = {} for key, value in self.classes.items(): self.labels[value] = key # Wider file definition example: # file with image name, number of faces, x y w h blur expression illumination invalid occlusion pose # 0--Parade/0_Parade_marchingband_1_117.jpg # 9 # 69 359 50 36 1 0 0 0 0 1 # 227 382 56 43 1 0 1 0 0 1 # 296 305 44 26 1 0 0 0 0 1 # 353 280 40 36 2 0 0 0 2 1 # 885 377 63 41 1 0 0 0 0 1 # 819 391 34 43 2 0 0 0 1 0 # 727 342 37 31 2 0 0 0 0 1 # 598 246 33 29 2 0 0 0 0 1 # 740 308 45 33 1 0 0 0 2 1 try: with open(self.train_file, 'r') as file: self.image_data = WIDERDataset._read_data(file) except ValueError as e: raise (ValueError('invalid WIDER annotations file: {}: {}'.format(self.train_file, e)), None) self.image_names = list(self.image_data.keys()) def __len__(self): return len(self.image_names) def __getitem__(self, idx): img = self.load_image(idx) annot = self.load_annotations(idx) sample = {'img': img, 'annot': annot, 'scale': 1, 'img_name': self.image_names[idx]} if self.transform: sample = self.transform(sample) return sample def load_image(self, image_index): print('Loading image %s' % self.image_names[image_index]) img = Image.open(os.path.join(self.img_prefix, self.image_names[image_index])) img = img.convert(mode="RGB") return img def load_annotations(self, image_index): # get ground truth annotations annotation_list = self.image_data[self.image_names[image_index]] annotations = np.zeros((0, 5)) # some images appear to miss annotations (like image with id 257034) if len(annotation_list) == 0: return annotations # parse annotations for idx, a in enumerate(annotation_list): # some annotations have basically no width / height, skip them x1 = a['x1'] x2 = a['x2'] y1 = a['y1'] y2 = a['y2'] if (x2 - x1) < 1 or (y2 - y1) < 1: continue annotation = np.zeros((1, 5)) annotation[0, 0] = x1 annotation[0, 1] = y1 annotation[0, 2] = x2 annotation[0, 3] = y2 annotation[0, 4] = self.name_to_label(a['class']) annotations = np.append(annotations, annotation, axis=0) return annotations @staticmethod def _read_data(reader): result = {} counter = 0 img_file = None for line in reader: line = line.strip() if counter == 0: # file name or number of faces try: counter = int(line) except ValueError: if img_file and len(result[img_file]) == 0: print("Warning - no faces: %s" % img_file) img_file = line else: counter -= 1 # coordinates e.g. 370 170 9 13 2 0 0 0 2 0 nums = [int(x) for x in line.split()] result.setdefault(img_file, []).append({'x1': nums[0], 'x2': nums[0] + nums[2], 'y1': nums[1], 'y2': nums[1] + nums[3], 'class': 'face'}) return result def name_to_label(self, name): return self.classes[name] def label_to_name(self, label): return self.labels[label] def num_classes(self): return max(self.classes.values()) + 1 def image_aspect_ratio(self, image_index): image = Image.open(os.path.join(self.img_prefix, self.image_names[image_index])) return float(image.width) / float(image.height) def collater(data): imgs = [s['img'] for s in data] annots = [s['annot'] for s in data] scales = [s['scale'] for s in data] widths = [int(s.shape[0]) for s in imgs] heights = [int(s.shape[1]) for s in imgs] batch_size = len(imgs) max_width = np.array(widths).max() max_height = np.array(heights).max() padded_imgs = torch.zeros(batch_size, max_width, max_height, 3) for i in range(batch_size): img = imgs[i] padded_imgs[i, :int(img.shape[0]), :int(img.shape[1]), :] = img max_num_annots = max(annot.shape[0] for annot in annots) # print(annot_padded.shape) if max_num_annots > 0: annot_padded = torch.ones((len(annots), max_num_annots, 5)) * -1 for idx, annot in enumerate(annots): # print(annot.shape) if annot.shape[0] > 0: annot_padded[idx, :annot.shape[0], :] = annot else: annot_padded = torch.ones((len(annots), 1, 5)) * -1 padded_imgs = padded_imgs.permute(0, 3, 1, 2) return {'img': padded_imgs, 'annot': annot_padded, 'scale': scales} class Resizer(object): """Convert ndarrays in sample to Tensors.""" def __call__(self, sample, min_side=800, max_side=1400): image, annots, scale = sample['img'], sample['annot'], sample['scale'] cols, rows = image.size # scale = min_side / rows smallest_side = min(rows, cols) # rescale the image so the smallest side is min_side scale = min_side / smallest_side # check if the largest side is now greater than max_side, which can happen # when images have a large aspect ratio largest_side = max(rows, cols) if largest_side * scale > max_side: scale = max_side / largest_side # resize the image with the computed scale image = np.array( image.resize((int(round((cols * scale))), int(round((rows * scale)))), resample=Image.BILINEAR)) image = image / 255.0 rows, cols, cns = image.shape pad_w = 32 - rows % 32 pad_h = 32 - cols % 32 new_image = np.zeros((rows + pad_w, cols + pad_h, cns)).astype(np.float32) new_image[:rows, :cols, :] = image.astype(np.float32) annots[:, :4] *= scale return {'img': new_image, 'annot': annots, 'scale': scale} class RandomEraser(object): def __init__(self): self.eraser = T.RandomErasing() def __call__(self, sample): image, annots, scales = sample['img'], sample['annot'], sample['scale'] image = self.eraser(image) sample = {'img': image, 'annot': annots, 'scale': scales} return sample class Augmenter(object): """Convert ndarrays in sample to Tensors.""" def __call__(self, sample, flip_x=0.5): if np.random.rand() < flip_x: image, annots, scales = sample['img'], sample['annot'], sample['scale'] image = image[:, ::-1, :] rows, cols, channels = image.shape x1 = annots[:, 0].copy() x2 = annots[:, 2].copy() x_tmp = x1.copy() annots[:, 0] = cols - x2 annots[:, 2] = cols - x_tmp sample = {'img': image, 'annot': annots, 'scale': scales} return sample class RandomCrop(object): def __call__(self, sample): image, annots, scales = sample['img'], sample['annot'], sample['scale'] if not annots.shape[0]: return {'img': image, 'annot': annots, 'scale': scales} if random.choice([0, 1]): return {'img': image, 'annot': annots, 'scale': scales} else: rows, cols, cns = image.shape flag = 0 while True: flag += 1 if flag > 10: return {'img': image, 'annot': annots, 'scale': scales} crop_ratio = random.uniform(0.5, 1) rows_zero = int(rows * random.uniform(0, 1 - crop_ratio)) cols_zero = int(cols * random.uniform(0, 1 - crop_ratio)) crop_rows = int(rows * crop_ratio) crop_cols = int(cols * crop_ratio) ''' new_image = image[rows_zero:rows_zero+crop_rows, cols_zero:cols_zero+crop_cols, :] new_image = cv2.resize(new_image, (cols, rows)) #new_image = skimage.transform.resize(new_image, (rows, cols)) new_annots = np.zeros((0, 5)) for i in range(annots.shape[0]): x1 = max(annots[i, 0] - cols_zero, 0) y1 = max(annots[i, 1] - rows_zero, 0) x2 = min(annots[i, 2] - cols_zero, crop_cols) y2 = min(annots[i, 3] - rows_zero, crop_rows) label = annots[i, 4] if x1 + 10 < x2 and y1 + 10 < y2: x1 /= crop_ratio y1 /= crop_ratio x2 /= crop_ratio y2 /= crop_ratio new_annots = np.append(new_annots, np.array([[x1, y1, x2, y2, label]]), axis=0) if not new_annots.shape[0]: continue ''' new_image = np.zeros((rows, cols, cns)) new_image[rows_zero:rows_zero + crop_rows, cols_zero:cols_zero + crop_cols, :] = \ image[ rows_zero:rows_zero + crop_rows, cols_zero:cols_zero + crop_cols, :] new_annots = np.zeros((0, 5)) for i in range(annots.shape[0]): x1 = max(cols_zero, annots[i, 0]) y1 = max(rows_zero, annots[i, 1]) x2 = min(cols_zero + crop_cols, annots[i, 2]) y2 = min(rows_zero + crop_rows, annots[i, 3]) label = annots[i, 4] if x1 + 10 < x2 and y1 + 10 < y2: new_annots = np.append(new_annots, np.array([[x1, y1, x2, y2, label]]), axis=0) if not new_annots.shape[0]: continue return {'img': new_image, 'annot': new_annots, 'scale': scales} class Color(object): def __call__(self, sample): image, annots, scales = sample['img'], sample['annot'], sample['scale'] image = Image.fromarray(image) ratio = [0.5, 0.8, 1.2, 1.5] if random.choice([0, 1]): enh_bri = ImageEnhance.Brightness(image) brightness = random.choice(ratio) image = enh_bri.enhance(brightness) if random.choice([0, 1]): enh_col = ImageEnhance.Color(image) color = random.choice(ratio) image = enh_col.enhance(color) if random.choice([0, 1]): enh_con = ImageEnhance.Contrast(image) contrast = random.choice(ratio) image = enh_con.enhance(contrast) if random.choice([0, 1]): enh_sha = ImageEnhance.Sharpness(image) sharpness = random.choice(ratio) image = enh_sha.enhance(sharpness) if random.choice([0, 1]): image = image.filter(ImageFilter.BLUR) image = np.asarray(image) return {'img': image, 'annot': annots, 'scale': scales} class Normalizer(object): def __init__(self): self.mean = np.array([[[0.485, 0.456, 0.406]]]) self.std = np.array([[[0.229, 0.224, 0.225]]]) def __call__(self, sample): image, annots, scales = sample['img'], sample['annot'], sample['scale'] image = (image.astype(np.float32) - self.mean) / self.std sample = {'img': torch.from_numpy(image), 'annot': torch.from_numpy(annots), 'scale': scales} return sample class UnNormalizer(object): def __init__(self, mean=None, std=None): if mean is None: self.mean = [0.485, 0.456, 0.406] else: self.mean = mean if std is None: self.std = [0.229, 0.224, 0.225] else: self.std = std def __call__(self, tensor): """ Args: tensor (Tensor): Tensor image of size (C, H, W) to be normalized. Returns: Tensor: Normalized image. """ for t, m, s in zip(tensor, self.mean, self.std): t.mul_(s).add_(m) return tensor class AspectRatioBasedSampler(Sampler): def __init__(self, data_source, batch_size, drop_last): self.data_source = data_source self.batch_size = batch_size self.drop_last = drop_last self.groups = self.group_images() def __iter__(self): random.shuffle(self.groups) for group in self.groups: yield group def __len__(self): if self.drop_last: return len(self.data_source) // self.batch_size else: return (len(self.data_source) + self.batch_size - 1) // self.batch_size def group_images(self): # determine the order of the images order = list(range(len(self.data_source))) order.sort(key=lambda x: self.data_source.image_aspect_ratio(x)) # divide into groups, one group = one batch return [[order[x % len(order)] for x in range(i, i + self.batch_size)] for i in range(0, len(order), self.batch_size)]