CV Computer Vision Core 09: Image Segmentation with FCN (Penn-Fudan Database)
Penn-Fudan Database download: https://www.cis.upenn.edu/~jshi/ped_html/
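After extracting the archive, the loader below expects the following layout (the folder names come straight from the dataset, which contains 170 images with 345 labeled pedestrians):

PennFudanPed/
├── PNGImages/   # the pedestrian photos, e.g. FudanPed00001.png
└── PedMasks/    # one instance mask per photo, e.g. FudanPed00001_mask.png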
1. Reading the data
PennFudanDataset_main.py:
import os
import numpy as np
import torch
from PIL import Image

import transforms as T


class PennFudanDataset(object):
    def __init__(self, root, transforms):
        self.root = root
        self.transforms = transforms
        # load all image files, sorting them to
        # ensure that they are aligned
        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

    def __getitem__(self, idx):
        # load images and masks
        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        # note that we haven't converted the mask to RGB,
        # because each color corresponds to a different instance
        # with 0 being background
        mask = Image.open(mask_path)
        # convert the PIL Image into a numpy array
        mask = np.array(mask)
        # instances are encoded as different colors
        obj_ids = np.unique(mask)
        # first id is the background, so remove it
        obj_ids = obj_ids[1:]
        # split the color-encoded mask into a set
        # of binary masks
        masks = mask == obj_ids[:, None, None]
        # get bounding box coordinates for each mask
        num_objs = len(obj_ids)
        boxes = []
        for i in range(num_objs):
            pos = np.where(masks[i])
            xmin = np.min(pos[1])
            xmax = np.max(pos[1])
            ymin = np.min(pos[0])
            ymax = np.max(pos[0])
            boxes.append([xmin, ymin, xmax, ymax])
        # convert everything into a torch.Tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # there is only one class (person)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        masks = torch.as_tensor(masks, dtype=torch.uint8)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # suppose all instances are not crowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        return img, target

    def __len__(self):
        return len(self.imgs)


def get_transform(train):
    transforms = []
    transforms.append(T.ToTensor())
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)


def main():
    root = './PennFudanPed'
    transforms = get_transform(train=False)
    readData = PennFudanDataset(root, transforms)
    print(len(readData))


if __name__ == '__main__':
    main()
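For a quick sanity check of what __getitem__ returns, a minimal sketch (hypothetical usage, assuming the archive was extracted to ./PennFudanPed):

dataset = PennFudanDataset('./PennFudanPed', get_transform(train=False))
img, target = dataset[0]
print(img.shape)              # torch.Size([3, H, W]) after T.ToTensor()
print(sorted(target.keys()))  # ['area', 'boxes', 'image_id', 'iscrowd', 'labels', 'masks']
print(target["masks"].shape)  # (num_instances, H, W), one binary mask per pedestrian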
PennFudanDataset_main.py relies on the helpers defined in the following module:
transforms.py:
import random

import torch
from torchvision.transforms import functional as F


def _flip_coco_person_keypoints(kps, width):
    flip_inds = [0, 2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 16, 15]
    flipped_data = kps[:, flip_inds]
    flipped_data[..., 0] = width - flipped_data[..., 0]
    # Maintain COCO convention that if visibility == 0, then x, y = 0
    inds = flipped_data[..., 2] == 0
    flipped_data[inds] = 0
    return flipped_data


class Compose(object):
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for t in self.transforms:
            image, target = t(image, target)
        return image, target


class RandomHorizontalFlip(object):
    def __init__(self, prob):
        self.prob = prob

    def __call__(self, image, target):
        if random.random() < self.prob:
            height, width = image.shape[-2:]
            image = image.flip(-1)
            bbox = target["boxes"]
            bbox[:, [0, 2]] = width - bbox[:, [2, 0]]
            target["boxes"] = bbox
            if "masks" in target:
                target["masks"] = target["masks"].flip(-1)
            if "keypoints" in target:
                keypoints = target["keypoints"]
                keypoints = _flip_coco_person_keypoints(keypoints, width)
                target["keypoints"] = keypoints
        return image, target


class ToTensor(object):
    def __call__(self, image, target):
        image = F.to_tensor(image)
        return image, target
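One detail worth noting in RandomHorizontalFlip: the box update bbox[:, [0, 2]] = width - bbox[:, [2, 0]] swaps the two x-columns before subtracting, so the result is still a valid box. A hypothetical example: with width = 500 and a box (xmin=100, xmax=250), the flipped box is (xmin = 500 - 250 = 250, xmax = 500 - 100 = 400), preserving xmin < xmax.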
2. The FCN model
fcn.py:
# -*- coding: utf-8 -*-
from __future__ import print_function

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision.models.vgg import VGG


class FCN32s(nn.Module):
    def __init__(self, pretrained_net, n_class):
        super().__init__()
        self.n_class = n_class
        self.pretrained_net = pretrained_net
        # [homework] Define the layers of FCN32s.
        # The first two arguments of ConvTranspose2d are the input and output channel counts.
        self.relu = nn.ReLU(inplace=True)
        self.deconv1 = nn.ConvTranspose2d(512, 512, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn1 = nn.BatchNorm2d(512)
        self.deconv2 = nn.ConvTranspose2d(512, 256, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn2 = nn.BatchNorm2d(256)
        self.deconv3 = nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.deconv4 = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.deconv5 = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn5 = nn.BatchNorm2d(32)
        self.classifier = nn.Conv2d(32, n_class, kernel_size=1)

    def forward(self, x):
        # First run the VGG backbone to extract features.
        output = self.pretrained_net(x)
        x5 = output['x5']  # size=(N, 512, x.H/32, x.W/32)
        # Feed x5 through the transposed convolutions, which perform the upsampling.
        score = self.bn1(self.relu(self.deconv1(x5)))     # size=(N, 512, x.H/16, x.W/16)
        score = self.bn2(self.relu(self.deconv2(score)))  # size=(N, 256, x.H/8, x.W/8)
        score = self.bn3(self.relu(self.deconv3(score)))  # size=(N, 128, x.H/4, x.W/4)
        score = self.bn4(self.relu(self.deconv4(score)))  # size=(N, 64, x.H/2, x.W/2)
        score = self.bn5(self.relu(self.deconv5(score)))  # size=(N, 32, x.H, x.W)
        score = self.classifier(score)                    # size=(N, n_class, x.H, x.W)
        return score  # size=(N, n_class, x.H, x.W)


class FCN8s(nn.Module):
    def __init__(self, pretrained_net, n_class):
        super().__init__()
        self.n_class = n_class
        self.pretrained_net = pretrained_net
        # [homework] Define the layers of FCN8s.
        self.relu = nn.ReLU(inplace=True)
        self.deconv1 = nn.ConvTranspose2d(512, 512, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn1 = nn.BatchNorm2d(512)
        self.deconv2 = nn.ConvTranspose2d(512, 256, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn2 = nn.BatchNorm2d(256)
        self.deconv3 = nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.deconv4 = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.deconv5 = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1)
        self.bn5 = nn.BatchNorm2d(32)
        self.classifier = nn.Conv2d(32, n_class, kernel_size=1)

    def forward(self, x):
        output = self.pretrained_net(x)
        x5 = output['x5']  # size=(N, 512, x.H/32, x.W/32)
        x4 = output['x4']  # size=(N, 512, x.H/16, x.W/16)
        x3 = output['x3']  # size=(N, 256, x.H/8, x.W/8)
        # [homework] Implement the skip connections.
        score = self.relu(self.deconv1(x5))               # size=(N, 512, x.H/16, x.W/16)
        score = self.bn1(score + x4)                      # element-wise add, size=(N, 512, x.H/16, x.W/16)
        score = self.relu(self.deconv2(score))            # size=(N, 256, x.H/8, x.W/8)
        score = self.bn2(score + x3)                      # element-wise add, size=(N, 256, x.H/8, x.W/8)
        score = self.bn3(self.relu(self.deconv3(score)))  # size=(N, 128, x.H/4, x.W/4)
        score = self.bn4(self.relu(self.deconv4(score)))  # size=(N, 64, x.H/2, x.W/2)
        score = self.bn5(self.relu(self.deconv5(score)))  # size=(N, 32, x.H, x.W)
        score = self.classifier(score)                    # size=(N, n_class, x.H, x.W)
        return score  # size=(N, n_class, x.H, x.W)


class VGGNet(VGG):
    def __init__(self, pretrained=True, model='vgg16', requires_grad=True, remove_fc=True, show_params=False):
        super().__init__(make_layers(cfg[model]))
        self.ranges = ranges[model]
        if pretrained:
            exec("self.load_state_dict(models.%s(pretrained=True).state_dict())" % model)
        if not requires_grad:
            for param in super().parameters():
                param.requires_grad = False
        if remove_fc:  # delete redundant fully-connected layer params, can save memory
            del self.classifier
        if show_params:
            for name, param in self.named_parameters():
                print(name, param.size())

    def forward(self, x):
        output = {}
        # get the output of each maxpooling layer (5 maxpool layers in VGG)
        for idx in range(len(self.ranges)):
            for layer in range(self.ranges[idx][0], self.ranges[idx][1]):
                x = self.features[layer](x)
            output["x%d" % (idx + 1)] = x
        return output


ranges = {
    'vgg11': ((0, 3), (3, 6), (6, 11), (11, 16), (16, 21)),
    'vgg13': ((0, 5), (5, 10), (10, 15), (15, 20), (20, 25)),
    'vgg16': ((0, 5), (5, 10), (10, 17), (17, 24), (24, 31)),
    'vgg19': ((0, 5), (5, 10), (10, 19), (19, 28), (28, 37)),
}

# cropped version from https://github.com/pytorch/vision/blob/master/torchvision/models/vgg.py
cfg = {
    'vgg11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'vgg13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'vgg16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'vgg19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


def make_layers(cfg, batch_norm=False):
    layers = []
    in_channels = 3
    for v in cfg:
        if v == 'M':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        else:
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv2d, nn.ReLU(inplace=True)]
            in_channels = v
    return nn.Sequential(*layers)


if __name__ == "__main__":
    batch_size, n_class, h, w = 10, 20, 160, 160

    # test output size
    vgg_model = VGGNet(pretrained=False, requires_grad=True)
    input = torch.randn(batch_size, 3, 224, 224)
    output = vgg_model(input)
    assert output['x5'].size() == torch.Size([batch_size, 512, 7, 7])

    fcn_model = FCN32s(pretrained_net=vgg_model, n_class=n_class)
    input = torch.randn(batch_size, 3, h, w)
    output = fcn_model(input)
    assert output.size() == torch.Size([batch_size, n_class, h, w])

    fcn_model = FCN8s(pretrained_net=vgg_model, n_class=n_class)
    input = torch.randn(batch_size, 3, h, w)
    output = fcn_model(input)
    assert output.size() == torch.Size([batch_size, n_class, h, w])
    print("Pass size check")

    # test a random batch, loss should decrease
    fcn_model = FCN32s(pretrained_net=vgg_model, n_class=n_class)
    criterion = nn.BCELoss()
    optimizer = optim.SGD(fcn_model.parameters(), lr=1e-3, momentum=0.9)
    input = torch.randn(batch_size, 3, h, w)
    # BCELoss requires targets in [0, 1], so draw them from a uniform distribution
    y = torch.rand(batch_size, n_class, h, w)
    for iter in range(10):
        optimizer.zero_grad()
        output = fcn_model(input)
        output = torch.sigmoid(output)  # nn.functional.sigmoid is deprecated
        loss = criterion(output, y)
        loss.backward()
        print("iter{}, loss {}".format(iter, loss.data.item()))
        optimizer.step()
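A quick way to see why each deconv stage exactly doubles the spatial resolution: for nn.ConvTranspose2d, H_out = (H_in - 1)*stride - 2*padding + dilation*(kernel_size - 1) + output_padding + 1. With kernel_size=3, stride=2, padding=1, dilation=1, output_padding=1 this reduces to H_out = 2*H_in, so the five stages take the H/32 VGG feature map back to full resolution. A minimal check (hypothetical values):

import torch
import torch.nn as nn

deconv = nn.ConvTranspose2d(512, 512, kernel_size=3, stride=2, padding=1,
                            dilation=1, output_padding=1)
x = torch.randn(1, 512, 5, 5)
print(deconv(x).shape)  # torch.Size([1, 512, 10, 10]): (5-1)*2 - 2 + 2 + 1 + 1 = 10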
3. Training the model
The script below resizes every image and mask to 160x160, one-hot encodes the binary person mask into two channels (background/person), and trains FCN8s with BCEWithLogitsLoss:
# -*- coding: utf-8 -*-
from __future__ import print_function

import os
import sys
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader

from fcn import VGGNet, FCN32s, FCN8s

sys.path.append("../../")
from PennFudanDataset_main import *


def collate_fn(batch):
    return tuple(zip(*batch))


n_class = 2
batch_size = 6
epochs = 100
lr = 1e-4
momentum = 0
w_decay = 1e-5
step_size = 50
gamma = 0.5
configs = "FCNs-BCEWithLogits_batch{}_epoch{}_RMSprop_scheduler-step{}-gamma{}_lr{}_momentum{}_w_decay{}".format(
    batch_size, epochs, step_size, gamma, lr, momentum, w_decay)
print("Configs:", configs)

# create dir for model
model_dir = "models"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model_path = os.path.join(model_dir, configs)

use_gpu = torch.cuda.is_available()
num_gpu = list(range(torch.cuda.device_count()))

# our dataset has two classes only - background and person
num_classes = 2
# use our dataset and the defined transformations
dataset = PennFudanDataset('./PennFudanPed', get_transform(train=True))
dataset_test = PennFudanDataset('./PennFudanPed', get_transform(train=False))

# split the dataset into train and test sets
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

# define training and validation data loaders
# (note: the loaders use their own batch sizes; the batch_size above only labels the configs string)
train_loader = DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4,
    collate_fn=collate_fn)
val_loader = DataLoader(
    dataset_test, batch_size=3, shuffle=False, num_workers=4,
    collate_fn=collate_fn)

print("not using a pretrained VGG model")
vgg_model = VGGNet(pretrained=False, requires_grad=True, remove_fc=True)
fcn_model = FCN8s(pretrained_net=vgg_model, n_class=n_class)

if use_gpu:
    ts = time.time()
    vgg_model = vgg_model.cuda()
    fcn_model = fcn_model.cuda()
    fcn_model = nn.DataParallel(fcn_model, device_ids=num_gpu)
    print("Finish cuda loading, time elapsed {}".format(time.time() - ts))

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.RMSprop(fcn_model.parameters(), lr=lr, momentum=momentum, weight_decay=w_decay)
scheduler = lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)  # decay LR by a factor of 0.5 every 50 epochs

# create dir for scores
score_dir = os.path.join("scores", configs)
if not os.path.exists(score_dir):
    os.makedirs(score_dir)
IU_scores = np.zeros((epochs, n_class))
pixel_scores = np.zeros(epochs)


def train():
    for epoch in range(epochs):
        fcn_model.train()  # val() switches to eval mode, so switch back before training
        ts = time.time()
        for iter, batch in enumerate(train_loader):
            optimizer.zero_grad()
            inputs = input_process(batch)
            _, labels = target_process(batch)
            if use_gpu:
                labels = labels.cuda()
            outputs = fcn_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            if iter % 10 == 0:
                print("epoch{}, iter{}, loss: {}".format(epoch, iter, loss.data.item()))
        print("Finish epoch {}, time elapsed {}".format(epoch, time.time() - ts))
        torch.save(fcn_model, model_path)
        val(epoch)
        scheduler.step()


def input_process(batch):
    # resize every image in the batch to 160x160 and stack into a (B, 3, 160, 160) tensor
    batch_size = len(batch[0])
    input_batch = torch.zeros(batch_size, 3, 160, 160)
    for i in range(batch_size):
        inputs_tmp = batch[0][i]
        inputs_tmp1 = cv2.resize(inputs_tmp.permute([1, 2, 0]).numpy(), (160, 160))
        inputs_tmp2 = torch.tensor(inputs_tmp1).permute([2, 0, 1])
        input_batch[i:i+1, :, :, :] = torch.unsqueeze(inputs_tmp2, 0)
    return input_batch


def target_process(batch):
    batch_size = len(batch[0])
    target_batch = torch.zeros(batch_size, 160, 160)
    label_batch = torch.zeros(batch_size, 2, 160, 160)
    for i in range(batch_size):
        # merge all instance masks of image i into one binary foreground mask
        masks = batch[1][i]['masks']
        masks1 = torch.sum(masks, 0)
        masks2 = (masks1 > 0).float()
        masks3 = torch.tensor(cv2.resize(masks2.numpy(), (160, 160)))
        target_batch[i:i+1, :, :] = torch.unsqueeze(masks3, 0)
        # one-hot encode: channel 1 = person, channel 0 = background
        label_batch[i:i+1, 1, :, :] = torch.unsqueeze(masks3, 0)
        label_batch[i:i+1, 0, :, :] = torch.unsqueeze(1 - masks3, 0)
    return target_batch.numpy(), label_batch


def val(epoch):
    fcn_model.eval()
    total_ious = []
    pixel_accs = []
    for iter, batch in enumerate(val_loader):
        inputs = input_process(batch)
        target, label = target_process(batch)
        with torch.no_grad():
            output = fcn_model(inputs)
        output = output.data.cpu().numpy()
        N, _, h, w = output.shape
        pred = output.transpose(0, 2, 3, 1).reshape(-1, n_class).argmax(axis=1).reshape(N, h, w)
        for p, t in zip(pred, target):
            total_ious.append(iou(p, t))
            pixel_accs.append(pixel_acc(p, t))
    # calculate average IoU
    total_ious = np.array(total_ious).T  # n_class * val_len
    ious = np.nanmean(total_ious, axis=1)
    pixel_accs = np.array(pixel_accs).mean()
    print("epoch{}, pix_acc: {}, meanIoU: {}, IoUs: {}".format(epoch, pixel_accs, np.nanmean(ious), ious))
    IU_scores[epoch] = ious
    np.save(os.path.join(score_dir, "meanIU"), IU_scores)
    pixel_scores[epoch] = pixel_accs
    np.save(os.path.join(score_dir, "meanPixel"), pixel_scores)


# borrowed and modified from https://github.com/Kaixhin/FCN-semantic-segmentation/blob/master/main.py
# calculates per-class intersection over union
def iou(pred, target):
    ious = []
    for cls in range(n_class):
        pred_inds = pred == cls
        target_inds = target == cls
        intersection = pred_inds[target_inds].sum()
        union = pred_inds.sum() + target_inds.sum() - intersection
        if union == 0:
            ious.append(float('nan'))  # if there is no ground truth, do not include in evaluation
        else:
            ious.append(float(intersection) / max(union, 1))
    return ious


def pixel_acc(pred, target):
    correct = (pred == target).sum()
    total = (target == target).sum()
    return correct / total


if __name__ == "__main__":
    val(0)  # show the accuracy before training
    train()
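After training finishes, a minimal inference sketch (hypothetical, not part of the original post; it reuses model_path, val_loader and input_process from above, and assumes the same CUDA setup the checkpoint was saved with -- on newer PyTorch, torch.load may additionally need weights_only=False to unpickle a full model object):

model = torch.load(model_path)  # the script saves the whole model object each epoch
model.eval()
batch = next(iter(val_loader))
inputs = input_process(batch)
with torch.no_grad():
    out = model(inputs)                  # (B, 2, 160, 160) logits
pred = out.argmax(dim=1).cpu().numpy()   # (B, 160, 160), 0 = background, 1 = person
cv2.imwrite("pred0.png", (pred[0] * 255).astype(np.uint8))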

Code download: https://download.csdn.net/download/m0_37755995/86262701