Face identification and recognition scalable server with multiple face directories.
https://github.com/ehp/faceserver

# -*- coding: utf-8 -*-

"""
Copyright 2019 Petr Masopust, Aprar s.r.o.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Adopted code from https://github.com/rainofmine/Face_Attention_Network
"""

import csv
import os
import random

import numpy as np
import torch
from PIL import Image, ImageEnhance, ImageFilter
from torch.utils.data import Dataset
from torch.utils.data.sampler import Sampler
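
# All transforms below operate on sample dicts of the form
# {'img': ..., 'annot': N x 5 array of [x1, y1, x2, y2, label], 'scale': float}.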


class CSVDataset(Dataset):
    """CSV dataset."""

    def __init__(self, train_file, class_list, transform=None):
        """
        Args:
            train_file (string): CSV file with training annotations
            class_list (string): CSV file with class list
            transform (optional): Transformation function
        """
        self.train_file = train_file
        self.class_list = class_list
        self.transform = transform

        # parse the provided class file
        try:
            with open(self.class_list, 'r', newline='') as file:
                self.classes = CSVDataset.load_classes(csv.reader(file, delimiter=' '))
        except ValueError as e:
            raise ValueError('invalid CSV class file: {}: {}'.format(self.class_list, e)) from None

        self.labels = {}
        for key, value in self.classes.items():
            self.labels[value] = key

        # csv with img_path, x1, y1, x2, y2, class_name
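        # e.g. (space-delimited; paths are hypothetical):
        #   images/img001.jpg 10 20 110 150 face
        #   images/img002.jpg . . . . .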
        try:
            with open(self.train_file, 'r', newline='') as file:
                self.image_data = CSVDataset._read_annotations(csv.reader(file, delimiter=' '), self.classes)
        except ValueError as e:
            raise ValueError('invalid CSV annotations file: {}: {}'.format(self.train_file, e)) from None
        self.image_names = list(self.image_data.keys())

    @staticmethod
    def _parse(value, function, fmt):
        """
        Parse a string into a value, and format a nice ValueError if it fails.

        Returns `function(value)`.
        Any `ValueError` raised is caught and a new `ValueError` is raised
        with message `fmt.format(e)`, where `e` is the caught `ValueError`.
        """
        try:
            return function(value)
        except ValueError as e:
            raise ValueError(fmt.format(e)) from None

    @staticmethod
    def load_classes(csv_reader):
        result = {}

        for line, row in enumerate(csv_reader):
            line += 1

            try:
                class_name, class_id = row
            except ValueError:
                raise ValueError("line {}: format should be 'class_name class_id'".format(line)) from None
            class_id = CSVDataset._parse(class_id, int, 'line {}: malformed class ID: {{}}'.format(line))

            if class_name in result:
                raise ValueError('line {}: duplicate class name: \'{}\''.format(line, class_name))
            result[class_name] = class_id
        return result
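
    # A class list file for load_classes is space-delimited, one class per line, e.g.:
    #   face 0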

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img = self.load_image(idx)
        annot = self.load_annotations(idx)
        sample = {'img': img, 'annot': annot, 'scale': 1}
        if self.transform:
            sample = self.transform(sample)

        return sample

    def load_image(self, image_index):
        img = Image.open(self.image_names[image_index])
        img = img.convert(mode="RGB")

        return img

    def load_annotations(self, image_index):
        # get ground truth annotations
        annotation_list = self.image_data[self.image_names[image_index]]
        annotations = np.zeros((0, 5))

        # some images appear to miss annotations (like image with id 257034)
        if len(annotation_list) == 0:
            return annotations

        # parse annotations
        for idx, a in enumerate(annotation_list):
            # some annotations have basically no width / height, skip them
            x1 = a['x1']
            x2 = a['x2']
            y1 = a['y1']
            y2 = a['y2']

            if (x2 - x1) < 1 or (y2 - y1) < 1:
                continue

            annotation = np.zeros((1, 5))

            annotation[0, 0] = x1
            annotation[0, 1] = y1
            annotation[0, 2] = x2
            annotation[0, 3] = y2

            annotation[0, 4] = self.name_to_label(a['class'])
            annotations = np.append(annotations, annotation, axis=0)

        return annotations

    @staticmethod
    def _read_annotations(csv_reader, classes):
        result = {}
        for line, row in enumerate(csv_reader):
            line += 1

            try:
                img_file, x1, y1, x2, y2, class_name = row[:6]
            except ValueError:
                raise ValueError(
                    "line {}: format should be 'img_file x1 y1 x2 y2 class_name' "
                    "or 'img_file . . . . .'".format(line)) from None

            if img_file not in result:
                result[img_file] = []

            # If a row contains only an image path, it's an image without annotations.
            if (x1, y1, x2, y2, class_name) == ('.', '.', '.', '.', '.'):
                continue

            x1 = CSVDataset._parse(x1, int, 'line {}: malformed x1: {{}}'.format(line))
            y1 = CSVDataset._parse(y1, int, 'line {}: malformed y1: {{}}'.format(line))
            x2 = CSVDataset._parse(x2, int, 'line {}: malformed x2: {{}}'.format(line))
            y2 = CSVDataset._parse(y2, int, 'line {}: malformed y2: {{}}'.format(line))

            if class_name != 'ignore':
                # Check that the bounding box is valid.
                if x2 <= x1:
                    raise ValueError('line {}: x2 ({}) must be greater than x1 ({})'.format(line, x2, x1))
                if y2 <= y1:
                    raise ValueError('line {}: y2 ({}) must be greater than y1 ({})'.format(line, y2, y1))

                # check that the current class name is actually in the class list
                if class_name not in classes:
                    raise ValueError(
                        'line {}: unknown class name: \'{}\' (classes: {})'.format(line, class_name, classes))

            result[img_file].append({'x1': x1, 'x2': x2, 'y1': y1, 'y2': y2, 'class': class_name})
        return result

    def name_to_label(self, name):
        return self.classes[name]

    def label_to_name(self, label):
        return self.labels[label]

    def num_classes(self):
        return max(self.classes.values()) + 1

    def image_aspect_ratio(self, image_index):
        image = Image.open(self.image_names[image_index])
        return float(image.width) / float(image.height)


class WIDERDataset(Dataset):
    """WIDER dataset."""

    def __init__(self, train_file, img_prefix='', transform=None):
        """
        Args:
            train_file (string): WIDER txt file with training annotations
            img_prefix (string, optional): Prefix for images location
            transform (optional): Transformation function
        """
        self.train_file = train_file
        self.transform = transform
        self.img_prefix = img_prefix

        # The WIDER dataset has only faces. Extend for additional face properties (see below).
        self.classes = {'face': 0}

        self.labels = {}
        for key, value in self.classes.items():
            self.labels[value] = key

        # WIDER file definition example:
        # image name, number of faces, then one face per line:
        # x y w h blur expression illumination invalid occlusion pose
        # 0--Parade/0_Parade_marchingband_1_117.jpg
        # 9
        # 69 359 50 36 1 0 0 0 0 1
        # 227 382 56 43 1 0 1 0 0 1
        # 296 305 44 26 1 0 0 0 0 1
        # 353 280 40 36 2 0 0 0 2 1
        # 885 377 63 41 1 0 0 0 0 1
        # 819 391 34 43 2 0 0 0 1 0
        # 727 342 37 31 2 0 0 0 0 1
        # 598 246 33 29 2 0 0 0 0 1
        # 740 308 45 33 1 0 0 0 2 1
        try:
            with open(self.train_file, 'r') as file:
                self.image_data = WIDERDataset._read_data(file)
        except ValueError as e:
            raise ValueError('invalid WIDER annotations file: {}: {}'.format(self.train_file, e)) from None
        self.image_names = list(self.image_data.keys())

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img = self.load_image(idx)
        annot = self.load_annotations(idx)
        sample = {'img': img, 'annot': annot, 'scale': 1, 'img_name': self.image_names[idx]}
        if self.transform:
            sample = self.transform(sample)

        return sample

    def load_image(self, image_index):
        print('Loading image %s' % self.image_names[image_index])
        img = Image.open(os.path.join(self.img_prefix, self.image_names[image_index]))
        img = img.convert(mode="RGB")

        return img

    def load_annotations(self, image_index):
        # get ground truth annotations
        annotation_list = self.image_data[self.image_names[image_index]]
        annotations = np.zeros((0, 5))

        # some images appear to miss annotations (like image with id 257034)
        if len(annotation_list) == 0:
            return annotations

        # parse annotations
        for idx, a in enumerate(annotation_list):
            # some annotations have basically no width / height, skip them
            x1 = a['x1']
            x2 = a['x2']
            y1 = a['y1']
            y2 = a['y2']

            if (x2 - x1) < 1 or (y2 - y1) < 1:
                continue

            annotation = np.zeros((1, 5))

            annotation[0, 0] = x1
            annotation[0, 1] = y1
            annotation[0, 2] = x2
            annotation[0, 3] = y2

            annotation[0, 4] = self.name_to_label(a['class'])
            annotations = np.append(annotations, annotation, axis=0)

        return annotations

    @staticmethod
    def _read_data(reader):
        result = {}
        counter = 0
        img_file = None
        for line in reader:
            line = line.strip()
            if counter == 0:
                # file name or number of faces
                try:
                    counter = int(line)
                except ValueError:
                    if img_file and len(result.get(img_file, [])) == 0:
                        print("Warning - no faces: %s" % img_file)
                    img_file = line
            else:
                counter -= 1
                # coordinates e.g. 370 170 9 13 2 0 0 0 2 0
                nums = [int(x) for x in line.split()]
                result.setdefault(img_file, []).append({'x1': nums[0], 'x2': nums[0] + nums[2],
                                                        'y1': nums[1], 'y2': nums[1] + nums[3],
                                                        'class': 'face'})
        return result
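
    # For the example file above, _read_data yields entries of the form
    # (with x2 = x + w and y2 = y + h):
    #   {'0--Parade/0_Parade_marchingband_1_117.jpg':
    #       [{'x1': 69, 'y1': 359, 'x2': 119, 'y2': 395, 'class': 'face'}, ...]}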

    def name_to_label(self, name):
        return self.classes[name]

    def label_to_name(self, label):
        return self.labels[label]

    def num_classes(self):
        return max(self.classes.values()) + 1

    def image_aspect_ratio(self, image_index):
        image = Image.open(os.path.join(self.img_prefix, self.image_names[image_index]))
        return float(image.width) / float(image.height)


def collater(data):
    """Pad images in a batch to a common size and stack annotations, padding with -1 rows."""
    imgs = [s['img'] for s in data]
    annots = [s['annot'] for s in data]
    scales = [s['scale'] for s in data]

    # Note: images are HWC here, so shape[0] is rows (height) and shape[1] is
    # columns (width); the width/height names below are swapped but used consistently.
    widths = [int(s.shape[0]) for s in imgs]
    heights = [int(s.shape[1]) for s in imgs]
    batch_size = len(imgs)

    max_width = np.array(widths).max()
    max_height = np.array(heights).max()

    padded_imgs = torch.zeros(batch_size, max_width, max_height, 3)

    for i in range(batch_size):
        img = imgs[i]
        padded_imgs[i, :int(img.shape[0]), :int(img.shape[1]), :] = img

    max_num_annots = max(annot.shape[0] for annot in annots)
    if max_num_annots > 0:
        annot_padded = torch.ones((len(annots), max_num_annots, 5)) * -1
        for idx, annot in enumerate(annots):
            if annot.shape[0] > 0:
                annot_padded[idx, :annot.shape[0], :] = annot
    else:
        annot_padded = torch.ones((len(annots), 1, 5)) * -1

    # HWC -> CHW (NCHW batch layout)
    padded_imgs = padded_imgs.permute(0, 3, 1, 2)

    return {'img': padded_imgs, 'annot': annot_padded, 'scale': scales}
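
# A quick sketch of what collater produces (assuming samples that went through
# Normalizer, so 'img' is an HWC float tensor): for two images of 800x1024 and
# 800x1216 (rows x cols), 'img' comes out as a (2, 3, 800, 1216) tensor and
# 'annot' as a (2, max_boxes, 5) tensor padded with -1 rows.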


class Resizer(object):
    """Resize so the smallest side is min_side (capped by max_side) and pad to a multiple of 32."""

    def __call__(self, sample, min_side=800, max_side=1400):
        image, annots, scale = sample['img'], sample['annot'], sample['scale']

        cols, rows = image.size

        smallest_side = min(rows, cols)

        # rescale the image so the smallest side is min_side
        scale = min_side / smallest_side

        # check if the largest side is now greater than max_side, which can happen
        # when images have a large aspect ratio
        largest_side = max(rows, cols)

        if largest_side * scale > max_side:
            scale = max_side / largest_side

        # resize the image with the computed scale
        image = np.array(
            image.resize((int(round(cols * scale)), int(round(rows * scale))), resample=Image.BILINEAR))
        image = image / 255.0

        rows, cols, cns = image.shape

        # pad both dimensions up to the next multiple of 32
        pad_w = (32 - rows % 32) % 32
        pad_h = (32 - cols % 32) % 32

        new_image = np.zeros((rows + pad_w, cols + pad_h, cns)).astype(np.float32)
        new_image[:rows, :cols, :] = image.astype(np.float32)

        annots[:, :4] *= scale

        return {'img': new_image, 'annot': annots, 'scale': scale}
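
# Worked example of the Resizer scale math: a 600x1000 (rows x cols) image gets
# scale = 800 / 600 = 1.333..., making the largest side 1333 < 1400, so it is
# resized to 800x1333 and then padded to 800x1344 (the next multiple of 32).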


class Augmenter(object):
    """Randomly flip the image (and its annotations) horizontally."""

    def __call__(self, sample, flip_x=0.5):
        if np.random.rand() < flip_x:
            image, annots, scales = sample['img'], sample['annot'], sample['scale']
            image = image[:, ::-1, :]

            rows, cols, channels = image.shape

            x1 = annots[:, 0].copy()
            x2 = annots[:, 2].copy()

            x_tmp = x1.copy()

            annots[:, 0] = cols - x2
            annots[:, 2] = cols - x_tmp

            sample = {'img': image, 'annot': annots, 'scale': scales}

        return sample


class RandomCrop(object):
    """Randomly blank everything outside a crop window and clip annotations to it."""

    def __call__(self, sample):
        image, annots, scales = sample['img'], sample['annot'], sample['scale']

        if not annots.shape[0]:
            return {'img': image, 'annot': annots, 'scale': scales}
        if random.choice([0, 1]):
            return {'img': image, 'annot': annots, 'scale': scales}
        else:
            rows, cols, cns = image.shape
            flag = 0
            while True:
                flag += 1
                if flag > 10:
                    return {'img': image, 'annot': annots, 'scale': scales}

                crop_ratio = random.uniform(0.5, 1)
                rows_zero = int(rows * random.uniform(0, 1 - crop_ratio))
                cols_zero = int(cols * random.uniform(0, 1 - crop_ratio))
                crop_rows = int(rows * crop_ratio)
                crop_cols = int(cols * crop_ratio)

                new_image = np.zeros((rows, cols, cns))
                new_image[rows_zero:rows_zero + crop_rows, cols_zero:cols_zero + crop_cols, :] = \
                    image[rows_zero:rows_zero + crop_rows, cols_zero:cols_zero + crop_cols, :]

                new_annots = np.zeros((0, 5))
                for i in range(annots.shape[0]):
                    x1 = max(cols_zero, annots[i, 0])
                    y1 = max(rows_zero, annots[i, 1])
                    x2 = min(cols_zero + crop_cols, annots[i, 2])
                    y2 = min(rows_zero + crop_rows, annots[i, 3])
                    label = annots[i, 4]
                    # keep only boxes that still have a reasonable size inside the crop
                    if x1 + 10 < x2 and y1 + 10 < y2:
                        new_annots = np.append(new_annots, np.array([[x1, y1, x2, y2, label]]), axis=0)

                # retry (up to 10 times) if the crop lost all annotations
                if not new_annots.shape[0]:
                    continue

                return {'img': new_image, 'annot': new_annots, 'scale': scales}


class Color(object):
    """Randomly jitter brightness, color, contrast and sharpness, and randomly blur."""

    def __call__(self, sample):
        image, annots, scales = sample['img'], sample['annot'], sample['scale']
        image = Image.fromarray(image)

        ratio = [0.5, 0.8, 1.2, 1.5]

        if random.choice([0, 1]):
            enh_bri = ImageEnhance.Brightness(image)
            brightness = random.choice(ratio)
            image = enh_bri.enhance(brightness)
        if random.choice([0, 1]):
            enh_col = ImageEnhance.Color(image)
            color = random.choice(ratio)
            image = enh_col.enhance(color)
        if random.choice([0, 1]):
            enh_con = ImageEnhance.Contrast(image)
            contrast = random.choice(ratio)
            image = enh_con.enhance(contrast)
        if random.choice([0, 1]):
            enh_sha = ImageEnhance.Sharpness(image)
            sharpness = random.choice(ratio)
            image = enh_sha.enhance(sharpness)
        if random.choice([0, 1]):
            image = image.filter(ImageFilter.BLUR)

        image = np.asarray(image)
        return {'img': image, 'annot': annots, 'scale': scales}


class Normalizer(object):
    def __init__(self):
        # ImageNet channel means and standard deviations
        self.mean = np.array([[[0.485, 0.456, 0.406]]])
        self.std = np.array([[[0.229, 0.224, 0.225]]])

    def __call__(self, sample):
        image, annots, scales = sample['img'], sample['annot'], sample['scale']

        image = (image.astype(np.float32) - self.mean) / self.std

        sample = {'img': torch.from_numpy(image), 'annot': torch.from_numpy(annots), 'scale': scales}
        return sample


class UnNormalizer(object):
    def __init__(self, mean=None, std=None):
        if mean is None:
            self.mean = [0.485, 0.456, 0.406]
        else:
            self.mean = mean
        if std is None:
            self.std = [0.229, 0.224, 0.225]
        else:
            self.std = std

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Normalized tensor image of size (C, H, W) to be un-normalized in place.
        Returns:
            Tensor: Un-normalized image.
        """
        for t, m, s in zip(tensor, self.mean, self.std):
            t.mul_(s).add_(m)
        return tensor
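
# A hypothetical visualization snippet: UnNormalizer reverses Normalizer so a
# batched image can be displayed, e.g.
#   img = UnNormalizer()(batch['img'][0]).permute(1, 2, 0).numpy()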


class AspectRatioBasedSampler(Sampler):
    """Batch sampler that groups images with similar aspect ratios into the same batch."""

    def __init__(self, data_source, batch_size, drop_last):
        self.data_source = data_source
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.groups = self.group_images()

    def __iter__(self):
        random.shuffle(self.groups)
        for group in self.groups:
            yield group

    def __len__(self):
        if self.drop_last:
            return len(self.data_source) // self.batch_size
        else:
            return (len(self.data_source) + self.batch_size - 1) // self.batch_size

    def group_images(self):
        # determine the order of the images, sorted by aspect ratio
        order = list(range(len(self.data_source)))
        order.sort(key=lambda x: self.data_source.image_aspect_ratio(x))

        # divide into groups, one group = one batch (wrapping around at the end)
        return [[order[x % len(order)] for x in range(i, i + self.batch_size)] for i in
                range(0, len(order), self.batch_size)]
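

# A minimal usage sketch (not part of the original module): wiring WIDERDataset
# into a DataLoader with the sampler and collater above. The annotation file and
# image prefix below are hypothetical, and torchvision is assumed to be available
# for transforms.Compose. Color and RandomCrop are omitted because this pipeline
# starts from a PIL image, which Resizer converts to a float ndarray.
if __name__ == '__main__':
    from torch.utils.data import DataLoader
    from torchvision import transforms

    dataset = WIDERDataset('wider_face_train_bbx_gt.txt', img_prefix='WIDER_train/images',
                           transform=transforms.Compose([Resizer(), Augmenter(), Normalizer()]))
    sampler = AspectRatioBasedSampler(dataset, batch_size=2, drop_last=False)
    loader = DataLoader(dataset, batch_sampler=sampler, collate_fn=collater)

    # each batch: 'img' is (B, 3, H, W), 'annot' is (B, max_boxes, 5) padded with -1
    for batch in loader:
        print(batch['img'].shape, batch['annot'].shape)
        break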